Hadoop-MapReduce实战(WordCount)

WordCount案例

  • 需求1:统计一堆文件中单词出现的个数

    在一堆给定的文本文件中统计输出每一个单词出现的总次数

    • 数据准备:

    • 分析

      ​ 按照mapreduce编程规范,分别编写Mapper,Reducer,Driver。

      分析

  • 编写mapper类


import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
   
	
	Text k = new Text();
	IntWritable v = new IntWritable(1);
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
   
		
		// 1 获取一行
		String line = value.toString();
		
		// 2 切割
		String[] words = line.split(" ");
		
		// 3 输出
		for (String word : words) {
   
			
			k.set(word);
			context.write(k, v);
		}
	}
}

编写reducer类

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
   

	@Override
	protected void reduce(Text key, Iterable<IntWritable> value,
			Context context) throws IOException, InterruptedException {
   
		
		// 1 累加求和
		int sum = 0;
		for (IntWritable count : value) {
   
			sum += count.get();
		}
		
		// 2 输出
		context.write(key, new IntWritable(sum));
	}
}

编写驱动类

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
   

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
   

        String[] args=new String{
   “输入路径”,”输出路径”};

		// 1 获取配置信息
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 2 设置jar加载路径
		job.setJarByClass(WordcountDriver.class);

		// 3 设置map和Reduce类
		job.setMapperClass(WordcountMapper.class);
		job.setReducerClass(WordcountReducer.class);

		// 4 设置map输出
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// 5 设置Reduce输出
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 6 设置输入和输出路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 提交
		boolean result = job.waitForCompletion(true);

		System.exit(result ? 0 : 1);
	}
}

需求2:把单词按照ASCII码奇偶分区(Partitioner)

  • 分析
  • 自定义分区
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordCountPartitioner extends Partitioner<Text, IntWritable>{
   

	@Override
	public int getPartition(Text key, IntWritable value, int numPartitions) {
   
		
		// 1 获取单词key 
		String firWord = key.toString().substring(0, 1);
		//String -> int
        int result = Integer.valueOf(firWord);

		// 2 根据奇数偶数分区
		if (result % 2 == 0) {
   
			return 0;
		}else {
   
			return 1;
		}
	}
}

在驱动中配置加载分区,设置reducetask个数

job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(2);

需求3:对每一个maptask的输出局部汇总(Combiner)

统计过程中对每一个maptask的输出进行局部汇总,以减小网络传输量即采用Combiner功能

  • 数据准备

    方案一

    • 增加一个WordcountCombiner类继承Reducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
   

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
   
        // 1 汇总
		int count = 0;
		for(IntWritable v :values){
   
			count += v.get();
		}
		// 2 写出
		context.write(key, new IntWritable(count));
	}
}

在WordcountDriver驱动类中指定combiner

// 9 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordcountCombiner.class);

方案二

  • 将WordcountReducer作为combiner在WordcountDriver驱动类中指定
// 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordcountReducer.class);

运行程序
需求4:大量小文件的切片优化(CombineTextInputFormat)

在分布式的架构中,分布式文件系统HDFS,和分布式运算程序编程框架mapreduce。

HDFS:不怕大文件,怕很多小文件

mapreduce :怕数据倾斜

那么mapreduce是如果解决多个小文件的问题呢?

mapreduce关于大量小文件的优化策略

  • 默认情况下,TextInputFormat对任务的切片机制是按照文件规划切片,不管有多少个小文件,都会是单独的切片,都会交给一个maptask,这样,如果有大量的小文件就会产生大量的maptask,处理效率极端底下

  • 优化策略

    最好的方法:在数据处理的最前端(预处理、采集),就将小文件合并成大文件,在上传到HDFS做后续的分析

补救措施:如果已经是大量的小文件在HDFS中了,可以使用另一种inputformat来做切片(CombineFileInputformat),它的切片逻辑跟TextInputformat

注:combineTextInputFormat是CombineFileInputformat的子类

不同:

它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask了

/如果不设置InputFormat,它默认的用的是TextInputFormat.class
		/*CombineTextInputFormat为系统自带的组件类 * setMinInputSplitSize 中的2048是表示n个小文件之和不能大于2048 * setMaxInputSplitSize 中的4096是 当满足setMinInputSplitSize中的2048情况下 在满足n+1个小文件之和不能大于4096 */
		job.setInputFormatClass(CombineTextInputFormat.class);
		CombineTextInputFormat.setMinInputSplitSize(job, 2048);
		CombineTextInputFormat.setMaxInputSplitSize(job,4096);
  • 输入数据:准备5个小文件
  • 实现过程
    • 不做任何处理,运行需求1中的wordcount程序,观察切片个数为5
  • 文件大小 < MinSplit < MaxSplit number of splits:1
    MinSplit < 文件大小 < MaxSplit number of splits:1
    MaxSplit < 文件大小 < 2*MaxSplit number of splits:2
    2 * MaxSplit < 文件大小 number of splits:3
    测试大小 最大MB 文件大小和最大值倍数 Splits
    4.97MB 3MB 1.65倍 2
    4.1MB 3MB 1.36 1
    6.51 3MB 2.17 3
全部评论

相关推荐

上周组里招人,我面了六个候选人,回来跟同事吃饭的时候聊起一个让我挺感慨的现象。前三个候选人,算法题写得都不错。第一道二分查找,五分钟之内给出解法,边界条件也处理得干净。第二道动态规划,状态转移方程写对了,空间复杂度也优化了一版。我翻他们的简历,力扣刷题量都在300以上。后三个呢,就有点参差不齐了。有的边界条件没处理好,有的直接说这道题没刷过能不能换个思路讲讲。其中有一个女生,我印象特别深——她拿到题之后没有马上写,而是先问我:“面试官,我能先跟你确认一下我对题目的理解吗?”然后她把自己的思路讲了一遍,虽然最后代码写得不是最优解,但整个沟通过程非常顺畅。这个女生的代码不是最优的,但当我问她“如果这里是线上环境,你会怎么设计’的时候,她给我讲了一套完整的方案——异常怎么处理、日志怎么打、怎么平滑发布。她对这是之前在实习的时候踩过的坑。”我在想LeetCode到底在筛选什么?我自己的经历可能有点代表性。我当年校招的时候,也是刷了三百多道题才敢去面试。那时候大家都刷,你不刷就过不了笔试关。后来工作了,前三年基本没再打开过力扣。真正干活的时候,没人让你写反转链表,也没人让你手撕红黑树。更多的是:这个接口为什么慢了、那个服务为什么OOM了、线上数据对不上了得排查一下。所以后来我当面试官,慢慢调整了自己的评判标准。算法题我还会出,但目的变了。我出算法题,不是想看你能不能背出最优解。而是想看你拿到一个陌生问题的时候,是怎么思考的。你会先理清题意吗?你会主动问边界条件吗?你想不出来的时候会怎么办?你写出来的代码,变量命名乱不乱、结构清不清楚?这些才是工作中真正用得到的能力。LeetCode是一个工具,不是目的。它帮你熟悉数据结构和常见算法思路,这没问题。但如果你刷了三百道题,却说不清楚自己的项目解决了什么问题、遇到了什么困难、你是怎么解决的,那这三百道题可能真的白刷了。所以还要不要刷LeetCode?要刷,但别只刷题。刷题的时候,多问自己几个为什么:为什么用这个数据结构?为什么这个解法比那个好?如果换个条件,解法还成立吗?把刷题当成锻炼思维的方式,而不是背答案的任务。毕竟面试官想看到的,从来不是一台背题机器,而是一个能解决问题的人。
国企上岸了的向宇同桌...:最害怕答非所问了,但是频繁反问确定意思又害怕面试官觉得我笨
AI时代还有必要刷lee...
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务