Exécuter un travail MapReduce
Dans cette étape, vous allez apprendre à exécuter un travail MapReduce sur les données stockées dans HDFS, en exploitant le pouvoir du traitement parallèle pour analyser efficacement de grands ensembles de données.
MapReduce est un modèle de programmation pour traiter de grands ensembles de données en parallèle sur une grappe de machines. Il est composé de deux phases principales :
-
Map : Les données d'entrée sont divisées en morceaux plus petits, et chaque morceau est traité par une tâche séparée appelée "mappeur". Le mappeur traite les données et émet des paires clé-valeur.
-
Reduce : La sortie des mappeurs est triée et regroupée par clé, et chaque groupe est traité par une tâche séparée appelée "réducteur". Le réducteur combine les valeurs associées à chaque clé et produit le résultat final.
Exécutons un simple travail MapReduce qui compte les occurrences de mots dans un fichier texte. Tout d'abord, créez un fichier Java nommé WordCount.java
avec le contenu suivant :
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)? 0 : 1);
}
}
Ensuite, compilez le fichier Java :
mkdir ~/wordcount
javac -source 8 -target 8 -classpath $(hadoop classpath) -d ~/wordcount WordCount.java
jar -cvf ~/wordcount.jar -C ~/wordcount.
Enfin, exécutez le travail MapReduce :
hadoop jar ~/wordcount.jar WordCount /home/hadoop/input/file.txt /home/hadoop/output
La classe WordCount
définit un travail MapReduce qui compte les occurrences de mots dans un fichier texte. La classe TokenizerMapper
découpe chaque ligne de texte d'entrée et émet des paires clé-valeur (mot, 1). La classe IntSumReducer
additionne les valeurs (comptes) pour chaque mot et émet les paires finales (mot, compte).
Le fichier Java est compilé et emballé dans un fichier JAR, qui est ensuite exécuté à l'aide de la commande hadoop jar
. Le chemin du fichier d'entrée (/home/hadoop/input/file.txt
) et le chemin du répertoire de sortie (/home/hadoop/output
) sont fournis en tant qu'arguments.