Hadoop Shuffle Combiner


Introduction

Imagine you are an engineer in charge of managing data in a space battle simulation system. Your goal is to optimize the system's performance by applying the Hadoop Shuffle Combiner technique in the MapReduce process. By running a Combiner between the map and reduce phases, you aim to reduce network traffic and improve the overall efficiency of data processing during the simulation.



Write Mapper

In this step, you will write the Mapper class to process the input data and emit intermediate key-value pairs.

Open the terminal and follow the steps below to get started.

Switch to the hadoop user; the login shell will start in that user's home directory:

su - hadoop

Create a Java file for the Mapper class:

nano /home/hadoop/SpaceBattleMapper.java

Then, add the following code to the SpaceBattleMapper.java file:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class SpaceBattleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the input line into words
        String[] words = value.toString().split("\\s+");
        // Emit a key-value pair for each word
        for (String w : words) {
            word.set(w);
            context.write(word, one);
        }
    }
}

Tips: You can copy the code from the prompt box on the right and paste it with Ctrl + Shift + V into the open nano editor. Press Ctrl + O to save the file and Enter to confirm when prompted by the nano editor. Finally, use Ctrl + X to exit the editor.

The SpaceBattleMapper class extends the Mapper class from the Hadoop framework. It is used to process input data in the form of key-value pairs, where the key is a LongWritable representing the byte offset of the line in the input file, and the value is a Text object representing the line of text.

The class defines two private fields:

  • one: An IntWritable object with a constant value of 1. This is used as the value in the emitted key-value pairs.
  • word: A Text object used to store each word extracted from the input line.

The map method is overridden to provide the specific mapping logic:

  • The input Text value is converted to a string and split into words based on whitespace.
  • For each word in the array, the word object is set to that word, and a key-value pair is emitted with the word as the key and one as the value. This is done using the context.write method.

This Mapper class is designed to emit a key-value pair for each word in the input data, with the word as the key and the integer 1 as the value. This setup is commonly used in word count applications, where the goal is to count the occurrences of each word in a dataset.
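For example, a hypothetical input line from the simulation log such as:

laser cruiser laser

would cause the Mapper to emit three separate key-value pairs:

(laser, 1)
(cruiser, 1)
(laser, 1)

Note that duplicate words are emitted as independent pairs; aggregation happens later, in the Combiner and Reducer.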

Implement Combiner

In this step, you will implement the Combiner class to perform local aggregation before data shuffling.

Create a Java file for the Combiner class:

nano /home/hadoop/SpaceBattleCombiner.java

Then, add the following code to the SpaceBattleCombiner.java file:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class SpaceBattleCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // Sum up the values for each key
        for (IntWritable val : values) {
            sum += val.get();
        }
        // Emit the key and the sum of its values
        context.write(key, new IntWritable(sum));
    }
}

The SpaceBattleCombiner class extends the Reducer class from the Hadoop framework. It is used as a combiner in the MapReduce process to perform local aggregation of intermediate key-value pairs emitted by the Mapper.

The class overrides the reduce method to provide the specific combiner logic:

  • The method takes a key of type Text and an iterable of values of type IntWritable as input. The key represents a word, and the iterable contains counts of occurrences of that word.
  • The method iterates over the values, summing them up to get the total count for the word.
  • Finally, the method emits a key-value pair with the word as the key and the total count as the value using the context.write method.

The purpose of the SpaceBattleCombiner is to perform local aggregation of the counts for each word before the data is shuffled across the network to the Reducer. This reduces the amount of data transferred between the Mapper and Reducer phases, improving the efficiency of the MapReduce job. Note that Hadoop treats the Combiner as an optional optimization: it may run zero, one, or several times on a given Mapper's output, so its logic must be commutative and associative, as summing counts is here. The Combiner's input and output types must also match the Mapper's output types, Text and IntWritable in this case.
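To see the effect, suppose a single Mapper task emits the following pairs (hypothetical values):

(laser, 1)
(shield, 1)
(laser, 1)
(laser, 1)

After the Combiner runs locally on that task's output, only two pairs are shuffled across the network:

(laser, 3)
(shield, 1)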

Implement Reducer

In this step, you will implement the Reducer class to perform the final aggregation of the key-value pairs.

Create a Java file for the Reducer class:

nano /home/hadoop/SpaceBattleReducer.java

Then, add the following code to the SpaceBattleReducer.java file:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class SpaceBattleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

The SpaceBattleReducer class extends the Reducer class from the Hadoop framework. It is used to perform the final aggregation of the intermediate key-value pairs emitted by the Mapper and optionally processed by the Combiner.

The class overrides the reduce method to provide the specific reducer logic:

  • The method takes a key of type Text and an iterable of values of type IntWritable as input. The key represents a word, and the iterable contains counts of occurrences of that word.
  • The method iterates over the values, summing them up to get the total count for the word.
  • Finally, the method emits a key-value pair with the word as the key and the total count as the value using the context.write method.

The SpaceBattleReducer performs the final aggregation of the data, summing up the counts for each word across all the Mapper outputs. This provides the final count of occurrences for each word in the input data.
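Continuing the example above, if two Mapper tasks produce the combined outputs (laser, 3) and (laser, 2), the Reducer receives the key laser with the value list [3, 2] after shuffling and writes the final pair (laser, 5).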

Write Driver

In this step, you will create a Java file to manage the MapReduce job, including setting up the job configuration and specifying the Mapper, Combiner, and Reducer classes.

Create a Java file for the Driver class:

nano /home/hadoop/SpaceBattleDriver.java

Then, add the following code to the SpaceBattleDriver.java file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SpaceBattleDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Space Battle Simulation");
        job.setJarByClass(SpaceBattleDriver.class);
        job.setMapperClass(SpaceBattleMapper.class);
        job.setCombinerClass(SpaceBattleCombiner.class);
        job.setReducerClass(SpaceBattleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The SpaceBattleDriver class is responsible for configuring and running the MapReduce job for the space battle simulation.

  • The class starts by creating a new Configuration object and a Job instance with this configuration. The job is given a name, "Space Battle Simulation," for identification.

  • The setJarByClass method is called with the SpaceBattleDriver class to set the jar file that contains the classes needed for the job.

  • The setMapperClass, setCombinerClass, and setReducerClass methods are used to specify the classes that will perform the mapping, combining, and reducing tasks, respectively.

  • The setOutputKeyClass and setOutputValueClass methods define the types of the output key and value, which are Text and IntWritable in this case.

  • The FileInputFormat.addInputPath and FileOutputFormat.setOutputPath methods set the paths for the input and output data. These paths are passed as command-line arguments to the main method.

  • Finally, the job.waitForCompletion method is called to submit the job and wait for its completion. The method returns true if the job completes successfully and false otherwise. The program exits with a status code of 0 if the job is successful and 1 if it is not.

This driver class ties together all the components of the MapReduce job and is the entry point for running the job.
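With all four classes in place, a typical way to compile and run the job looks like the sketch below. The HDFS paths /input and /output are placeholders for your actual data; note that the output directory must not exist before the job runs:

cd /home/hadoop
javac -classpath $(hadoop classpath) SpaceBattle*.java
jar cf space-battle.jar SpaceBattle*.class
hadoop jar space-battle.jar SpaceBattleDriver /input /output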

Summary

In this lab, you walked through the implementation of the Hadoop Shuffle Combiner technique in a space battle simulation scenario. By following the steps to create Mapper, Combiner, Reducer, and Driver classes, you gained hands-on experience in optimizing data processing in a Hadoop MapReduce environment. This lab aimed to enhance your understanding of reducing network overhead and improving computational efficiency in big data processing tasks.
