MapReduce(wordcount案例)
一:MapReduce数据处理流程
- 需求描述:从源文件中统计所有单词的出现个数
- 数据处理流程示意图:
-
- 注意:
- mapper是一行行读取数据的,读取一行数据就调用一次map方法
- 相同key的数据会被整合后再交给Reduce
- 每一个键就会调用一次reduce方法
- mapreduce中包含的节点:JobTracker和TaskTracker
二:源数据
hello tom hello bob hello joy hello rose hello joy hello jerry hello tom hello rose hello joy
三:map代码结构解析
3.1map代码结构
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WorldCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
String[] arr = value.toString().split(" ");
for (String str : arr) {
context.write(new Text(str), new IntWritable(1));
}
}
}
3.2代码分析
3.2.1 mapper的泛型
- 第一参数:KEYIN,指的是map解析源数据时接受到的key,在此处默认为偏移量,偏移量以字节量为单位,用于记录该行数据在文件中的位置,如图所示:
- 第二参数:VALUEIN,指的是map解析源数据时接受到的value,不指定的化默认就是这一行的数据,也就是上图中的字符串
- 第三参数:KEYOUT,指的是map结束后交给reducer的数据的key,当前案例中,输出给Reduce的是单词
- 第四参数:VALUEOUT,指的是map结束后交给reducer的数据的value,当前案例中,输出的的值是次数
3.2.2 泛型种类
- java中long类型对应mapreduce中的LongWritable
- java中int类型对应mapreduce中的IntWritable
- java中String类型对应mapreduce中的Text
3.2.3map方法实现逻辑
- 参数:第一第二参数与所继承的Mapper泛型类型一致,第三参数context,负责把结果写入Reduce中
- 返回类型:空
- 方法体逻辑:
String[] arr = value.toString().split(" ");
for (String str : arr) {
context.write(new Text(str), new IntWritable(1));
} - key此处用处不大直接忽略
- mapper是一行行处理数据的,第一行数据读取进入mapper后,value就是第一行的字符串,此时调用map方法处理该字符串,首先将该行字符串转化为String后拆分(text转化为string)
- 每拆分一次就计数1,也就是 0 hello tom hello bob 转变为 hello 1 tom 1 hello 1 bob 1 的过程
注意:
- 相同key的数据会被整合后再交给Reducer
- mapper是一行行读取数据的,读取一行数据就调用一次map方法
四:reducer代码结构解析
4.1Reducer代码结构
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WorldCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
4.2代码解析
4.2.1Reducer的泛型参数
- 第一泛型:mapper传递过来的key
- 第二泛型:mapper传递过来的value
- 第三泛型:Reducer输出到结果文件的key
- 第四泛型:Reducer输出到结果文件的value
4.2.2Reducer重写reduce方法
- 参数:
- 第一参数:k对应mapper解析后的单词
- 第二参数:v对应解析后的单词次数1
- 第三参数:context负责输出统计结果到结果文件
- 返回值:空
- 逻辑体: for循环对迭代器(K 1,1,1,1 格式)进行累加运算统计单词出现次数
,统计后输出到结果文件中
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
注意: - 每一次键统计就会调用一次reduce方法,hello:1,1,1 调用一次,tom:1,1,1调用一次
五:Driver代码结构解析
5.1代码结构
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
public class WorldCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //在MapReduce上需要先给这个程序申请job任务
//0.新建环境配置对象
Configuration conf = new Configuration();
//1.申请job任务
Job job = Job.getInstance(conf);
//2.指定入口类(Driver类)
job.setJarByClass(WorldCountDriver.class);
//3.指定Mapper类
job.setMapperClass(WorldCountMapper.class);
//4.指定Reducer类
job.setReducerClass(WorldCountReducer.class);
//5.声明Mapper输出结果的类型(KeyOut ValueOut)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//6.声明Reducer的输出结果类型(KeyOut ValueOut)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//7.设置要处理的文件路径(指定job对象,指定源文件在节点中的路径)
FileInputFormat.addInputPath(job, new Path("hdfs://192.168.174.130:9000/txt/world.txt"));
//8.设置输出路径(要求该路径本来不存在)
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.174.130:9000/result"));
//9.启动执行,自动阻塞等待执行完毕
job.waitForCompletion(true);
}
} 注意: - FileInputFormat 导包导入最长的那个
- Text导入Hadoop io的包
六:最原始的运行方式
打包为jar,传入linux中,在Hadoop中执行 hadoop jar XX.jar 命令
hadoop jar XX.jar
查看1道真题和解析