通过一个例子让你了解MapReduce并行编程模型

转载自微信公众账号:开点工作室(kaidiancs)

1. MapReduce基本编程模型


MapReduce是在集群规模上面向大数据处理的并行编程模型,MapReduce编程不同于以往熟悉的编程思维。


MapReduce要求处理的数据之间没有相关性,采用对数据“分而治之”的方法,把大数据集划分为若干个小数据集,并行对小数据集处理,最后收集小数据集上的处理结果,合并成结果数据。MapReduce特别适合对大数据/文本的统计分析,例如Web访问日志分析、科技文献引用关系分析和统计、网页排序等。


【例】一个真实的数据集cite75_99.txt(专利引用数据),它来自美国国家经济研究局提供的美国专利数据,网址为http://www.nber.org/patents/。部分数据如下。


CITING(引用)  CITED(被引用) 

4738565              1001205

4795115              1001180

4903406              1001205  

4934634              1001180           

5123888              1001198

5156335              1001218

5197936              1001198     

5295940              1001198     

5439320              1001205

5544977              1001205    

¼¼

 

第一行给出列的描述,其他行记录一次专利的引用,例如第二行表示专利4738565引用了专利1001205 。文件中的各行按照引用专利排序。


 要求统计专利被引用次数,部分结果示例如下,每行记录表示专利号和被引用次数,。

 

1001198   3

1001205   4

1001209   2

1001212   2

1001213   1

1001218   1

¼¼

 

 专利被引用次数的MapReduce的基本编程模型如图1所示,图中显示了部分数据的处理过程。


MapReduce主要有6个可编程组件,分别是InputFormat、Mapper、Combiner、Partitioner、Reducer和OutputFormat,Hadoop自带了很多直接可用的InputFormat、Partitioner 和OutputFormat,很多时候用户只需要编写Mapper和Reducer就可以快速完成并行化程序设计。


图1. MapReduce基本编程模型


InputFormat是一个抽象类,完成三个任务:(1)验证作业数据的输入形式和格式。(2)将输入数据分割为逻辑意义上的数据分片(split)。(3)从数据分片中创建一条条记录(RR),把每条记录解析成键/值对(<key,value>)形式。通常文本文件中一行数据创建一条记录。


Mapper就是实现Map任务的抽象类,其中最重要的就是map( )方法。


MapReduce为每个数据分片(split)生成一个Map任务,对每条记录(RR)调用一次map( )方法。若一个Map任务处理的数据分片有n条记录,则在该Map任务中会调用n次map( )方法。map( )方法对输入的<key,value>进行处理,产生一系列新的键/值对[<key,value>]输出。


在进入Reducer处理前,必须等所有的Map任务完成,因此在进入Reducer前需要一个同步障。这个阶段也负责对map输出的数据进行收集和整理,以便Reduce节点可以完全基于本节点上的数据计算。因此,同步障中对Map输出数据的一个重要处理就是按键值的数据分区(Partitioner)。


Partitioner的作用就是对Map输出的中间结果按key进行分组,同一组的数据交给同一个Reduce节点处理,以避免Reduce节点在处理中访问其他Reduce节点。HadoopMapReduce框架中自带了一个HashPartitioner类,默认情况下,HashPartitioner对Map输出的中间结果<key,value>中的key取hash值,并按Reduce节点数取模来确定分配到哪一个Reduce节点。相同key值的中间结果一定送到同一个Reduce节点。


Reducer是对Partitioner分区后送来的一组[<key,value>]进行合并处理的抽象类,其中重要的就是reduce( )方法。MapReduce为Reduce节点上相同key的一组value值(即<key,[value]>)调用一次reduce( )方法,对<key,[value]>进行整理或处理,产生最终的结果[<key,value>]。


OutputFormat是用于描述MapReduce作业的数据输出格式和规范的抽象类,在Hadoop中以HDFS文件形式存储。


在上例中,假设数据集cite75_99.txt分为3个数据分片split,每个数据分片生成一个Map任务,3个Map任务并行执行。cite75_99.txt是个文本文件,每行为一条记录,若把记录以制表符“ \t ”分割为key和value,则在“4738565  1001205”行中,以<4738565,1001205> 为键/值对,调用map( )方法。


map( )方法从输入键/值对中提取出被引用专利号,组织成<被引用专利,1>的键/值对形式输出,“1”计数了该专利被引用一次。map( )的一次调用示例如下:

map( ):<4738565,1001205> à <1001205,1>


假设有2个Reduce节点,Partitioner把Map产生的所有中间结果<被引用专利,1>,按照“被引用专利”划分为2个区,相同的被引用专利号送到同一个Reduce节点。例如不同Map节点产生的<1001198,1>都被送到左边Reduce节点,<1001205,1>都被送到右边Reduce节点。


如图1所示,左边Reduce节点共收到3个key为1001198的数据,组织为<1001198,[1,1,1]>调用一次reduce( )。在reduce( )中,合并3个value值,得到<1001198,3>输出到结果数据中。reduce ( ) 的一次调用示例如下:


 reduce ( ):<1001198,[1,1,1]>à <1001198,3>


同理,<1001218,1>和<1001205,[1,1,1,1]>分别调用reduce( ),得到<1001218,1>和<1001205,4>输出到结果数据中。


2. 专利被引用次数统计程序代码


为了实现上述专利被引用次数统计,用户只需要编写map( )、reduce( )方法和main( ),main( )用于书写作业的配置信息。


Mapper Class:

public static class CitedMap extends Mapper<Text, Text, Text, Intwritable>

{   private Intwritable one=newIntwritable(1);

public void map( Text key, Text value, Context context)

  throwsIOException, InterruptedException

// 输入key: citing专利号 ;value:  cited专利号

{   context.write(value, one);

}   // 输出key: cited 被引专利号;value: one

}

Reducer Class:

public static class CitedReduce extends Reducer<Text, Intwritable,Text, Intwritable >

{

public void reduce(Text key, Iterable< Intwritable > values,Context context   throwsIOException, InterruptedException

{   int count = 0;

for (Intwritable val:values) {

count+=val.get( );

}

context.write(key, new IntWritable(count));

} // 输出key: 被引专利号;value: 被引次数

}


main函数代码:


public staticvoid main(String[] args) throws Exception

{    //为任务设定配置文件

 Configuration conf = new Configuration();            

 Job job = new Job(conf, “Cited”);      //新建一个用户定义的Job

 job.setJarByClass(Cited.class);    //设置执行任务的jar

 job.setMapperClass(CitedMap.class);  //设置Mapper类

 job.setReducerClass(CitedReduce.class);     //设置Reducer类

 job.setOutputKeyClass(Text.class);            //设置job输出的key

 job.setOutputValueClass(IntWritable.class);       //设置job输出的value

 job.setInputFormatClass(keyValueTextInputFormat.class);//设置输入格式

 FileInputFormat.addInputPath(job, new Path(args[0]));//设置输入文件的路径

 FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置输出文件的路径   

 System.exit(job.waitForCompletion(true) ? 0 :1);    //提交任务并等待任务完成   

}


3. 在MapReduce程序中Combiner组件的作用


 Map将中间结果存在节点所在的本地文件中,Reduce对送到节点的数据进行合并处理,Partitioner实现了数据在Map节点和Reduce节点之间分发的过程。因此,在MapReduce中,从Map阶段到Reduce阶段,有大量的数据需要在集群的网络中传输,引起较大的网络带宽开销,甚至会带来MapReduce程序的性能瓶颈。


如果能对Map生成的中间结果进行本地合并,即将那些key值相同的一组键值对归并为一个键值对,势必能减轻网络压力,提高MapReduce程序效率。


Hadoop在Mapper类结束后,Partitioner分区之前,使用一个Combiner类来解决相同key的键值对合并问题。MapReduce会使用Combiner类对Map阶段产生的中间结果进行多次处理。Combiner的作用就是合并Mapper的输出,减少网络带宽和Reduce节点上的负载。


Combiner类在实现上类似于Reducer类,实际中,常常就直接使用Reducer类。如上题中,可以在main( )中加入如下语句:


job.setCombinerClass(CitedReduce.class);    //设置Combine类


此时专利被引用次数统计的MapReduce框架如图2所示。

图2. 增加了Combiner的MapReduce框架


《横扫offer---程序员招聘真题详解700题》,开点工作室著,清华大学出版社出版,天猫、京东等各大网上书店及实体书店均已开始发售。

全部评论

相关推荐

网安已死趁早转行:山东这地方有点说法
点赞 评论 收藏
分享
评论
3
16
分享

创作者周榜

更多
牛客网
牛客企业服务