Ausführen eines MapReduce-Jobs
In diesem Schritt lernen Sie, wie Sie einen MapReduce-Job auf den in HDFS gespeicherten Daten ausführen, indem Sie die Macht der parallelen Verarbeitung nutzen, um große Datensätze effizient zu analysieren.
MapReduce ist ein Programmiermodell zur parallelen Verarbeitung großer Datensätze über einem Cluster von Computern. Es besteht aus zwei Hauptphasen:
-
Map: Die Eingabedaten werden in kleinere Blöcke unterteilt, und jeder Block wird von einer separaten Aufgabe namens "Mapper" verarbeitet. Der Mapper verarbeitet die Daten und gibt Schlüssel-Wert-Paare aus.
-
Reduce: Die Ausgabe der Mapper wird nach Schlüssel sortiert und gruppiert, und jede Gruppe wird von einer separaten Aufgabe namens "Reducer" verarbeitet. Der Reducer kombiniert die mit jedem Schlüssel assoziierten Werte und erzeugt das endgültige Ergebnis.
Lassen Sie uns einen einfachen MapReduce-Job ausführen, der die Vorkommen von Wörtern in einer Textdatei zählt. Erstellen Sie zunächst eine Java-Datei namens WordCount.java
mit dem folgenden Inhalt:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)? 0 : 1);
}
}
Als nächstes kompilieren Sie die Java-Datei:
mkdir ~/wordcount
javac -source 8 -target 8 -classpath $(hadoop classpath) -d ~/wordcount WordCount.java
jar -cvf ~/wordcount.jar -C ~/wordcount.
Schließlich führen Sie den MapReduce-Job aus:
hadoop jar ~/wordcount.jar WordCount /home/hadoop/input/file.txt /home/hadoop/output
Die Klasse WordCount
definiert einen MapReduce-Job, der die Vorkommen von Wörtern in einer Textdatei zählt. Die Klasse TokenizerMapper
tokenisiert jede Zeile des Eingabetexts und gibt (Wort, 1)-Schlüssel-Wert-Paare aus. Die Klasse IntSumReducer
summiert die Werte (Zählungen) für jedes Wort auf und gibt die endgültigen (Wort, Zählung)-Paare aus.
Die Java-Datei wird kompiliert und in eine JAR-Datei verpackt, die dann mit dem Befehl hadoop jar
ausgeführt wird. Der Pfad zur Eingabedatei (/home/hadoop/input/file.txt
) und der Pfad zum Ausgabeverzeichnis (/home/hadoop/output
) werden als Argumente angegeben.