通过一个例子让你了解MapReduce并行编程模型
MapReduce是在集群规模上面向大数据处理的并行编程模型,MapReduce编程不同于以往熟悉的编程思维。
MapReduce要求处理的数据之间没有相关性,采用对数据“分而治之”的方法,把大数据集划分为若干个小数据集,并行对小数据集处理,最后收集小数据集上的处理结果,合并成结果数据。MapReduce特别适合对大数据/文本的统计分析,例如Web访问日志分析、科技文献引用关系分析和统计、网页排序等。
【例】一个真实的数据集cite75_99.txt(专利引用数据),它来自美国国家经济研究局提供的美国专利数据,网址为http://www.nber.org/patents/。部分数据如下。
CITING(引用) CITED(被引用)
4738565 1001205
4795115 1001180
4903406 1001205
4934634 1001180
5123888 1001198
5156335 1001218
5197936 1001198
5295940 1001198
5439320 1001205
5544977 1001205
¼¼
第一行给出列的描述,其他行记录一次专利的引用,例如第二行表示专利4738565引用了专利1001205 。文件中的各行按照引用专利排序。
要求统计专利被引用次数,部分结果示例如下,每行记录表示专利号和被引用次数,。
1001198 3
1001205 4
1001209 2
1001212 2
1001213 1
1001218 1
¼¼
专利被引用次数的MapReduce的基本编程模型如图1所示,图中显示了部分数据的处理过程。
MapReduce主要有6个可编程组件,分别是InputFormat、Mapper、Combiner、Partitioner、Reducer和OutputFormat,Hadoop自带了很多直接可用的InputFormat、Partitioner 和OutputFormat,很多时候用户只需要编写Mapper和Reducer就可以快速完成并行化程序设计。
图1. MapReduce基本编程模型
InputFormat是一个抽象类,完成三个任务:(1)验证作业数据的输入形式和格式。(2)将输入数据分割为逻辑意义上的数据分片(split)。(3)从数据分片中创建一条条记录(RR),把每条记录解析成键/值对(<key,value>)形式。通常文本文件中一行数据创建一条记录。
Mapper就是实现Map任务的抽象类,其中最重要的就是map( )方法。
MapReduce为每个数据分片(split)生成一个Map任务,对每条记录(RR)调用一次map( )方法。若一个Map任务处理的数据分片有n条记录,则在该Map任务中会调用n次map( )方法。map( )方法对输入的<key,value>进行处理,产生一系列新的键/值对[<key,value>]输出。
在进入Reducer处理前,必须等所有的Map任务完成,因此在进入Reducer前需要一个同步障。这个阶段也负责对map输出的数据进行收集和整理,以便Reduce节点可以完全基于本节点上的数据计算。因此,同步障中对Map输出数据的一个重要处理就是按键值的数据分区(Partitioner)。
Partitioner的作用就是对Map输出的中间结果按key进行分组,同一组的数据交给同一个Reduce节点处理,以避免Reduce节点在处理中访问其他Reduce节点。HadoopMapReduce框架中自带了一个HashPartitioner类,默认情况下,HashPartitioner对Map输出的中间结果<key,value>中的key取hash值,并按Reduce节点数取模来确定分配到哪一个Reduce节点。相同key值的中间结果一定送到同一个Reduce节点。
Reducer是对Partitioner分区后送来的一组[<key,value>]进行合并处理的抽象类,其中重要的就是reduce( )方法。MapReduce为Reduce节点上相同key的一组value值(即<key,[value]>)调用一次reduce( )方法,对<key,[value]>进行整理或处理,产生最终的结果[<key,value>]。
OutputFormat是用于描述MapReduce作业的数据输出格式和规范的抽象类,在Hadoop中以HDFS文件形式存储。
在上例中,假设数据集cite75_99.txt分为3个数据分片split,每个数据分片生成一个Map任务,3个Map任务并行执行。cite75_99.txt是个文本文件,每行为一条记录,若把记录以制表符“ \t ”分割为key和value,则在“4738565 1001205”行中,以<4738565,1001205> 为键/值对,调用map( )方法。
map( )方法从输入键/值对中提取出被引用专利号,组织成<被引用专利,1>的键/值对形式输出,“1”计数了该专利被引用一次。map( )的一次调用示例如下:
map( ):<4738565,1001205> à <1001205,1>
假设有2个Reduce节点,Partitioner把Map产生的所有中间结果<被引用专利,1>,按照“被引用专利”划分为2个区,相同的被引用专利号送到同一个Reduce节点。例如不同Map节点产生的<1001198,1>都被送到左边Reduce节点,<1001205,1>都被送到右边Reduce节点。
如图1所示,左边Reduce节点共收到3个key为1001198的数据,组织为<1001198,[1,1,1]>调用一次reduce( )。在reduce( )中,合并3个value值,得到<1001198,3>输出到结果数据中。reduce ( ) 的一次调用示例如下:
reduce ( ):<1001198,[1,1,1]>à <1001198,3>
同理,<1001218,1>和<1001205,[1,1,1,1]>分别调用reduce( ),得到<1001218,1>和<1001205,4>输出到结果数据中。
2. 专利被引用次数统计程序代码
为了实现上述专利被引用次数统计,用户只需要编写map( )、reduce( )方法和main( ),main( )用于书写作业的配置信息。
Mapper Class:
public static class CitedMap extends Mapper<Text, Text, Text, Intwritable>
{ private Intwritable one=newIntwritable(1);
public void map( Text key, Text value, Context context)
throwsIOException, InterruptedException
// 输入key: citing专利号 ;value: cited专利号
{ context.write(value, one);
} // 输出key: cited 被引专利号;value: one
}
Reducer Class:
public static class CitedReduce extends Reducer<Text, Intwritable,Text, Intwritable >
{
public void reduce(Text key, Iterable< Intwritable > values,Context context throwsIOException, InterruptedException
{ int count = 0;
for (Intwritable val:values) {
count+=val.get( );
}
context.write(key, new IntWritable(count));
} // 输出key: 被引专利号;value: 被引次数
}
main函数代码:
public staticvoid main(String[] args) throws Exception
{ //为任务设定配置文件
Configuration conf = new Configuration();
Job job = new Job(conf, “Cited”); //新建一个用户定义的Job
job.setJarByClass(Cited.class); //设置执行任务的jar
job.setMapperClass(CitedMap.class); //设置Mapper类
job.setReducerClass(CitedReduce.class); //设置Reducer类
job.setOutputKeyClass(Text.class); //设置job输出的key
job.setOutputValueClass(IntWritable.class); //设置job输出的value
job.setInputFormatClass(keyValueTextInputFormat.class);//设置输入格式
FileInputFormat.addInputPath(job, new Path(args[0]));//设置输入文件的路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置输出文件的路径
System.exit(job.waitForCompletion(true) ? 0 :1); //提交任务并等待任务完成
}
3. 在MapReduce程序中Combiner组件的作用
Map将中间结果存在节点所在的本地文件中,Reduce对送到节点的数据进行合并处理,Partitioner实现了数据在Map节点和Reduce节点之间分发的过程。因此,在MapReduce中,从Map阶段到Reduce阶段,有大量的数据需要在集群的网络中传输,引起较大的网络带宽开销,甚至会带来MapReduce程序的性能瓶颈。
如果能对Map生成的中间结果进行本地合并,即将那些key值相同的一组键值对归并为一个键值对,势必能减轻网络压力,提高MapReduce程序效率。
Hadoop在Mapper类结束后,Partitioner分区之前,使用一个Combiner类来解决相同key的键值对合并问题。MapReduce会使用Combiner类对Map阶段产生的中间结果进行多次处理。Combiner的作用就是合并Mapper的输出,减少网络带宽和Reduce节点上的负载。
Combiner类在实现上类似于Reducer类,实际中,常常就直接使用Reducer类。如上题中,可以在main( )中加入如下语句:
job.setCombinerClass(CitedReduce.class); //设置Combine类
此时专利被引用次数统计的MapReduce框架如图2所示。
图2. 增加了Combiner的MapReduce框架
《横扫offer---程序员招聘真题详解700题》,开点工作室著,清华大学出版社出版,天猫、京东等各大网上书店及实体书店均已开始发售。