神秘的 Hadoop 排序奥秘

HadoopHadoopBeginner
立即练习

💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版

引言

在一个神秘的夜市中,一位戴着华丽面具的迷人身影优雅地穿梭在熙熙攘攘的人群中。这位神秘的面具舞者似乎拥有一种隐秘的力量,随着每一次旋转和摆动,她都能轻松地将混乱的摊位整理得井然有序。你的目标是通过掌握 Hadoop Shuffle Comparable 的艺术,揭开这种非凡才能背后的奥秘。


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL hadoop(("Hadoop")) -.-> hadoop/HadoopHiveGroup(["Hadoop Hive"]) hadoop(("Hadoop")) -.-> hadoop/HadoopMapReduceGroup(["Hadoop MapReduce"]) hadoop(("Hadoop")) -.-> hadoop/HadoopYARNGroup(["Hadoop YARN"]) hadoop/HadoopMapReduceGroup -.-> hadoop/setup_jobs("Setting up MapReduce Jobs") hadoop/HadoopMapReduceGroup -.-> hadoop/mappers_reducers("Coding Mappers and Reducers") hadoop/HadoopMapReduceGroup -.-> hadoop/shuffle_partitioner("Shuffle Partitioner") hadoop/HadoopMapReduceGroup -.-> hadoop/shuffle_comparable("Shuffle Comparable") hadoop/HadoopMapReduceGroup -.-> hadoop/shuffle_combiner("Shuffle Combiner") hadoop/HadoopYARNGroup -.-> hadoop/yarn_jar("Yarn Commands jar") hadoop/HadoopHiveGroup -.-> hadoop/process("Process Control Function") hadoop/HadoopHiveGroup -.-> hadoop/udf("User Defined Function") subgraph Lab Skills hadoop/setup_jobs -.-> lab-288996{{"神秘的 Hadoop 排序奥秘"}} hadoop/mappers_reducers -.-> lab-288996{{"神秘的 Hadoop 排序奥秘"}} hadoop/shuffle_partitioner -.-> lab-288996{{"神秘的 Hadoop 排序奥秘"}} hadoop/shuffle_comparable -.-> lab-288996{{"神秘的 Hadoop 排序奥秘"}} hadoop/shuffle_combiner -.-> lab-288996{{"神秘的 Hadoop 排序奥秘"}} hadoop/yarn_jar -.-> lab-288996{{"神秘的 Hadoop 排序奥秘"}} hadoop/process -.-> lab-288996{{"神秘的 Hadoop 排序奥秘"}} hadoop/udf -.-> lab-288996{{"神秘的 Hadoop 排序奥秘"}} end

实现 Mapper

在这一步中,我们将创建一个自定义的 Mapper 类来处理输入数据并生成键值对。键将是一个复合键,包含两个字段:每个单词的第一个字符和单词的长度。值将是单词本身。

首先,将用户切换到 hadoop,然后切换到 hadoop 用户的主目录:

su - hadoop

接着,为 Mapper 类创建一个 Java 文件:

touch /home/hadoop/WordLengthMapper.java

将以下代码添加到 WordLengthMapper.java 文件中:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordLengthMapper extends Mapper<LongWritable, Text, CompositeKey, Text> {

    private CompositeKey compositeKey = new CompositeKey();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\\s+");

        for (String word : words) {
            compositeKey.setFirstChar(word.charAt(0));
            compositeKey.setLength(word.length());
            context.write(compositeKey, new Text(word));
        }
    }
}

在上述代码中,我们创建了一个 WordLengthMapper 类,它继承了 Hadoop MapReduce API 中的 Mapper 类。map 方法接收一个 LongWritable 键(表示输入行的字节偏移量)和一个 Text 值(输入行本身)。然后,它将输入行拆分为单个单词,为每个单词创建一个 CompositeKey 对象(包含单词的第一个字符和长度),并将 CompositeKey 作为键,单词作为值输出。

实现 CompositeKey

在这一步中,我们将创建一个自定义的 CompositeKey 类,该类实现了 Hadoop MapReduce API 中的 WritableComparable 接口。这个类将作为我们 MapReduce 作业中的键,使我们能够根据每个单词的第一个字符和长度对数据进行排序和分组。

首先,为 CompositeKey 类创建一个 Java 文件:

touch /home/hadoop/CompositeKey.java

然后,将以下代码添加到 CompositeKey.java 文件中:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class CompositeKey implements WritableComparable<CompositeKey> {

    private char firstChar;
    private int length;

    public CompositeKey() {
    }

    public void setFirstChar(char firstChar) {
        this.firstChar = firstChar;
    }

    public char getFirstChar() {
        return firstChar;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public int getLength() {
        return length;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeChar(firstChar);
        out.writeInt(length);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        firstChar = in.readChar();
        length = in.readInt();
    }

    @Override
    public int compareTo(CompositeKey other) {
        int cmp = Character.compare(firstChar, other.firstChar);
        if (cmp != 0) {
            return cmp;
        }
        return Integer.compare(length, other.length);
    }

    @Override
    public int hashCode() {
        return firstChar + length;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof CompositeKey) {
            CompositeKey other = (CompositeKey) obj;
            return firstChar == other.firstChar && length == other.length;
        }
        return false;
    }

    @Override
    public String toString() {
        return firstChar + ":" + length;
    }
}

在上述代码中,我们创建了一个 CompositeKey 类,它实现了 WritableComparable 接口。该类有两个字段:firstChar(单词的第一个字符)和 length(单词的长度)。该类为这些字段提供了 getter 和 setter 方法,并实现了 WritableComparable 接口所需的 writereadFieldscompareTohashCodeequalstoString 方法。

compareTo 方法尤为重要,因为它定义了 MapReduce 作业中键的排序方式。在我们的实现中,我们首先比较两个键的 firstChar 字段。如果它们不同,则返回比较结果。如果 firstChar 字段相同,则比较 length 字段。

实现 Reducer

在这一步中,我们将创建一个自定义的 Reducer 类来处理 Mapper 生成的键值对,并生成最终输出。

首先,为 Reducer 类创建一个 Java 文件:

touch /home/hadoop/WordLengthReducer.java

然后,将以下代码添加到 WordLengthReducer.java 文件中:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordLengthReducer extends Reducer<CompositeKey, Text, CompositeKey, Text> {

    public void reduce(CompositeKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString()).append(", ");
        }
        sb.setLength(sb.length() - 2);
        context.write(key, new Text(sb.toString()));
    }
}

在上述代码中,我们创建了一个 WordLengthReducer 类,它继承了 Hadoop MapReduce API 中的 Reducer 类。reduce 方法接收一个 CompositeKey 键(包含单词的第一个字符和长度)和一个 Text 值的 Iterable(与键匹配的单词)。

reduce 方法内部,我们将所有与键匹配的单词连接成一个逗号分隔的字符串。我们使用 StringBuilder 来高效地构建输出字符串,并在将键值对写入输出之前移除末尾的逗号和空格。

实现 Driver

在这一步中,我们将创建一个 Driver 类来配置并运行 MapReduce 作业。

首先,为 Driver 类创建一个 Java 文件:

touch /home/hadoop/WordLengthDriver.java

然后,将以下代码添加到 WordLengthDriver.java 文件中:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordLengthDriver {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordLengthDriver <input> <output>");
            System.exit(1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Word Length");

        job.setJarByClass(WordLengthDriver.class);
        job.setMapperClass(WordLengthMapper.class);
        job.setReducerClass(WordLengthReducer.class);
        job.setOutputKeyClass(CompositeKey.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

在上述代码中,我们创建了一个 WordLengthDriver 类,它作为我们 MapReduce 作业的入口点。main 方法接收两个命令行参数:作业的输入路径和输出路径。

main 方法内部,我们创建了一个新的 Configuration 对象和一个新的 Job 对象。我们通过设置 mapper 和 reducer 类、输出键和值类以及输入和输出路径来配置作业。

最后,我们提交作业并等待其完成。如果作业成功完成,我们以状态码 0 退出;否则,以状态码 1 退出。

要运行作业,可以使用以下命令:

javac -source 8 -target 8 -classpath "/home/hadoop/:/home/hadoop/hadoop/share/hadoop/common/hadoop-common-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/common/lib/*" -d /home/hadoop /home/hadoop/WordLengthMapper.java /home/hadoop/CompositeKey.java /home/hadoop/WordLengthReducer.java /home/hadoop/WordLengthDriver.java
jar cvf word-length.jar *.class
hadoop jar word-length.jar WordLengthDriver /input /output

最后,可以通过运行以下命令检查结果:

hadoop fs -cat /output/*

示例输出:

A:3 Amr
A:6 AADzCv
A:10 AlGyQumgIl
...
h:7 hgQUIhA
h:8 hyrjMGbY, hSElGKux
h:10 hmfHJjCkwB
...
z:6 zkpRCN
z:8 zfMHRbtk
z:9 zXyUuLHma

总结

在本实验中,我们通过实现一个 MapReduce 作业,探索了 Hadoop Shuffle Comparable 的概念。该作业根据单词的第一个字符和长度对单词进行分组。我们创建了一个自定义的 Mapper 来生成带有复合键的键值对,一个实现了 WritableComparable 接口的自定义 CompositeKey 类,一个用于连接具有相同键的单词的 Reducer,以及一个用于配置和运行作业的 Driver 类。

通过本实验,我对 Hadoop MapReduce 框架有了更深入的理解,并认识到自定义数据类型和排序在分布式计算中的重要性。通过掌握 Hadoop Shuffle Comparable,我们可以设计高效的数据处理和分析算法,释放大数据的潜力,就像神秘的舞者整理混乱的夜市摊位一样。