Hadoop 奥运会分区

HadoopHadoopBeginner
立即练习

💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版

引言

在古希腊奥林匹克运动会上,来自各地的运动员们齐聚一堂,展示他们的实力并参与各种体育赛事。其中一位名叫亚历克西奥斯(Alexios)的运动员,为了即将到来的比赛进行了不懈的训练,决心为自己的城邦赢得荣耀。

目标是根据运动员的项目将他们分类并组织到不同的组别中,以确保比赛的公平和高效。然而,面对数百名渴望荣耀的运动员,将他们分配到各自的项目中是一项艰巨的任务。


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL hadoop(("`Hadoop`")) -.-> hadoop/HadoopMapReduceGroup(["`Hadoop MapReduce`"]) hadoop(("`Hadoop`")) -.-> hadoop/HadoopYARNGroup(["`Hadoop YARN`"]) hadoop(("`Hadoop`")) -.-> hadoop/HadoopHiveGroup(["`Hadoop Hive`"]) hadoop/HadoopMapReduceGroup -.-> hadoop/mappers_reducers("`Coding Mappers and Reducers`") hadoop/HadoopMapReduceGroup -.-> hadoop/shuffle_partitioner("`Shuffle Partitioner`") hadoop/HadoopYARNGroup -.-> hadoop/yarn_setup("`Hadoop YARN Basic Setup`") hadoop/HadoopHiveGroup -.-> hadoop/process("`Process Control Function`") hadoop/HadoopHiveGroup -.-> hadoop/udf("`User Defined Function`") hadoop/HadoopHiveGroup -.-> hadoop/integration("`Integration with HDFS and MapReduce`") subgraph Lab Skills hadoop/mappers_reducers -.-> lab-288997{{"`Hadoop 奥运会分区`"}} hadoop/shuffle_partitioner -.-> lab-288997{{"`Hadoop 奥运会分区`"}} hadoop/yarn_setup -.-> lab-288997{{"`Hadoop 奥运会分区`"}} hadoop/process -.-> lab-288997{{"`Hadoop 奥运会分区`"}} hadoop/udf -.-> lab-288997{{"`Hadoop 奥运会分区`"}} hadoop/integration -.-> lab-288997{{"`Hadoop 奥运会分区`"}} end

实现 Mapper

在这一步中,我们将创建一个 Mapper 类,用于读取输入数据并生成供 Partitioner 处理的键值对。

首先,将用户切换为 hadoop,然后切换到 hadoop 用户的主目录:

su - hadoop

接着,为 Mapper 类创建一个 Java 文件:

touch /home/hadoop/OlympicMapper.java

将以下代码添加到 OlympicMapper.java 文件中:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OlympicMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        String athlete = fields[0];
        String event = fields[1];
        context.write(new Text(event), new Text(athlete));
    }
}

OlympicMapper 类中,我们将输入键定义为 LongWritable(表示行的偏移量),输入值定义为 Text(表示输入文件中的一行文本)。输出键是一个表示项目的 Text 对象,输出值是一个表示运动员姓名的 Text 对象。

map 方法通过逗号分隔符拆分每行输入数据,提取运动员的姓名和项目,并发射一个以项目为键、运动员姓名为值的键值对。

实现 Partitioner

在这一步中,我们将创建一个自定义的 Partitioner 类,用于根据项目对键值对进行分区。

首先,为 Partitioner 类创建一个 Java 文件:

touch /home/hadoop/OlympicPartitioner.java

然后,将以下代码添加到 OlympicPartitioner.java 文件中:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class OlympicPartitioner extends Partitioner<Text, Text> {

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }
}

OlympicPartitioner 类扩展了 Hadoop 提供的 Partitioner 类。它重写了 getPartition 方法,该方法接收键、值和分区数量作为输入。

getPartition 方法计算项目(键)的哈希码,并通过取哈希码的绝对值对分区数量取模来返回分区编号。这确保了所有具有相同项目的记录都会被发送到同一个分区,以便由 Reducer 进行处理。

实现 Reducer

在这一步中,我们将创建一个 Reducer 类,用于处理分区数据并生成最终输出。

首先,为 Reducer 类创建一个 Java 文件:

touch /home/hadoop/OlympicReducer.java

然后,将以下代码添加到 OlympicReducer.java 文件中:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OlympicReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder athletes = new StringBuilder();
        for (Text value : values) {
            athletes.append(value.toString()).append(",");
        }
        if (athletes.length() > 0) {
            athletes.deleteCharAt(athletes.length() - 1);
        }
        context.write(key, new Text(athletes.toString()));
    }
}

OlympicReducer 类扩展了 Hadoop 提供的 Reducer 类。它将输入键定义为 Text(表示项目),输入值定义为 Text(表示运动员的姓名),并将输出键和值定义为 Text 对象。

reduce 方法会为每个唯一的事件键调用,并传入与该事件关联的运动员姓名的迭代器。它会为每个事件构建一个以逗号分隔的运动员列表,并发射一个以事件为键、运动员列表为值的键值对。

编写 Driver

在这一步中,我们将创建一个 Driver 类,用于将 Mapper、Partitioner 和 Reducer 类整合在一起并运行 MapReduce 任务。

首先,为 Driver 类创建一个 Java 文件:

touch /home/hadoop/OlympicDriver.java

然后,将以下代码添加到 OlympicDriver.java 文件中:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OlympicDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Olympic Partitioner");

        job.setJarByClass(OlympicDriver.class);
        job.setMapperClass(OlympicMapper.class);
        job.setPartitionerClass(OlympicPartitioner.class);
        job.setReducerClass(OlympicReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

OlympicDriver 类是 MapReduce 任务的入口点。它设置了任务配置,指定了 Mapper、Partitioner 和 Reducer 类,并配置了输入和输出路径。

main 方法中,我们创建了一个新的 Configuration 对象和一个名为 "Olympic Partitioner" 的 Job 实例。我们使用相应的 setter 方法设置了 Mapper、Partitioner 和 Reducer 类。

我们还将输出键和值类设置为 Text。输入和输出路径通过传递给驱动程序的命令行参数指定。

最后,我们调用 Job 实例的 waitForCompletion 方法来运行 MapReduce 任务,并根据任务的成功或失败退出(0 表示成功,1 表示失败)。

要运行任务,你需要编译 Java 类并创建一个 jar 文件。然后,你可以使用以下命令执行 jar 文件:

javac -source 8 -target 8 -classpath "/home/hadoop/:/home/hadoop/hadoop/share/hadoop/common/hadoop-common-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/common/lib/*" -d /home/hadoop /home/hadoop/OlympicMapper.java /home/hadoop/OlympicPartitioner.java /home/hadoop/OlympicReducer.java /home/hadoop/OlympicDriver.java
jar cvf olympic.jar *.class
hadoop jar olympic.jar OlympicDriver /input /output

最后,我们可以通过运行以下命令来检查结果:

hadoop fs -cat /output/*

示例输出:

Event_1 Athlete_17,Athlete_18,Athlete_79,Athlete_71,Athlete_77,Athlete_75,Athlete_19,Athlete_24,Athlete_31,Athlete_32,Athlete_39,Athlete_89,Athlete_88,Athlete_87,Athlete_100,Athlete_13,Athlete_52,Athlete_53,Athlete_58
Event_2 Athlete_1,Athlete_97,Athlete_96,Athlete_85,Athlete_81,Athlete_80,Athlete_72,Athlete_68,Athlete_64,Athlete_61,Athlete_54,Athlete_48,Athlete_47,Athlete_43,Athlete_28,Athlete_23,Athlete_21,Athlete_15,Athlete_12,Athlete_3
Event_3 Athlete_11,Athlete_55,Athlete_8,Athlete_46,Athlete_42,Athlete_41,Athlete_40,Athlete_38,Athlete_33,Athlete_92,Athlete_29,Athlete_27,Athlete_25,Athlete_93,Athlete_22,Athlete_20,Athlete_98,Athlete_14,Athlete_69,Athlete_99,Athlete_66,Athlete_65
Event_4 Athlete_90,Athlete_50,Athlete_37,Athlete_36,Athlete_91,Athlete_74,Athlete_73,Athlete_63,Athlete_26,Athlete_78,Athlete_5,Athlete_62,Athlete_60,Athlete_59,Athlete_82,Athlete_4,Athlete_51,Athlete_86,Athlete_2,Athlete_94,Athlete_7,Athlete_95
Event_5 Athlete_34,Athlete_76,Athlete_57,Athlete_56,Athlete_30,Athlete_16,Athlete_6,Athlete_10,Athlete_83,Athlete_84,Athlete_70,Athlete_45,Athlete_44,Athlete_49,Athlete_9,Athlete_67,Athlete_35

总结

在本实验中,我们通过设计一个受古希腊奥运会启发的场景,探索了 Hadoop Shuffle Partitioner 的概念。我们实现了一个 Mapper 类来读取输入数据并生成键值对,一个自定义的 Partitioner 类来根据项目对数据进行分区,以及一个 Reducer 类来处理分区数据并生成最终输出。

通过本实验,我获得了 MapReduce 编程模型的实践经验,并学习了如何利用 Partitioner 类有效地将数据分布到各个分区。古希腊奥运会的场景为理解 Shuffle Partitioner 在实际应用中的用途提供了一个引人入胜的背景。

您可能感兴趣的其他 Hadoop 教程