Join 연산 구현
이 단계에서는 dinosaurs.txt와 locations.txt 파일에 대한 join 연산을 수행하는 MapReduce 작업을 구현합니다.
/home/hadoop/join-lab 디렉토리에 다음 내용으로 JoinDinosaurs.java라는 새 Java 파일을 생성합니다:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class JoinDinosaurs {
public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
private final Text outKey = new Text();
private final Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] parts = line.split(",");
if (parts.length == 2) { // locations.txt
outKey.set(parts[0]);
outValue.set("LOC:" + parts[1]);
} else if (parts.length == 3) { // dinosaurs.txt
outKey.set(parts[0]);
outValue.set("DIN:" + parts[1] + "," + parts[2]);
}
context.write(outKey, outValue);
}
}
public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
private final Text outValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Map<String, String> dinMap = new HashMap<>();
StringBuilder locBuilder = new StringBuilder();
for (Text value : values) {
String valStr = value.toString();
if (valStr.startsWith("DIN:")) {
dinMap.put("DIN", valStr.substring(4));
} else if (valStr.startsWith("LOC:")) {
locBuilder.append(valStr.substring(4)).append(",");
}
if (locBuilder.length() > 0) {
locBuilder.deleteCharAt(locBuilder.length() - 1);
}
}
StringBuilder outBuilder = new StringBuilder();
for (Map.Entry<String, String> entry : dinMap.entrySet()) {
outBuilder.append(entry.getValue()).append("\t").append(locBuilder.toString().trim());
}
outValue.set(outBuilder.toString());
context.write(key, outValue);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
if (args.length != 2) {
System.err.println("Usage: JoinDinosaurs <input_dir> <output_dir>");
System.exit(1);
}
Job job = Job.getInstance();
job.setJarByClass(JoinDinosaurs.class);
job.setJobName("Join Dinosaurs");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
이 코드는 사용자 정의 JoinMapper와 JoinReducer를 사용하여 MapReduce 작업을 정의합니다. 매퍼는 dinosaurs.txt와 locations.txt에서 입력 데이터를 읽고, 공룡 이름을 키로, 데이터 유형 ("DIN" 또는 "LOC") 과 해당 값을 값으로 하는 키 - 값 쌍을 내보냅니다. 그런 다음 리듀서는 키별로 값을 그룹화하고 공룡 정보와 위치를 결합하여 join 연산을 수행합니다.
코드를 컴파일하려면 다음 명령을 실행합니다:
mkdir classes
javac -source 8 -target 8 -cp "/home/hadoop/hadoop/share/hadoop/common/hadoop-common-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar:/home/hadoop/hadoop/share/hadoop/common/lib/*" -d classes JoinDinosaurs.java
jar -cvf join-dinosaurs.jar -C classes/ .
다음으로, 다음 명령을 사용하여 MapReduce 작업을 실행합니다:
hadoop jar join-dinosaurs.jar JoinDinosaurs /home/hadoop/join-lab /home/hadoop/join-lab/output
이 명령은 join-dinosaurs.jar 파일에서 JoinDinosaurs 클래스를 실행하며, 입력 디렉토리 /home/hadoop/join-lab ( dinosaurs.txt와 locations.txt 포함) 와 출력 디렉토리 /home/hadoop/join-lab/output을 사용합니다.
작업이 성공적으로 완료되면 /home/hadoop/join-lab/output 디렉토리에서 출력을 볼 수 있습니다.