MapReduce(wordcount案例)
一:MapReduce数据处理流程
- 需求描述:从源文件中统计所有单词的出现个数
- 数据处理流程示意图:
-
- 注意:
- mapper是一行行读取数据的,读取一行数据就调用一次map方法
- 相同key的数据会被整合后再交给Reduce
- 每一个键就会调用一次reduce方法
- mapreduce中包含的节点:JobTracker和TaskTracker
二:源数据
hello tom hello bob hello joy hello rose hello joy hello jerry hello tom hello rose hello joy
三:map代码结构解析
3.1map代码结构
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WorldCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String[] arr = value.toString().split(" "); for (String str : arr) { context.write(new Text(str), new IntWritable(1)); } } }
3.2代码分析
3.2.1 mapper的泛型
- 第一参数:KEYIN,指的是map解析源数据时接受到的key,在此处默认为偏移量,偏移量以字节量为单位,用于记录该行数据在文件中的位置,如图所示:
- 第二参数:VALUEIN,指的是map解析源数据时接受到的value,不指定的化默认就是这一行的数据,也就是上图中的字符串
- 第三参数:KEYOUT,指的是map结束后交给reducer的数据的key,当前案例中,输出给Reduce的是单词
- 第四参数:VALUEOUT,指的是map结束后交给reducer的数据的value,当前案例中,输出的的值是次数
3.2.2 泛型种类
- java中long类型对应mapreduce中的LongWritable
- java中int类型对应mapreduce中的IntWritable
- java中String类型对应mapreduce中的Text
3.2.3map方法实现逻辑
- 参数:第一第二参数与所继承的Mapper泛型类型一致,第三参数context,负责把结果写入Reduce中
- 返回类型:空
- 方法体逻辑:
String[] arr = value.toString().split(" "); for (String str : arr) { context.write(new Text(str), new IntWritable(1)); }
- key此处用处不大直接忽略
- mapper是一行行处理数据的,第一行数据读取进入mapper后,value就是第一行的字符串,此时调用map方法处理该字符串,首先将该行字符串转化为String后拆分(text转化为string)
- 每拆分一次就计数1,也就是 0 hello tom hello bob 转变为 hello 1 tom 1 hello 1 bob 1 的过程
注意:
- 相同key的数据会被整合后再交给Reducer
- mapper是一行行读取数据的,读取一行数据就调用一次map方法
四:reducer代码结构解析
4.1Reducer代码结构
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WorldCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } }
4.2代码解析
4.2.1Reducer的泛型参数
- 第一泛型:mapper传递过来的key
- 第二泛型:mapper传递过来的value
- 第三泛型:Reducer输出到结果文件的key
- 第四泛型:Reducer输出到结果文件的value
4.2.2Reducer重写reduce方法
- 参数:
- 第一参数:k对应mapper解析后的单词
- 第二参数:v对应解析后的单词次数1
- 第三参数:context负责输出统计结果到结果文件
- 返回值:空
- 逻辑体: for循环对迭代器(K 1,1,1,1 格式)进行累加运算统计单词出现次数
,统计后输出到结果文件中
int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum));注意:
- 每一次键统计就会调用一次reduce方法,hello:1,1,1 调用一次,tom:1,1,1调用一次
五:Driver代码结构解析
5.1代码结构
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text; public class WorldCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //在MapReduce上需要先给这个程序申请job任务 //0.新建环境配置对象 Configuration conf = new Configuration(); //1.申请job任务 Job job = Job.getInstance(conf); //2.指定入口类(Driver类) job.setJarByClass(WorldCountDriver.class); //3.指定Mapper类 job.setMapperClass(WorldCountMapper.class); //4.指定Reducer类 job.setReducerClass(WorldCountReducer.class); //5.声明Mapper输出结果的类型(KeyOut ValueOut) job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //6.声明Reducer的输出结果类型(KeyOut ValueOut) job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //7.设置要处理的文件路径(指定job对象,指定源文件在节点中的路径) FileInputFormat.addInputPath(job, new Path("hdfs://192.168.174.130:9000/txt/world.txt")); //8.设置输出路径(要求该路径本来不存在) FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.174.130:9000/result")); //9.启动执行,自动阻塞等待执行完毕 job.waitForCompletion(true); } }注意:
- FileInputFormat 导包导入最长的那个
- Text导入Hadoop io的包
六:最原始的运行方式
打包为jar,传入linux中,在Hadoop中执行 hadoop jar XX.jar 命令
hadoop jar XX.jar