幽灵数据转换之旅

HadoopHadoopBeginner
立即练习

💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版

介绍

在本实验中,你将学习如何在 Hadoop MapReduce 中自定义输出和输入格式,以高效处理数据。通过 Ghost Tutor 的指导,你将掌握处理不同类型数据的技能,并充分发挥 Hadoop 生态系统的潜力。准备好踏上这段激动人心的旅程,掌握在超自然领域中的计算艺术吧!


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL hadoop(("`Hadoop`")) -.-> hadoop/HadoopMapReduceGroup(["`Hadoop MapReduce`"]) hadoop(("`Hadoop`")) -.-> hadoop/HadoopYARNGroup(["`Hadoop YARN`"]) hadoop(("`Hadoop`")) -.-> hadoop/HadoopHiveGroup(["`Hadoop Hive`"]) hadoop/HadoopMapReduceGroup -.-> hadoop/setup_jobs("`Setting up MapReduce Jobs`") hadoop/HadoopMapReduceGroup -.-> hadoop/mappers_reducers("`Coding Mappers and Reducers`") hadoop/HadoopMapReduceGroup -.-> hadoop/handle_io_formats("`Handling Output Formats and Input Formats`") hadoop/HadoopYARNGroup -.-> hadoop/yarn_jar("`Yarn Commands jar`") hadoop/HadoopHiveGroup -.-> hadoop/import_export_data("`Importing and Exporting Data`") hadoop/HadoopHiveGroup -.-> hadoop/process("`Process Control Function`") hadoop/HadoopHiveGroup -.-> hadoop/storage_formats("`Choosing Storage Formats`") hadoop/HadoopHiveGroup -.-> hadoop/integration("`Integration with HDFS and MapReduce`") subgraph Lab Skills hadoop/setup_jobs -.-> lab-288974{{"`幽灵数据转换之旅`"}} hadoop/mappers_reducers -.-> lab-288974{{"`幽灵数据转换之旅`"}} hadoop/handle_io_formats -.-> lab-288974{{"`幽灵数据转换之旅`"}} hadoop/yarn_jar -.-> lab-288974{{"`幽灵数据转换之旅`"}} hadoop/import_export_data -.-> lab-288974{{"`幽灵数据转换之旅`"}} hadoop/process -.-> lab-288974{{"`幽灵数据转换之旅`"}} hadoop/storage_formats -.-> lab-288974{{"`幽灵数据转换之旅`"}} hadoop/integration -.-> lab-288974{{"`幽灵数据转换之旅`"}} end

编写 Mapper 和 Reducer

在这一步中,我们将深入 Hadoop MapReduce 的核心,创建自己的 Mapper 和 Reducer 类。

探索数据文件

首先,使用 su - hadoop 命令切换身份。数据文件 data.txt 存储在 HDFS 的 /user/hadoop/input 目录中,其中存储了一些人的对话内容,使用以下命令查看内容:

hdfs dfs -cat /user/hadoop/input/data.txt

自定义 Mapper

接下来,我们将创建一个名为 WordCountMapper 的自定义 Mapper 类,它继承自 Mapper。这个 Mapper 将处理输入数据并发出键值对,需要注意的是,处理的数据是每行对话的内容,而不是人名。参考以下示例,补充 /home/hadoop/ 目录下 WordCountMapper.java 中的 map 方法。

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // 将 Text 对象转换为 String
    String line = value.toString();
    // 使用 StringTokenizer 对字符串进行分词
    StringTokenizer tokenizer = new StringTokenizer(line);
    // 遍历每个 token 并将其写入上下文
    while (tokenizer.hasMoreTokens()) {
        // 设置当前单词
        word.set(tokenizer.nextToken().trim());
        // 将单词及其计数写入上下文
        context.write(word, one);
    }
}

然后使用以下命令以 Java 8 版本编译代码:

javac -source 8 -target 8 -cp $HADOOP_HOME/share/hadoop/common/hadoop-common-3.3.6.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:. WordCountMapper.java

自定义 Reducer

最后,我们将创建一个名为 WordCountReducer 的自定义 Reducer 类,它继承自 Reducer。这个 Reducer 将聚合每个键的值并发出最终结果。参考以下示例,补充 /home/hadoop/ 目录下 WordCountReducer.java 中的 reduce 方法。

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    // 初始化一个变量来存储每个单词的计数总和
    int sum = 0;
    // 遍历当前单词的所有计数并计算总和
    for (IntWritable value : values) {
        sum += value.get();
    }
    // 设置当前单词的最终计数
    result.set(sum);
    // 将单词及其最终计数写入上下文
    context.write(key, result);
}

然后使用以下命令以 Java 8 版本编译代码:

javac -source 8 -target 8 -cp $HADOOP_HOME/share/hadoop/common/hadoop-common-3.3.6.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:. WordCountReducer.java

编写自定义输入和输出格式

在这一步中,我们将为 MapReduce 任务指定输入和输出格式。

自定义输入格式

首先,让我们创建一个自定义输入格式,以从特定源读取数据。我们将定义一个名为 PersonInputFormat 的类,它继承自 TextInputFormat,并重写 getCurrentValue 方法来处理输入格式。参考以下示例,补充 /home/hadoop/ 目录下 PersonInputFormat.java 中的 getCurrentValue 方法。

@Override
public synchronized Text getCurrentValue() {
    // 返回当前记录的值,根据 ":" 分割,去除首尾空格并设置为 Text 对象
    Text value = new Text();
    Text line = super.getCurrentValue();
    String[] parts = line.toString().split(":");
    if (parts.length == 2) {
        value.set(parts[1].trim());
    }
    return value;
}

然后使用以下命令以 Java 8 版本编译代码:

javac -source 8 -target 8 -cp $HADOOP_HOME/share/hadoop/common/hadoop-common-3.3.6.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:. PersonInputFormat.java

自定义输出格式

接下来,让我们创建一个自定义输出格式。我们将定义一个名为 CSVOutputFormat 的类,它继承自 TextOutputFormat。参考以下示例,补充 /home/hadoop/ 目录下 CSVOutputFormat.java 中的 write 方法。

// 以 CSV 格式将键值对写入输出流
@Override
public void write(Text key, IntWritable value) throws IOException, InterruptedException {
    out.writeBytes(key.toString() + "," + value.toString() + "\n");
}

然后使用以下命令以 Java 8 版本编译代码:

javac -source 8 -target 8 -cp $HADOOP_HOME/share/hadoop/common/hadoop-common-3.3.6.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:. CSVOutputFormat.java

与 Driver 集成

在这最后一步中,我们将修改 WordCountDriver 类,以使用我们之前创建的自定义输入和输出格式。

自定义 Driver

参考以下示例,补充 /home/hadoop/ 目录下 WordCountDriver.java 中的 main 函数。

// 设置 Mapper 和 Reducer 类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

// 将输入格式类设置为自定义输入格式 PersonInputFormat
job.setInputFormatClass(PersonInputFormat.class);

// 将输出格式类设置为自定义输出格式 CSVOutputFormat
job.setOutputFormatClass(CSVOutputFormat.class);

执行任务

要使用自定义输入和输出格式执行 Hadoop MapReduce 任务,请按照以下步骤操作:

  1. 使用适当的 Hadoop 依赖项编译 WordCountDriver.java。

    javac -source 8 -target 8 -cp $HADOOP_HOME/share/hadoop/common/hadoop-common-3.3.6.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:. WordCountDriver.java
  2. 创建一个包含已编译类的 JAR 文件。

    jar -cvf mywordcount.jar *.class
  3. 使用适当的输入和输出路径运行 WordCountDriver 类。确保输出路径之前不存在。

    hadoop jar ./mywordcount.jar WordCountDriver /user/hadoop/input /output

此命令将使用我们定义的自定义输入和输出格式执行 Hadoop MapReduce 任务。

查看输出结果

使用以下命令检查结果文件是否成功生成:

hdfs dfs -ls /output
hdfs dfs -cat /output/part.csv

总结

恭喜你!在本实验中,你在 Ghost Tutor 的指导下,成功探索了 Hadoop MapReduce 的复杂性,并掌握了输出和输入格式的自定义。从创建 mapper 和 reducer 类到克服诸如处理不可分割输入文件等挑战,你获得了宝贵的经验。通过动手实践,你加深了对 Hadoop MapReduce 功能的理解,现在可以自信地应对数据挑战。这段旅程不仅提升了你的技术能力,还拓宽了你对 Hadoop MapReduce 在大数据处理中潜力的认识。

您可能感兴趣的其他 Hadoop 教程