分布式缓存的古老智慧

HadoopHadoopBeginner
立即练习

💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版

引言

在一片失落文明的古老遗迹中,一群现代探险者偶然发现了一座供奉知识与智慧之神的隐秘神庙。神庙的墙壁上装饰着复杂的象形文字,这些文字蕴含着古代祭司们使用的高级数据处理系统的秘密。

其中一位探险者,一位技艺精湛的Hadoop工程师,承担起了大祭司的角色,破译这些象形文字,揭开神庙的奥秘。目标是通过利用Hadoop分布式缓存的力量,高效处理大规模数据集,重建古代的数据处理系统,就像几个世纪前的古代祭司们所做的那样。


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL hadoop(("`Hadoop`")) -.-> hadoop/HadoopMapReduceGroup(["`Hadoop MapReduce`"]) hadoop(("`Hadoop`")) -.-> hadoop/HadoopYARNGroup(["`Hadoop YARN`"]) hadoop(("`Hadoop`")) -.-> hadoop/HadoopHiveGroup(["`Hadoop Hive`"]) hadoop/HadoopMapReduceGroup -.-> hadoop/setup_jobs("`Setting up MapReduce Jobs`") hadoop/HadoopMapReduceGroup -.-> hadoop/mappers_reducers("`Coding Mappers and Reducers`") hadoop/HadoopMapReduceGroup -.-> hadoop/distributed_cache("`Leveraging Distributed Cache in Jobs`") hadoop/HadoopYARNGroup -.-> hadoop/yarn_jar("`Yarn Commands jar`") hadoop/HadoopHiveGroup -.-> hadoop/import_export_data("`Importing and Exporting Data`") subgraph Lab Skills hadoop/setup_jobs -.-> lab-288968{{"`分布式缓存的古老智慧`"}} hadoop/mappers_reducers -.-> lab-288968{{"`分布式缓存的古老智慧`"}} hadoop/distributed_cache -.-> lab-288968{{"`分布式缓存的古老智慧`"}} hadoop/yarn_jar -.-> lab-288968{{"`分布式缓存的古老智慧`"}} hadoop/import_export_data -.-> lab-288968{{"`分布式缓存的古老智慧`"}} end

准备数据集和代码

在这一步骤中,我们将设置必要的文件和代码,以模拟古代的数据处理系统。

首先,切换到 hadoop 用户,然后进入 hadoop 用户的主目录:

su - hadoop

创建一个名为 distributed-cache-lab 的新目录,并进入该目录:

mkdir distributed-cache-lab
cd distributed-cache-lab

接下来,创建一个名为 ancient-texts.txt 的文本文件,内容如下:

The wisdom of the ages is eternal.
Knowledge is the path to enlightenment.
Embrace the mysteries of the universe.

该文件将代表我们想要处理的古代文本。

现在,创建一个名为 AncientTextAnalyzer.java 的 Java 文件,内容如下:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class AncientTextAnalyzer {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: AncientTextAnalyzer <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Ancient Text Analyzer");
        job.setJarByClass(AncientTextAnalyzer.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

这段代码是一个简单的 MapReduce 程序,用于统计输入文件中每个单词的出现次数。我们将使用这段代码来演示 Hadoop 中分布式缓存的使用。

编译并打包代码

在这一步骤中,我们将编译 Java 代码并创建一个用于部署的 JAR 文件。

首先,确保你的 classpath 中包含 Hadoop 核心 JAR 文件。你可以从 Apache Hadoop 网站下载,或者使用 Hadoop 安装中提供的文件。

编译 AncientTextAnalyzer.java 文件:

javac -source 8 -target 8 -classpath "/home/hadoop/hadoop/share/hadoop/common/hadoop-common-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/common/lib/*" AncientTextAnalyzer.java

现在,使用编译后的 class 文件创建一个 JAR 文件:

jar -cvf ancient-text-analyzer.jar AncientTextAnalyzer*.class

使用分布式缓存运行 MapReduce 任务

在这一步骤中,我们将运行 MapReduce 任务,并利用分布式缓存将输入文件提供给集群中的所有节点。

首先,将输入文件 ancient-texts.txt 复制到 Hadoop 分布式文件系统(HDFS)中:

hadoop fs -mkdir /input
hadoop fs -put ancient-texts.txt /input/ancient-texts.txt

接下来,使用分布式缓存选项运行 MapReduce 任务:

hadoop jar ancient-text-analyzer.jar AncientTextAnalyzer -files ancient-texts.txt /input/ancient-texts.txt /output

该命令将运行 AncientTextAnalyzer MapReduce 任务,并使用 -files 选项将 ancient-texts.txt 文件分发到集群中的所有节点。输入路径为 /input/ancient-texts.txt,输出路径为 /output

任务完成后,你可以查看输出结果:

hadoop fs -cat /output/part-r-00000

你应该会看到类似以下的单词计数输出:

Embrace 1
Knowledge       1
The     1
ages    1
enlightenment.  1
eternal.        1
is      2
mysteries       1
of      2
path    1
the     4
to      1
universe.       1
wisdom  1

总结

在本实验中,我们通过实现一个古代文本分析系统,探索了 Hadoop 分布式缓存功能的强大之处。通过利用分布式缓存,我们能够高效地将输入文件分发到集群中的所有节点,从而实现并行处理,并减少跨网络传输数据的开销。

通过这次实践,我对 Hadoop 分布式缓存如何在分布式计算环境中优化数据处理有了更深入的理解。通过在集群中缓存频繁访问的数据,我们可以显著提高性能并减少网络流量,尤其是在处理大规模数据集或复杂计算时。

此外,本实验还让我获得了使用 Hadoop MapReduce、Java 编程以及在 Hadoop 集群上执行任务的实践经验。理论知识与动手实践的结合,提升了我在大数据处理方面的熟练度,并为我应对更高级的 Hadoop 相关挑战做好了准备。

您可能感兴趣的其他 Hadoop 教程