《Hadoop实战》第六章MapReduce工作机制
1.错误处理机制
(1)硬件故障
从MapReduce任务的执行角度出发,所涉及的硬件主要是JobTracker和TaskTracker(对应从HDFS出发就是NameNode和DataNode)。显然硬件故障就是JobTracker机器故障和TaskTracker机器故障。
在Hadoop集群中,任何时候都只有唯一一个JobTracker.所以JobTracker故障就是单点故障,这是所有错误中最严重的。到目前为止,在Hadoop中还没有相应的解决办法。能够想到的是通过创建多个备用JobTracker节点,在主JobTracker失败之后采用领导***算法(Hadoop中常用的一种确定Master的算法)来重新确定JobTracker节点。一些企业使用Hadoop提供服务时,就采用了这样的方法来避免JobTracker错误。
机器故障除了JobTracker错误就是'TaskTracker错误。TaskTracker 故障相对较为常见,并且MapReduce也有相应的解决办法,主要是重新执行任务。下面将详细介绍当作业遇到TaskTracker错误时, MapReduce所采取的解决步骤。
在Hadoop中,正常情况下, TaskTracker会不断地与系统JobTracker通过心跳机制进行通信。如果某TaskTracker出现故障或运行缓慢,它会停止或者很少向JobTracker发送心跳。 如果一个TaskTracker在一定时间内(默认是1分钟)没有与JobTracker通信,那么JobTracker会将此TaskTracker从等待任务调度的TaskTracker集合中移除。 同时JobTracker会要求此TaskTracker上的任务立刻返回,如果此TaskTracker任务是仍然在mapping阶段的Map任务, 那么JobTracker会要求其他的TaskTracker重新执行所有原本由故障TaskTracker执行的Map任务。如果任务是在Reduce阶段的Reduce任务, 那么JobTracker会要求其他TashTracker重新执行故障TaskTracker未完成的Reduce任务。比如,一个TaskTracker已经完成被分配的三个Reduce任务中的两个, 因为Reduce任务一旦完成就会将数据写到HDFS上,所以只有第三个未完成的Reduce需要重新执行。但是对于Map任务来说, 即使TashTracker完成了部分Map, Reduce 仍可能无法获取此节点上所有Map的所有输出。所以无论Map任务完成与否,故障TashTracker.上的Map任务都必须重新执行。
在Hadoop集群中,任何时候都只有唯一一个JobTracker.所以JobTracker故障就是单点故障,这是所有错误中最严重的。到目前为止,在Hadoop中还没有相应的解决办法。能够想到的是通过创建多个备用JobTracker节点,在主JobTracker失败之后采用领导***算法(Hadoop中常用的一种确定Master的算法)来重新确定JobTracker节点。一些企业使用Hadoop提供服务时,就采用了这样的方法来避免JobTracker错误。
机器故障除了JobTracker错误就是'TaskTracker错误。TaskTracker 故障相对较为常见,并且MapReduce也有相应的解决办法,主要是重新执行任务。下面将详细介绍当作业遇到TaskTracker错误时, MapReduce所采取的解决步骤。
在Hadoop中,正常情况下, TaskTracker会不断地与系统JobTracker通过心跳机制进行通信。如果某TaskTracker出现故障或运行缓慢,它会停止或者很少向JobTracker发送心跳。 如果一个TaskTracker在一定时间内(默认是1分钟)没有与JobTracker通信,那么JobTracker会将此TaskTracker从等待任务调度的TaskTracker集合中移除。 同时JobTracker会要求此TaskTracker上的任务立刻返回,如果此TaskTracker任务是仍然在mapping阶段的Map任务, 那么JobTracker会要求其他的TaskTracker重新执行所有原本由故障TaskTracker执行的Map任务。如果任务是在Reduce阶段的Reduce任务, 那么JobTracker会要求其他TashTracker重新执行故障TaskTracker未完成的Reduce任务。比如,一个TaskTracker已经完成被分配的三个Reduce任务中的两个, 因为Reduce任务一旦完成就会将数据写到HDFS上,所以只有第三个未完成的Reduce需要重新执行。但是对于Map任务来说, 即使TashTracker完成了部分Map, Reduce 仍可能无法获取此节点上所有Map的所有输出。所以无论Map任务完成与否,故障TashTracker.上的Map任务都必须重新执行。
(2)任务失败
在实际任务中,MapReduce作业还会遇到用户代码缺陷或进程崩溃引起的任务失败等情况。 用户代码缺陷会导致它在执行过程中抛出异常。此时,任务JVM进程会自动退出,并向TashTracker父进程发送错误消息,同时错误消息也会写人log文件,最后TasKTracker将此次任务尝试标记失败。对于进程崩溃引起的任务失败,TashTracker的监听程序会发现进程退出,此时TaskTracker也会将此次任务尝试标记为失败。对于死循环程序或执行时间太长的程序,由于TashTracker没有接收到进度更新,它也会将此次任务尝试标记为失败, 并杀死程序对应的进程。在以上情况中,TaskTracker 将任务尝试标记为失败之后会将TaskTracker 自身的任务计数器减1, 以便向JobTracker申请新的任务。TaskTracker 也会通过心跳机制告诉JobTracker本地的一一个任务尝试失败。 JobTracker接到任务失败的通知后,通过重置任务状态,将其加人到调度队列来重新分配该任务执行(JobTracker会尝试避免将失败的任务再次分配给运行失败的TaskTracker)。如果此任务尝试了4次(次数可以进行设置)仍没有完成,就不会再被重试,此时整个作业也就失败了。
2.作业调度机制
2.作业调度机制
在0.19.0版本之前,Hadoop 集群上的用户作业采用先进先出(FIFO, First Input FirstOutput)调度算法,即按照作业提交的顺序来运行。同时每个作业都会使用整个集群,因此它们只有轮到自己运行才能享受整个集群的服务。虽然FIFO调度器最后又支持了设置优先级的功能,但是由于不支持优先级抢占,所以这种单用户的调度算法仍然不符合云计算中采用并行计算来提供服务的宗旨。从0.19.0版本开始,Hadoop 除了默认的FIFO调度器外,还提供了支持多用户同时服务和集群资源公平共享的调度器,即公平调度器(Fair SchedulerGuide)和容量调度器(Capacity Scheduler Guide)。下面主要介绍公平调度器。
公平调度是为作业分配资源的方法,其目的是随着时间的推移,让提交的作业获取等量的集群共享资源,让用户公平地共享集群。具体做法是:当集群上只有一一个作业在运行时,它将使用整个集群;当有其他作业提交时,系统会将TaskTracker节点空闲时间片分配给这些新的作业,并保证每一个作业都得到大概等量的CPU时间。
公平调度器按作业池来组织作业,它会按照提交作业的用户数目将资源公平地分到这些作业池里。默认情况下,每一个用户拥有一个独立的作业池,以使每个用户都能获得- -份等同的集群资源而不会管它们提交了多少作业。在每一一个资源池内,会用公平共享的方法在运行作业之间共享容量。除了提供公平共享方法外,公平调度器还允许为作业池设置最小的共享资源,以确保特定用户、群组或生产应用程序总能获取到足够的资源。对于设置了最小共享资源的作业池来说,如果包含了作业,它至少能获取到最小的共享资源。但是如果最小共享资源超过作业需要的资源时,额外的资源会在其他作业池间进行切分。.
公平调度是为作业分配资源的方法,其目的是随着时间的推移,让提交的作业获取等量的集群共享资源,让用户公平地共享集群。具体做法是:当集群上只有一一个作业在运行时,它将使用整个集群;当有其他作业提交时,系统会将TaskTracker节点空闲时间片分配给这些新的作业,并保证每一个作业都得到大概等量的CPU时间。
公平调度器按作业池来组织作业,它会按照提交作业的用户数目将资源公平地分到这些作业池里。默认情况下,每一个用户拥有一个独立的作业池,以使每个用户都能获得- -份等同的集群资源而不会管它们提交了多少作业。在每一一个资源池内,会用公平共享的方法在运行作业之间共享容量。除了提供公平共享方法外,公平调度器还允许为作业池设置最小的共享资源,以确保特定用户、群组或生产应用程序总能获取到足够的资源。对于设置了最小共享资源的作业池来说,如果包含了作业,它至少能获取到最小的共享资源。但是如果最小共享资源超过作业需要的资源时,额外的资源会在其他作业池间进行切分。.
在常规操作中,当提交一个新作业时,公平调度器会等待已运行作业中的任务完成,以释放时间片给新的作业。但公平调度器也支持作业抢占。如果新的作业在一定时间(即超时时间, 可以配置)内还未获取公平的资源分配,公平调度器就会允许这个作业抢占已运行作业中的任务,以获取运行所需要的资源。另外, 如果作业在超时时间内获取的资源不到公平共享资源的一半时,也允许对任务进行抢占。而在选择时,公平调度器会在所有运行任务中选择最近运行起来的任务,这样浪费的计算相对较少。由于Hadoop作业能容忍丢失任务,抢占不会导致被抢占的作业失败,只是让被抢占作业的运行时间更长。
最后,公平调度器还可以限制每个用户和每个作业池并发运行的作业数量。这个限制可以在用户一次性提交数百个作业或当大量作业并发执行时用来确保中间数据不会塞满集群上的磁盘空间。超出限制的作业会被列入调度器的队列中进行等待,直到早期作业运行完毕。公平调度器再根据作业优先权和提交时间的排列情况从等待作业中调度即将运行的作业。
最后,公平调度器还可以限制每个用户和每个作业池并发运行的作业数量。这个限制可以在用户一次性提交数百个作业或当大量作业并发执行时用来确保中间数据不会塞满集群上的磁盘空间。超出限制的作业会被列入调度器的队列中进行等待,直到早期作业运行完毕。公平调度器再根据作业优先权和提交时间的排列情况从等待作业中调度即将运行的作业。
3.Shuffle
从前面的介绍中我们得知,Map的输出会经过一个名为shuffle的过程交给Reduce处理(在“MapReduce数据流”图中也可以看出),当然也有Map的结果经过sort-merge交给Reduce处理的。其实在MapReduce流程中,为了让Reduce可以并行处理Map结果,必须对Map的输出进行一定的排序和分割,然后再交给对应的Reduce,而这个将Map输出进行进一步整理并交给Reduce的过程就成为了shuffle. 从shuffle的过程可以看出,它是MapReduce的核心所在,shuffle 过程的性能与整个MapReduce的性能直接相关。
总体来说,shuffle过程包含在Map和Reduce两端中。在Map端的shuffle过程是对Map的结果进行划分(partition)、 排序(sort)和分割(spill), 然后将属于同一个划分的输出合并在一起(merge)并写在磁盘上, 同时按照不同的划分将结果发送给对应的Reduce(Map输出的划分与Reduce的对应关系由JobTracker确定)。Reduce端又会将各个Map送来的属于同一个划分的输出进行合并(merge), 然后对merge的结果进行排序,最后交给Reduce处理。’下 面将从Map和Reduce两端详细介绍shuffle过程。
总体来说,shuffle过程包含在Map和Reduce两端中。在Map端的shuffle过程是对Map的结果进行划分(partition)、 排序(sort)和分割(spill), 然后将属于同一个划分的输出合并在一起(merge)并写在磁盘上, 同时按照不同的划分将结果发送给对应的Reduce(Map输出的划分与Reduce的对应关系由JobTracker确定)。Reduce端又会将各个Map送来的属于同一个划分的输出进行合并(merge), 然后对merge的结果进行排序,最后交给Reduce处理。’下 面将从Map和Reduce两端详细介绍shuffle过程。
(1)Map端
从MapReduce的程序中可以看出,Map的输出结果是由collector处理的,所以Map端的shuffle过程包含在collect函数对Map输出结果的处理过程中。下面从具体的代码来分析Map端的shuffle过程。
首先从collect函数的代码入手(MapTask 类)。从下面的代码段可以看出Map函数的输出内存缓冲区是一个环形结构。
final int kvnext = (kvindex + 1)鲁kvoffsets. length;
当输出内存缓冲区内容达到设定的阈值时,就需要把缓冲区内容分割(spill) 到磁盘中。但是在分割的时候Map并不会阻止继续向缓冲区中写入结果,如果Map结果生成的速度快于写出速度,那么缓仲区会写满,这时Map任务必须等待,直到分割写出过程结束。
首先从collect函数的代码入手(MapTask 类)。从下面的代码段可以看出Map函数的输出内存缓冲区是一个环形结构。
final int kvnext = (kvindex + 1)鲁kvoffsets. length;
当输出内存缓冲区内容达到设定的阈值时,就需要把缓冲区内容分割(spill) 到磁盘中。但是在分割的时候Map并不会阻止继续向缓冲区中写入结果,如果Map结果生成的速度快于写出速度,那么缓仲区会写满,这时Map任务必须等待,直到分割写出过程结束。
(2)Reduce端
在Reduce端,shuffle阶段可以分成三个阶段:复制Map输出、排序合并和Reduce处理。下面按照这三个阶段进行详细介绍。
如前文所述,Map任务成功完成后,会通知父TashTracker状态已更新,TaskTracker 进而通知JobTracker (这些通知在心跳机制中进行)。所以,对于指定作业来说,JobTracker 能够记录Map输出和TaskTracker的映射关系。Reduce会定期向JobTracker获取Map的输出位置。一日拿到输出位置, Reduce任务就会从此输出对应的TaskTracker.上复制输出到本地(如果Map的输出很小,则会被复制到执行Reduce任务的TaskTracker节点的内存中, 便于进一步处理,否则会放人磁盘),而不会等到所有的Map任务结束。这就是Reduce任务的复制阶段。
在Reduce复制Map的输出结果的同时,Reduce 任务就进入了合并(merge) 阶段。这一阶段主要的任务是将从各个MapTaskTracker.上复制的Map输出文件(无论在内存还是在磁盘)进行整合,并维持数据原来的顺序。reduce端的最后阶段就是对合并的文件进行reduce处理。
如前文所述,Map任务成功完成后,会通知父TashTracker状态已更新,TaskTracker 进而通知JobTracker (这些通知在心跳机制中进行)。所以,对于指定作业来说,JobTracker 能够记录Map输出和TaskTracker的映射关系。Reduce会定期向JobTracker获取Map的输出位置。一日拿到输出位置, Reduce任务就会从此输出对应的TaskTracker.上复制输出到本地(如果Map的输出很小,则会被复制到执行Reduce任务的TaskTracker节点的内存中, 便于进一步处理,否则会放人磁盘),而不会等到所有的Map任务结束。这就是Reduce任务的复制阶段。
在Reduce复制Map的输出结果的同时,Reduce 任务就进入了合并(merge) 阶段。这一阶段主要的任务是将从各个MapTaskTracker.上复制的Map输出文件(无论在内存还是在磁盘)进行整合,并维持数据原来的顺序。reduce端的最后阶段就是对合并的文件进行reduce处理。
(3)shuffle过程的优化
熟悉了上面介绍的shuffle过程,可能有读者会说: 这个shuffle过程不是最优的。是的,Hadoop采用的shuffle过程并不是最优的。举个简单的例子,如果现在需要Hadoop集群完成两个集合的并操作,事实上并操作只需要让两个集群中重复的元素在最后的结果中出现一次就可以了,并不要求结果的元素是按顺序排列的。但是如果使用Hadoop默认的shuffle过程,那么结果势必是排好序的, 显然这个处理就不是必须的了。在这里简单介绍从Hadoop参数的配置出发来优化shuffle过程。在一个任务中,完成单位任务使用时间最多的一般都是I/O操作。在Map端,主要就是shuffle阶段中缓冲区内容超过阈值后的写出操作。可以通过合理地设置ip.sort.*属性来减少这种情况下的写出次数,具体来说就是增加io.sort.mb的值。在Reduce端,在复制Map输出的时候直接将复制的结果放在内存中同样能够提升性能,这样可以让部分数据少做两次I/O操作(前提是留下的内存足够Reduce任务执行)。所以在Reduce函数的内存需求很小的情况下,将mapred.inmem.merge.threshold设置为0,将mapreed.job.reduce.input.buffer.percent设置为1.0(或者一个更低的值)能够让I/O操作更少,提升shuffle 的性能。

查看29道真题和解析