Hadoop Shuffle Combiner

HadoopHadoopBeginner
立即练习

💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版

介绍

想象一个场景,你是一位才华横溢的工程师,负责管理一个太空战斗模拟系统中的数据。你的目标是通过在 MapReduce 过程中实现 Hadoop Shuffle Combiner 技术来优化系统性能。通过使用 Combiner,你旨在减少网络流量,并提高模拟过程中数据处理的总效率。


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL hadoop(("`Hadoop`")) -.-> hadoop/HadoopMapReduceGroup(["`Hadoop MapReduce`"]) hadoop(("`Hadoop`")) -.-> hadoop/HadoopYARNGroup(["`Hadoop YARN`"]) hadoop(("`Hadoop`")) -.-> hadoop/HadoopHiveGroup(["`Hadoop Hive`"]) hadoop/HadoopMapReduceGroup -.-> hadoop/setup_jobs("`Setting up MapReduce Jobs`") hadoop/HadoopMapReduceGroup -.-> hadoop/mappers_reducers("`Coding Mappers and Reducers`") hadoop/HadoopMapReduceGroup -.-> hadoop/shuffle_combiner("`Shuffle Combiner`") hadoop/HadoopYARNGroup -.-> hadoop/yarn_jar("`Yarn Commands jar`") hadoop/HadoopHiveGroup -.-> hadoop/process("`Process Control Function`") hadoop/HadoopHiveGroup -.-> hadoop/aggregating("`Aggregating Function`") hadoop/HadoopHiveGroup -.-> hadoop/udf("`User Defined Function`") subgraph Lab Skills hadoop/setup_jobs -.-> lab-271904{{"`Hadoop Shuffle Combiner`"}} hadoop/mappers_reducers -.-> lab-271904{{"`Hadoop Shuffle Combiner`"}} hadoop/shuffle_combiner -.-> lab-271904{{"`Hadoop Shuffle Combiner`"}} hadoop/yarn_jar -.-> lab-271904{{"`Hadoop Shuffle Combiner`"}} hadoop/process -.-> lab-271904{{"`Hadoop Shuffle Combiner`"}} hadoop/aggregating -.-> lab-271904{{"`Hadoop Shuffle Combiner`"}} hadoop/udf -.-> lab-271904{{"`Hadoop Shuffle Combiner`"}} end

编写 Mapper

在这一步中,你将编写 Mapper 类来处理输入数据并生成中间键值对。

打开终端并按照以下步骤开始操作。

将用户切换为 hadoop,然后切换到 hadoop 用户的主目录:

su - hadoop

为 Mapper 类创建一个 Java 文件:

nano /home/hadoop/SpaceBattleMapper.java

然后,将以下代码添加到 SpaceBattleMapper.java 文件中:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import java.io.IOException;

public class SpaceBattleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 将输入行拆分为单词
        String[] words = value.toString().split("\\s+");
        // 为每个单词生成一个键值对
        for (String w : words) {
            word.set(w);
            context.write(word, one);
        }
    }
}

提示:你可以从右侧的提示框中复制代码,并使用 Ctrl + Shift + V 粘贴到打开的 nano 编辑器中。按 Ctrl + O 保存文件,并在 nano 编辑器提示时按 Enter 确认。最后,使用 Ctrl + X 退出编辑器。

SpaceBattleMapper 类扩展了 Hadoop 框架中的 Mapper 类。它用于处理键值对形式的输入数据,其中键是一个 LongWritable,表示输入文件中行的字节偏移量,值是一个 Text 对象,表示文本行。

该类定义了两个私有字段:

  • one:一个值为 1 的 IntWritable 对象。它用作生成的键值对中的值。
  • word:一个 Text 对象,用于存储从输入行中提取的每个单词。

map 方法被重写以提供具体的映射逻辑:

  • 输入的 Text 值被转换为字符串,并根据空白字符拆分为单词。
  • 对于数组中的每个单词,word 对象被设置为该单词,并使用 context.write 方法生成一个键值对,其中单词作为键,one 作为值。

这个 Mapper 类的设计是为输入数据中的每个单词生成一个键值对,其中单词作为键,整数 1 作为值。这种设置通常用于词频统计应用中,目标是统计数据集中每个单词的出现次数。

实现 Combiner

在这一步中,你将实现 Combiner 类以在数据洗牌(shuffling)之前执行本地聚合。

为 Combiner 类创建一个 Java 文件:

nano /home/hadoop/SpaceBattleCombiner.java

然后,将以下代码添加到 SpaceBattleCombiner.java 文件中:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class SpaceBattleCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 对每个键的值进行求和
        for (IntWritable val : values) {
            sum += val.get();
        }
        // 输出键及其值的总和
        context.write(key, new IntWritable(sum));
    }
}

SpaceBattleCombiner 类扩展了 Hadoop 框架中的 Reducer 类。它在 MapReduce 过程中用作 Combiner,用于对 Mapper 生成的中间键值对进行本地聚合。

该类重写了 reduce 方法以提供具体的 Combiner 逻辑:

  • 该方法接收一个 Text 类型的键和一个 IntWritable 类型的可迭代值作为输入。键表示一个单词,可迭代值包含该单词的出现次数。
  • 该方法遍历值,将它们相加以得到该单词的总计数。
  • 最后,该方法使用 context.write 方法输出一个键值对,其中键为单词,值为总计数。

SpaceBattleCombiner 的目的是在数据通过网络传输到 Reducer 之前,对每个单词的计数进行本地聚合。这减少了 Mapper 和 Reducer 阶段之间传输的数据量,从而提高了 MapReduce 作业的效率。

实现 Reducer

在这一步中,你将实现 Reducer 类以对键值对进行最终聚合。

  1. 为 Reducer 类创建一个 Java 文件:
nano /home/hadoop/SpaceBattleReducer.java

然后,将以下代码添加到 SpaceBattleReducer.java 文件中:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class SpaceBattleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

SpaceBattleReducer 类扩展了 Hadoop 框架中的 Reducer 类。它用于对 Mapper 生成的中间键值对进行最终聚合,这些键值对可能已经由 Combiner 处理过。

该类重写了 reduce 方法以提供具体的 Reducer 逻辑:

  • 该方法接收一个 Text 类型的键和一个 IntWritable 类型的可迭代值作为输入。键表示一个单词,可迭代值包含该单词的出现次数。
  • 该方法遍历值,将它们相加以得到该单词的总计数。
  • 最后,该方法使用 context.write 方法输出一个键值对,其中键为单词,值为总计数。

SpaceBattleReducer 对数据进行最终聚合,汇总所有 Mapper 输出中每个单词的计数。这提供了输入数据中每个单词的最终出现次数。

编写 Driver

在这一步中,你将创建一个 Java 文件来管理 MapReduce 作业,包括设置作业配置以及指定 Mapper、Combiner 和 Reducer 类。

为 Driver 类创建一个 Java 文件:

nano /home/hadoop/SpaceBattleDriver.java

然后,将以下代码添加到 SpaceBattleDriver.java 文件中:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SpaceBattleDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Space Battle Simulation");
        job.setJarByClass(SpaceBattleDriver.class);
        job.setMapperClass(SpaceBattleMapper.class);
        job.setCombinerClass(SpaceBattleCombiner.class);
        job.setReducerClass(SpaceBattleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

SpaceBattleDriver 类负责配置和运行太空战斗模拟的 MapReduce 作业。

  • 该类首先创建一个新的 Configuration 对象和一个带有此配置的 Job 实例。作业被赋予一个名称“Space Battle Simulation”以便识别。

  • 调用 setJarByClass 方法并传入 SpaceBattleDriver 类,以设置包含作业所需类的 jar 文件。

  • setMapperClasssetCombinerClasssetReducerClass 方法分别用于指定执行映射、组合和归约任务的类。

  • setOutputKeyClasssetOutputValueClass 方法定义了输出键和值的类型,在本例中为 TextIntWritable

  • FileInputFormat.addInputPathFileOutputFormat.setOutputPath 方法设置输入和输出数据的路径。这些路径作为命令行参数传递给主方法。

  • 最后,调用 job.waitForCompletion 方法提交作业并等待其完成。如果作业成功完成,该方法返回 true,否则返回 false。如果作业成功,程序以状态码 0 退出;如果失败,则以状态码 1 退出。

这个 Driver 类将 MapReduce 作业的所有组件结合在一起,是运行作业的入口点。

总结

在本实验中,你通过一个太空战斗模拟场景,逐步实现了 Hadoop Shuffle Combiner 技术。通过创建 Mapper、Combiner、Reducer 和 Driver 类的步骤,你获得了在 Hadoop MapReduce 环境中优化数据处理的实践经验。本实验旨在增强你对减少网络开销和提高大数据处理任务计算效率的理解。

您可能感兴趣的其他 Hadoop 教程