神秘的な Hadoop ソートの秘密

HadoopBeginner
オンラインで実践に進む

はじめに

不思議な夜市で、華やかな仮面を被った魅力的な人物が賑やかな人混みの中を優雅に移動しています。この謎めいた仮面舞踏者は、何か秘密の力を持っているようで、ひとつひとつの回転と揺れで、混乱した屋台を簡単に整然と並べ替えています。あなたの目標は、Hadoop Shuffle Comparable の技術を習得することで、この素晴らしい才能の裏にある謎を解き明かすことです。

マッパーを実装する

このステップでは、入力データを処理してキーと値のペアを emit するカスタム Mapper クラスを作成します。キーは、2 つのフィールドから構成される複合キーになります。1 つ目は各単語の最初の文字で、2 つ目は単語の長さです。値は単語自体になります。

まず、ユーザーを hadoop に変更してから、hadoop ユーザーのホームディレクトリに切り替えます。

su - hadoop

次に、Mapper クラス用の Java ファイルを作成します。

touch /home/hadoop/WordLengthMapper.java

WordLengthMapper.java ファイルに以下のコードを追加します。

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordLengthMapper extends Mapper<LongWritable, Text, CompositeKey, Text> {

    private CompositeKey compositeKey = new CompositeKey();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\\s+");

        for (String word : words) {
            compositeKey.setFirstChar(word.charAt(0));
            compositeKey.setLength(word.length());
            context.write(compositeKey, new Text(word));
        }
    }
}

上記のコードでは、Hadoop MapReduce API の Mapper クラスを拡張する WordLengthMapper クラスを作成しています。map メソッドは、LongWritable 型のキー(入力行のバイトオフセットを表す)と Text 型の値(入力行自体)を受け取ります。そして、入力行を個々の単語に分割し、各単語に対して CompositeKey オブジェクトを作成し(単語の最初の文字と長さを含む)、CompositeKey をキーとして、単語を値として emit します。

複合キーを実装する

このステップでは、Hadoop MapReduce API の WritableComparable インターフェイスを実装するカスタム CompositeKey クラスを作成します。このクラスは、MapReduce ジョブでキーとして使用され、各単語の最初の文字と長さに基づいてデータをソートおよびグループ化することができます。

まず、CompositeKey クラス用の Java ファイルを作成します。

touch /home/hadoop/CompositeKey.java

次に、CompositeKey.java ファイルに以下のコードを追加します。

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class CompositeKey implements WritableComparable<CompositeKey> {

    private char firstChar;
    private int length;

    public CompositeKey() {
    }

    public void setFirstChar(char firstChar) {
        this.firstChar = firstChar;
    }

    public char getFirstChar() {
        return firstChar;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public int getLength() {
        return length;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeChar(firstChar);
        out.writeInt(length);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        firstChar = in.readChar();
        length = in.readInt();
    }

    @Override
    public int compareTo(CompositeKey other) {
        int cmp = Character.compare(firstChar, other.firstChar);
        if (cmp!= 0) {
            return cmp;
        }
        return Integer.compare(length, other.length);
    }

    @Override
    public int hashCode() {
        return firstChar + length;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof CompositeKey) {
            CompositeKey other = (CompositeKey) obj;
            return firstChar == other.firstChar && length == other.length;
        }
        return false;
    }

    @Override
    public String toString() {
        return firstChar + ":" + length;
    }
}

上記のコードでは、WritableComparable インターフェイスを実装する CompositeKey クラスを作成しています。このクラスには 2 つのフィールドがあります。firstChar(単語の最初の文字)と length(単語の長さ)です。このクラスは、これらのフィールドに対する getter および setter メソッドを提供するとともに、WritableComparable インターフェイスで必要とされる writereadFieldscompareTohashCodeequals、および toString メソッドの実装も提供しています。

compareTo メソッドは特に重要で、MapReduce ジョブでキーがどのようにソートされるかを定義します。私たちの実装では、まず 2 つのキーの firstChar フィールドを比較します。異なる場合、その比較結果を返します。firstChar フィールドが同じ場合、次に length フィールドを比較します。

リデューサを実装する

このステップでは、Mapper から emit されたキーと値のペアを処理し、最終的な出力を生成するカスタム Reducer クラスを作成します。

まず、Reducer クラス用の Java ファイルを作成します。

touch /home/hadoop/WordLengthReducer.java

次に、WordLengthReducer.java ファイルに以下のコードを追加します。

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordLengthReducer extends Reducer<CompositeKey, Text, CompositeKey, Text> {

    public void reduce(CompositeKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString()).append(", ");
        }
        sb.setLength(sb.length() - 2);
        context.write(key, new Text(sb.toString()));
    }
}

上記のコードでは、Hadoop MapReduce API の Reducer クラスを拡張する WordLengthReducer クラスを作成しています。reduce メソッドは、CompositeKey キー(単語の最初の文字と長さを含む)と Text 型の値の Iterable(キーに一致する単語)を受け取ります。

reduce メソッドの中では、キーに一致するすべての単語をカンマ区切りの文字列に連結します。効率的に出力文字列を構築するために StringBuilder を使用し、キーと値のペアを出力に書き込む前に末尾のカンマと空白を削除します。

ドライバを実装する

このステップでは、MapReduce ジョブを構成して実行するためのドライバクラスを作成します。

まず、ドライバクラス用の Java ファイルを作成します。

touch /home/hadoop/WordLengthDriver.java

次に、WordLengthDriver.java ファイルに以下のコードを追加します。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordLengthDriver {

    public static void main(String[] args) throws Exception {
        if (args.length!= 2) {
            System.err.println("Usage: WordLengthDriver <input> <output>");
            System.exit(1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Word Length");

        job.setJarByClass(WordLengthDriver.class);
        job.setMapperClass(WordLengthMapper.class);
        job.setReducerClass(WordLengthReducer.class);
        job.setOutputKeyClass(CompositeKey.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true)? 0 : 1);
    }
}

上記のコードでは、MapReduce ジョブのエントリポイントとなる WordLengthDriver クラスを作成しています。main メソッドは、2 つのコマンドライン引数を受け取ります。ジョブの入力パスと出力パスです。

main メソッドの中では、新しい Configuration オブジェクトと新しい Job オブジェクトを作成します。ジョブを設定するために、マッパーとリデューサクラス、出力キーと値のクラス、入力と出力パスを設定します。

最後に、ジョブを送信して完了を待ちます。ジョブが正常に完了した場合、ステータスコード 0 で終了します。そうでない場合は、ステータスコード 1 で終了します。

ジョブを実行するには、次のコマンドを使用できます。

javac -source 8 -target 8 -classpath "/home/hadoop/:/home/hadoop/hadoop/share/hadoop/common/hadoop-common-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/common/lib/*" -d /home/hadoop /home/hadoop/WordLengthMapper.java /home/hadoop/CompositeKey.java /home/hadoop/WordLengthReducer.java /home/hadoop/WordLengthDriver.java
jar cvf word-length.jar *.class
hadoop jar word-length.jar WordLengthDriver /input /output

最後に、次のコマンドを実行して結果を確認できます。

hadoop fs -cat /output/*

出力例:

A:3 Amr
A:6 AADzCv
A:10 AlGyQumgIl
...
h:7 hgQUIhA
h:8 hyrjMGbY, hSElGKux
h:10 hmfHJjCkwB
...
z:6 zkpRCN
z:8 zfMHRbtk
z:9 zXyUuLHma

まとめ

この実験では、単語をその最初の文字と長さに基づいてグループ化する MapReduce ジョブを実装することで、Hadoop Shuffle Comparable の概念を探りました。複合キーを持つキーと値のペアを emit するためのカスタム Mapper、WritableComparable インターフェイスを実装するカスタム CompositeKey クラス、同じキーの単語を連結する Reducer、およびジョブを構成して実行する Driver クラスを作成しました。

この実験を通じて、Hadoop MapReduce フレームワークと、分散コンピューティングにおけるカスタムデータ型とソートの重要性を深く理解することができました。Hadoop Shuffle Comparable をマスターすることで、データ処理と分析のための効率的なアルゴリズムを設計でき、謎めいたマスクダンサーが混沌とした夜市の屋台を整えるように、ビッグデータの力を解き放つことができます。