Environment
OS: CentOS 7.5
Hadoop version: Apache Hadoop 2.7.3
1. Overview
Hadoop splits the input data into a number of input splits and hands each split to one Map Task.
Each Map Task keeps parsing key/value pairs out of its split and calls map() on each pair; when it is done, the output is divided into partitions, one per Reduce Task, and written to local disk.
Each Reduce Task then fetches its own partition from every Map Task, sorts the data so that records with the same key are grouped together, calls reduce() on each group, and writes the result to an output file.
In short, the data flows as: input splits → map() → partition/sort (shuffle) → reduce() → output files.
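For example, given the input line "hello world hello", the map stage emits the intermediate pairs (hello, 1), (world, 1), (hello, 1); the shuffle groups them by key, so the reduce stage receives (hello, [1, 1]) and (world, [1]) and writes "hello 2" and "world 1" to the output.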
2. Write the code
WordMap.java
package yiyun.hadoop.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMap extends Mapper<Object, Text, Text, IntWritable> {

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        for (String word : words) {
            // Emit each word with a count of 1 as an intermediate result
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
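Note that value.toString().split(" ") only splits on single spaces, so tabs or runs of consecutive spaces would produce empty or merged tokens. If the input is not strictly single-space separated, splitting on the regular expression "\\s+" (or using java.util.StringTokenizer, as the stock Hadoop WordCount example does) is more robust.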
WordReduce.java
package yiyun.hadoop.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        // Write the final count for this word
        context.write(key, new IntWritable(sum));
    }
}
WordMain.java
package yiyun.hadoop.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordMain {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Check for null before reading args.length to avoid a NullPointerException
        if (args == null || args.length != 2) {
            System.out.println("Usage: WordMain <input path> <output path>");
            System.exit(1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, WordMain.class.getSimpleName());
        // Ship the jar containing this class to the cluster
        job.setJarByClass(WordMain.class);
        // Input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Classes handling the Map and Reduce phases
        job.setMapperClass(WordMap.class);
        job.setReducerClass(WordReduce.class);
        // Types of the final output key/value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Submit the job and wait for it to finish
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
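Since the reduce logic here is plain addition (commutative and associative), the same WordReduce class can also be registered as a combiner so that map output is pre-aggregated locally before the shuffle. This is an optional tweak to the driver above, not something the job requires:

job.setCombinerClass(WordReduce.class); // optional: pre-aggregate counts on the map side to cut shuffle traffic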
3. Package the jar
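One way to build the jar from the command line (a sketch; the src/ and classes/ layout is an assumption, and exporting a runnable jar from an IDE works just as well):

mkdir -p classes
javac -classpath "$(hadoop classpath)" -d classes src/yiyun/hadoop/wordcount/*.java
jar -cvf wordcount.jar -C classes .

The later steps assume the resulting jar ends up at /home/yiyun/wordcount.jar.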
4. Upload the text file to be counted to HDFS
Upload test.txt to the HDFS root directory:
hadoop fs -put /home/yiyun/test.txt /
Check that the upload succeeded.
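For example, list the HDFS root directory and confirm that test.txt shows up:

hadoop fs -ls /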
5. Run the jar
Run the jar, specifying the fully qualified main class (package plus class name), followed by the input path and output path arguments (both are HDFS paths, and the output path, the /word directory here, must not already exist):
hadoop jar /home/yiyun/wordcount.jar yiyun.hadoop.wordcount.WordMain /test.txt /word
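Once the job finishes, the word counts can be read back from the output directory; with a single reducer the result is written to a file named part-r-00000 by default:

hadoop fs -cat /word/part-r-*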