Разделение данных в Олимпиаде с использованием Hadoop

HadoopBeginner
Практиковаться сейчас

Введение

В древних греческих Олимпиадах спортсмены со всей страны собирались, чтобы показать свою силу и соревноваться в различных спортивных мероприятиях. Один из таких спортсменов, Алексιος, тренировался без перерыва для предстоящих игр, решая принести славу своей polis.

Целью было сортировать и организовать участников по разным видам спорта, обеспечивая честную и эффективную борьбу. Однако, с сотнями спортсменов, борющихся за славу, задача разделить их по своим видам спорта была огромной.

Реализовать маппер

В этом шаге мы создадим класс маппера, который будет читать входные данные и генерировать пары ключ-значение для обработки партиционировщиком.

Сначала смените пользователя на hadoop, а затем перейдите в домашнюю директорию пользователя hadoop:

su - hadoop

Затем создайте Java-файл для класса маппера:

touch /home/hadoop/OlympicMapper.java

Добавьте следующий код в файл OlympicMapper.java:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OlympicMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        String athlete = fields[0];
        String event = fields[1];
        context.write(new Text(event), new Text(athlete));
    }
}

В классе OlympicMapper мы определяем входной ключ как LongWritable (представляющий смещение строки) и входное значение как Text (представляющий строку текста из входного файла). Выходной ключ - это объект Text, представляющий вид спорта, а выходное значение - это объект Text, представляющий имя спортсмена.

Метод map разбивает каждую строку входных данных по разделителю запятая, извлекает имя спортсмена и вид спорта и emits пару ключ-значение с видом спорта в качестве ключа и именем спортсмена в качестве значения.

Реализовать партиционировщик

В этом шаге мы создадим пользовательский класс партиционировщика, который будет разделять пары ключ-значение на основе вида спорта.

Сначала создайте Java-файл для класса партиционировщика:

touch /home/hadoop/OlympicPartitioner.java

Затем добавьте следующий код в файл OlympicPartitioner.java:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class OlympicPartitioner extends Partitioner<Text, Text> {

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }
}

Класс OlympicPartitioner расширяет класс Partitioner, предоставляемый Hadoop. Он переопределяет метод getPartition, который принимает ключ, значение и количество разделов в качестве входных параметров.

Метод getPartition вычисляет хэш-код для вида спорта (ключа) и возвращает номер раздела, взяв абсолютное значение хэш-кода по модулю количество разделов. Это гарантирует, что все записи с одинаковым видом спорта отправляются в один и тот же раздел для обработки редьюсером.

Реализовать редьюсер

В этом шаге мы создадим класс редьюсера, который будет обрабатывать разделенные данные и генерировать окончательный вывод.

Сначала создайте Java-файл для класса редьюсера:

touch /home/hadoop/OlympicReducer.java

Затем добавьте следующий код в файл OlympicReducer.java:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OlympicReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder athletes = new StringBuilder();
        for (Text value : values) {
            athletes.append(value.toString()).append(",");
        }
        if (athletes.length() > 0) {
            athletes.deleteCharAt(athletes.length() - 1);
        }
        context.write(key, new Text(athletes.toString()));
    }
}

Класс OlympicReducer расширяет класс Reducer, предоставляемый Hadoop. Он определяет входной ключ как Text (представляющий вид спорта), входное значение как Text (представляющий имя спортсмена) и выходной ключ и значение как объекты Text.

Метод reduce вызывается для каждого уникального ключа события, с итератором по именам спортсменов, связанным с этим событием. Он формирует запятой-разделенный список спортсменов для каждого события и emits пару ключ-значение с событием в качестве ключа и списком спортсменов в качестве значения.

Написать драйвер

В этом шаге мы создадим класс драйвера, который связывает классы маппера, партиционировщика и редьюсера и запускает задачу MapReduce.

Сначала создайте Java-файл для класса драйвера:

touch /home/hadoop/OlympicDriver.java

Затем добавьте следующий код в файл OlympicDriver.java:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OlympicDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Olympic Partitioner");

        job.setJarByClass(OlympicDriver.class);
        job.setMapperClass(OlympicMapper.class);
        job.setPartitionerClass(OlympicPartitioner.class);
        job.setReducerClass(OlympicReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true)? 0 : 1);
    }
}

Класс OlympicDriver является точкой входа для задачи MapReduce. Он настраивает конфигурацию задачи, задает классы маппера, партиционировщика и редьюсера и настраивает пути ввода и вывода.

В методе main мы создаем новый объект Configuration и экземпляр Job с именем задачи "Olympic Partitioner". Мы задаем классы маппера, партиционировщика и редьюсера с использованием соответствующих методов установки.

Мы также задаем классы выходного ключа и значения как Text. Путь ввода и вывода задаются с использованием аргументов командной строки, переданных драйверу.

Наконец, мы вызываем метод waitForCompletion на экземпляре Job, чтобы запустить задачу MapReduce и выйти с соответствующим кодом статуса (0 при успехе, 1 при ошибке).

Для запуска задачи вам нужно скомпилировать Java-классы и создать jar-файл. Затем вы можете выполнить jar-файл с использованием следующей команды:

javac -source 8 -target 8 -classpath "/home/hadoop/:/home/hadoop/hadoop/share/hadoop/common/hadoop-common-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/common/lib/*" -d /home/hadoop /home/hadoop/OlympicMapper.java /home/hadoop/OlympicPartitioner.java /home/hadoop/OlympicReducer.java /home/hadoop/OlympicDriver.java
jar cvf olympic.jar *.class
hadoop jar olympic.jar OlympicDriver /input /output

Наконец, мы можем проверить результаты, выполнив следующую команду:

hadoop fs -cat /output/*

Пример вывода:

Event_1 Athlete_17,Athlete_18,Athlete_79,Athlete_71,Athlete_77,Athlete_75,Athlete_19,Athlete_24,Athlete_31,Athlete_32,Athlete_39,Athlete_89,Athlete_88,Athlete_87,Athlete_100,Athlete_13,Athlete_52,Athlete_53,Athlete_58
Event_2 Athlete_1,Athlete_97,Athlete_96,Athlete_85,Athlete_81,Athlete_80,Athlete_72,Athlete_68,Athlete_64,Athlete_61,Athlete_54,Athlete_48,Athlete_47,Athlete_43,Athlete_28,Athlete_23,Athlete_21,Athlete_15,Athlete_12,Athlete_3
Event_3 Athlete_11,Athlete_55,Athlete_8,Athlete_46,Athlete_42,Athlete_41,Athlete_40,Athlete_38,Athlete_33,Athlete_92,Athlete_29,Athlete_27,Athlete_25,Athlete_93,Athlete_22,Athlete_20,Athlete_98,Athlete_14,Athlete_69,Athlete_99,Athlete_66,Athlete_65
Event_4 Athlete_90,Athlete_50,Athlete_37,Athlete_36,Athlete_91,Athlete_74,Athlete_73,Athlete_63,Athlete_26,Athlete_78,Athlete_5,Athlete_62,Athlete_60,Athlete_59,Athlete_82,Athlete_4,Athlete_51,Athlete_86,Athlete_2,Athlete_94,Athlete_7,Athlete_95
Event_5 Athlete_34,Athlete_76,Athlete_57,Athlete_56,Athlete_30,Athlete_16,Athlete_6,Athlete_10,Athlete_83,Athlete_84,Athlete_70,Athlete_45,Athlete_44,Athlete_49,Athlete_9,Athlete_67,Athlete_35

Резюме

В этом лабаратории мы изучили концепцию партиционировщика Hadoop Shuffle, создав сценарий, вдохновленный древнегреческими Олимпиадами. Мы реализовали класс маппера для чтения входных данных и генерации пар ключ-значение, пользовательский класс партиционировщика для разделения данных на основе события и класс редьюсера для обработки разделенных данных и генерации окончательного вывода.

В ходе этой лабораторной работы я приобрел практический опыт в использовании модели программирования MapReduce и узнал, как использовать класс партиционировщика для эффективного распределения данных по разделам. Сценарий древнегреческих Олимпиад предоставил интересный контекст для понимания практических приложений партиционировщика Shuffle в реальном мире.