Ejecutando un trabajo MapReduce
En este paso, aprenderás a ejecutar un trabajo MapReduce en los datos almacenados en HDFS, aprovechando el poder del procesamiento en paralelo para analizar eficientemente grandes conjuntos de datos.
MapReduce es un modelo de programación para procesar grandes conjuntos de datos en paralelo en un clúster de máquinas. Está compuesto por dos fases principales:
-
Map: Los datos de entrada se dividen en trozos más pequeños, y cada trozo es procesado por una tarea separada llamada "mapper". El mapper procesa los datos y emite pares clave-valor.
-
Reduce: La salida de los mappers se ordena y agrupa por clave, y cada grupo es procesado por una tarea separada llamada "reducer". El reducer combina los valores asociados con cada clave y produce el resultado final.
Vamos a ejecutar un trabajo MapReduce simple que cuente la frecuencia de aparición de palabras en un archivo de texto. Primero, crea un archivo Java llamado WordCount.java
con el siguiente contenido:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)? 0 : 1);
}
}
A continuación, compila el archivo Java:
mkdir ~/wordcount
javac -source 8 -target 8 -classpath $(hadoop classpath) -d ~/wordcount WordCount.java
jar -cvf ~/wordcount.jar -C ~/wordcount.
Finalmente, ejecuta el trabajo MapReduce:
hadoop jar ~/wordcount.jar WordCount /home/hadoop/input/file.txt /home/hadoop/output
La clase WordCount
define un trabajo MapReduce que cuenta la frecuencia de aparición de palabras en un archivo de texto. La clase TokenizerMapper
tokeniza cada línea de texto de entrada y emite pares clave-valor (palabra, 1). La clase IntSumReducer
suma los valores (conteos) para cada palabra y emite los pares finales (palabra, conteo).
El archivo Java se compila y empaqueta en un archivo JAR, que luego se ejecuta utilizando el comando hadoop jar
. La ruta del archivo de entrada (/home/hadoop/input/file.txt
) y la ruta del directorio de salida (/home/hadoop/output
) se proporcionan como argumentos.