MapReduce(wordcount案例)

一:MapReduce数据处理流程

  • 需求描述:从源文件中统计所有单词的出现个数
  • 数据处理流程示意图:
  • 注意:
  1. mapper是一行行读取数据的,读取一行数据就调用一次map方法
  2. 相同key的数据会被整合后再交给Reduce
  3. 每一个键就会调用一次reduce方法

  • mapreduce中包含的节点:JobTracker和TaskTracker

二:源数据

hello tom hello bob
hello joy
hello rose
hello joy
hello jerry
hello tom
hello rose
hello joy


三:map代码结构解析

3.1map代码结构

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WorldCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		String[] arr = value.toString().split(" ");
		for (String str : arr) {
			context.write(new Text(str), new IntWritable(1));
		}
	}
}

3.2代码分析

3.2.1 mapper的泛型

  • 第一参数:KEYIN,指的是map解析源数据时接受到的key,在此处默认为偏移量,偏移量以字节量为单位,用于记录该行数据在文件中的位置,如图所示:

  • 第二参数:VALUEIN,指的是map解析源数据时接受到的value,不指定的化默认就是这一行的数据,也就是上图中的字符串

  • 第三参数:KEYOUT,指的是map结束后交给reducer的数据的key,当前案例中,输出给Reduce的是单词

  • 第四参数:VALUEOUT,指的是map结束后交给reducer的数据的value,当前案例中,输出的的值是次数

3.2.2 泛型种类

  • java中long类型对应mapreduce中的LongWritable
  • java中int类型对应mapreduce中的IntWritable
  • java中String类型对应mapreduce中的Text

3.2.3map方法实现逻辑

  • 参数:第一第二参数与所继承的Mapper泛型类型一致,第三参数context,负责把结果写入Reduce中
  • 返回类型:空
  • 方法体逻辑:
		String[] arr = value.toString().split(" ");
		for (String str : arr) {
			context.write(new Text(str), new IntWritable(1));
		}
  1. key此处用处不大直接忽略
  2. mapper是一行行处理数据的,第一行数据读取进入mapper后,value就是第一行的字符串,此时调用map方法处理该字符串,首先将该行字符串转化为String后拆分(text转化为string)
  3. 每拆分一次就计数1,也就是 0 hello tom hello bob 转变为  hello 1 tom 1 hello 1 bob 1 的过程                          

注意:
  1.     相同key的数据会被整合后再交给Reducer                             
  2.     mapper是一行行读取数据的,读取一行数据就调用一次map方法
  

四:reducer代码结构解析

4.1Reducer代码结构

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WorldCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {
			sum += val.get();
		}
		context.write(key, new IntWritable(sum));
	}
}

4.2代码解析

4.2.1Reducer的泛型参数

  • 第一泛型:mapper传递过来的key
  • 第二泛型:mapper传递过来的value
  • 第三泛型:Reducer输出到结果文件的key
  • 第四泛型:Reducer输出到结果文件的value

4.2.2Reducer重写reduce方法

  • 参数:  
  1. 第一参数:k对应mapper解析后的单词
  2. 第二参数:v对应解析后的单词次数1 
  3. 第三参数:context负责输出统计结果到结果文件
  • 返回值:空
  • 逻辑体: for循环对迭代器(K  1,1,1,1 格式)进行累加运算统计单词出现次数,统计后输出到结果文件中
                int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
注意:
  1. 每一次键统计就会调用一次reduce方法,hello:1,1,1 调用一次,tom:1,1,1调用一次           

五:Driver代码结构解析

    5.1代码结构

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

public class WorldCountDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //在MapReduce上需要先给这个程序申请job任务
		//0.新建环境配置对象
                Configuration conf = new Configuration();
                //1.申请job任务    
		Job job = Job.getInstance(conf);
        
		//2.指定入口类(Driver类)
		job.setJarByClass(WorldCountDriver.class);
		//3.指定Mapper类
		job.setMapperClass(WorldCountMapper.class);
		//4.指定Reducer类
		job.setReducerClass(WorldCountReducer.class);
		
                //5.声明Mapper输出结果的类型(KeyOut ValueOut)
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		//6.声明Reducer的输出结果类型(KeyOut ValueOut)
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		//7.设置要处理的文件路径(指定job对象,指定源文件在节点中的路径)
		FileInputFormat.addInputPath(job, new Path("hdfs://192.168.174.130:9000/txt/world.txt"));
		//8.设置输出路径(要求该路径本来不存在)
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.174.130:9000/result"));
		
                //9.启动执行,自动阻塞等待执行完毕
		job.waitForCompletion(true);
	}
}
注意:
  1. FileInputFormat 导包导入最长的那个  
  2. Text导入Hadoop io的包

六:最原始的运行方式

打包为jar,传入linux中,在Hadoop中执行 hadoop jar XX.jar 命令
hadoop jar XX.jar  



全部评论

相关推荐

10-10 16:30
济宁学院 Java
一表renzha:面试官:蓝桥杯三等奖?你多去两次厕所都能拿二等吧
点赞 评论 收藏
分享
09-03 23:20
已编辑
南京邮电大学 UE4
点赞 评论 收藏
分享
评论
点赞
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务