Введение
В древних греческих Олимпиадах спортсмены со всей страны собирались, чтобы показать свою силу и соревноваться в различных спортивных мероприятиях. Один из таких спортсменов, Алексιος, тренировался без перерыва для предстоящих игр, решая принести славу своей polis.
Целью было сортировать и организовать участников по разным видам спорта, обеспечивая честную и эффективную борьбу. Однако, с сотнями спортсменов, борющихся за славу, задача разделить их по своим видам спорта была огромной.
Реализовать маппер
В этом шаге мы создадим класс маппера, который будет читать входные данные и генерировать пары ключ-значение для обработки партиционировщиком.
Сначала смените пользователя на hadoop, а затем перейдите в домашнюю директорию пользователя hadoop:
su - hadoop
Затем создайте Java-файл для класса маппера:
touch /home/hadoop/OlympicMapper.java
Добавьте следующий код в файл OlympicMapper.java:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class OlympicMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
String athlete = fields[0];
String event = fields[1];
context.write(new Text(event), new Text(athlete));
}
}
В классе OlympicMapper мы определяем входной ключ как LongWritable (представляющий смещение строки) и входное значение как Text (представляющий строку текста из входного файла). Выходной ключ - это объект Text, представляющий вид спорта, а выходное значение - это объект Text, представляющий имя спортсмена.
Метод map разбивает каждую строку входных данных по разделителю запятая, извлекает имя спортсмена и вид спорта и emits пару ключ-значение с видом спорта в качестве ключа и именем спортсмена в качестве значения.
Реализовать партиционировщик
В этом шаге мы создадим пользовательский класс партиционировщика, который будет разделять пары ключ-значение на основе вида спорта.
Сначала создайте Java-файл для класса партиционировщика:
touch /home/hadoop/OlympicPartitioner.java
Затем добавьте следующий код в файл OlympicPartitioner.java:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class OlympicPartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numPartitions) {
return Math.abs(key.hashCode() % numPartitions);
}
}
Класс OlympicPartitioner расширяет класс Partitioner, предоставляемый Hadoop. Он переопределяет метод getPartition, который принимает ключ, значение и количество разделов в качестве входных параметров.
Метод getPartition вычисляет хэш-код для вида спорта (ключа) и возвращает номер раздела, взяв абсолютное значение хэш-кода по модулю количество разделов. Это гарантирует, что все записи с одинаковым видом спорта отправляются в один и тот же раздел для обработки редьюсером.
Реализовать редьюсер
В этом шаге мы создадим класс редьюсера, который будет обрабатывать разделенные данные и генерировать окончательный вывод.
Сначала создайте Java-файл для класса редьюсера:
touch /home/hadoop/OlympicReducer.java
Затем добавьте следующий код в файл OlympicReducer.java:
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class OlympicReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder athletes = new StringBuilder();
for (Text value : values) {
athletes.append(value.toString()).append(",");
}
if (athletes.length() > 0) {
athletes.deleteCharAt(athletes.length() - 1);
}
context.write(key, new Text(athletes.toString()));
}
}
Класс OlympicReducer расширяет класс Reducer, предоставляемый Hadoop. Он определяет входной ключ как Text (представляющий вид спорта), входное значение как Text (представляющий имя спортсмена) и выходной ключ и значение как объекты Text.
Метод reduce вызывается для каждого уникального ключа события, с итератором по именам спортсменов, связанным с этим событием. Он формирует запятой-разделенный список спортсменов для каждого события и emits пару ключ-значение с событием в качестве ключа и списком спортсменов в качестве значения.
Написать драйвер
В этом шаге мы создадим класс драйвера, который связывает классы маппера, партиционировщика и редьюсера и запускает задачу MapReduce.
Сначала создайте Java-файл для класса драйвера:
touch /home/hadoop/OlympicDriver.java
Затем добавьте следующий код в файл OlympicDriver.java:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OlympicDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Olympic Partitioner");
job.setJarByClass(OlympicDriver.class);
job.setMapperClass(OlympicMapper.class);
job.setPartitionerClass(OlympicPartitioner.class);
job.setReducerClass(OlympicReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)? 0 : 1);
}
}
Класс OlympicDriver является точкой входа для задачи MapReduce. Он настраивает конфигурацию задачи, задает классы маппера, партиционировщика и редьюсера и настраивает пути ввода и вывода.
В методе main мы создаем новый объект Configuration и экземпляр Job с именем задачи "Olympic Partitioner". Мы задаем классы маппера, партиционировщика и редьюсера с использованием соответствующих методов установки.
Мы также задаем классы выходного ключа и значения как Text. Путь ввода и вывода задаются с использованием аргументов командной строки, переданных драйверу.
Наконец, мы вызываем метод waitForCompletion на экземпляре Job, чтобы запустить задачу MapReduce и выйти с соответствующим кодом статуса (0 при успехе, 1 при ошибке).
Для запуска задачи вам нужно скомпилировать Java-классы и создать jar-файл. Затем вы можете выполнить jar-файл с использованием следующей команды:
javac -source 8 -target 8 -classpath "/home/hadoop/:/home/hadoop/hadoop/share/hadoop/common/hadoop-common-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/common/lib/*" -d /home/hadoop /home/hadoop/OlympicMapper.java /home/hadoop/OlympicPartitioner.java /home/hadoop/OlympicReducer.java /home/hadoop/OlympicDriver.java
jar cvf olympic.jar *.class
hadoop jar olympic.jar OlympicDriver /input /output
Наконец, мы можем проверить результаты, выполнив следующую команду:
hadoop fs -cat /output/*
Пример вывода:
Event_1 Athlete_17,Athlete_18,Athlete_79,Athlete_71,Athlete_77,Athlete_75,Athlete_19,Athlete_24,Athlete_31,Athlete_32,Athlete_39,Athlete_89,Athlete_88,Athlete_87,Athlete_100,Athlete_13,Athlete_52,Athlete_53,Athlete_58
Event_2 Athlete_1,Athlete_97,Athlete_96,Athlete_85,Athlete_81,Athlete_80,Athlete_72,Athlete_68,Athlete_64,Athlete_61,Athlete_54,Athlete_48,Athlete_47,Athlete_43,Athlete_28,Athlete_23,Athlete_21,Athlete_15,Athlete_12,Athlete_3
Event_3 Athlete_11,Athlete_55,Athlete_8,Athlete_46,Athlete_42,Athlete_41,Athlete_40,Athlete_38,Athlete_33,Athlete_92,Athlete_29,Athlete_27,Athlete_25,Athlete_93,Athlete_22,Athlete_20,Athlete_98,Athlete_14,Athlete_69,Athlete_99,Athlete_66,Athlete_65
Event_4 Athlete_90,Athlete_50,Athlete_37,Athlete_36,Athlete_91,Athlete_74,Athlete_73,Athlete_63,Athlete_26,Athlete_78,Athlete_5,Athlete_62,Athlete_60,Athlete_59,Athlete_82,Athlete_4,Athlete_51,Athlete_86,Athlete_2,Athlete_94,Athlete_7,Athlete_95
Event_5 Athlete_34,Athlete_76,Athlete_57,Athlete_56,Athlete_30,Athlete_16,Athlete_6,Athlete_10,Athlete_83,Athlete_84,Athlete_70,Athlete_45,Athlete_44,Athlete_49,Athlete_9,Athlete_67,Athlete_35
Резюме
В этом лабаратории мы изучили концепцию партиционировщика Hadoop Shuffle, создав сценарий, вдохновленный древнегреческими Олимпиадами. Мы реализовали класс маппера для чтения входных данных и генерации пар ключ-значение, пользовательский класс партиционировщика для разделения данных на основе события и класс редьюсера для обработки разделенных данных и генерации окончательного вывода.
В ходе этой лабораторной работы я приобрел практический опыт в использовании модели программирования MapReduce и узнал, как использовать класс партиционировщика для эффективного распределения данных по разделам. Сценарий древнегреческих Олимпиад предоставил интересный контекст для понимания практических приложений партиционировщика Shuffle в реальном мире.



