首页 > 试题广场 >

用mapreduce实现10亿级以上数据的kmeans

[问答题]

用mapreduce实现10亿级以上数据的kmeans

1 介绍

随着信息技术的发展,许多应用处理的数据规模会达到千兆级别,而这会自然而然地对计算提出更高的要求。高效的并行聚类算法和运行技术是满足科学数据分析的可扩展性以及性能要求的关键。到目前为止,一些研究者已经提出了一些并行聚类算法。所有的这些并行聚类算法都有下述的缺点:a)他们假设所有对象都能在主存中同时存放;b)这些并行系统提供了有限的编程模型,并使用这种限制去自动并行计算。以上两者在面对拥有成千上万对象的大规模数据集时会望而却步。因此,以并行聚类算法为方向的数据集需要得到发展。 
MapReduce是一种编程模型,用于处理和生成适用于各种现实世界任务的大型数据集的关联实施。用户指定map和reduce函数的计算过程,底层的运行系统便可以自动地在大规模集群上并行计算,可以自动地处理机器的错误,可以协调好中间机器的交互以至于高效地利用网络和磁盘资源。Google和Hadoop都提供了MapReduce运行时容错和动态灵活性的支持。 
在这篇文章中,我们把k-means算法改编到MapReduce框架下,该框架是在Hadoop下执行的,目的是为了使聚类方法变得可行。通过采用合适的

2 基于MapReduce的并行K-Means算法

在这节中,我们展示了基于MapReduce的并行K-Means(PKMeans)的主要设计。首先,我们简短的回顾K-Means算法,并分析了可并行化和可串行化的部分。然后,我们在细节上解释形式化map和reduce操作所必要的计算。

2.1 K-Means 算法

K-Means算法是最著名、最广泛使用的聚类算法。它用输入参数k把n个对象集合分成了k份,目的是让簇内的相似度高而簇间的相似度低。簇的相似度可以根据簇内对象到簇中心距离的平均值来衡量。 
算法运行如下:首先随机从所有对象中选择k个对象,这k个对象代表初始聚类的中心。每个剩下的对象会基于对象到与簇中心的距离被分配到最相似的簇。然后计算得到每个簇新的均值。这个过程需要不断迭代直到标准函数收敛。 
在k-means算法中,最密集的计算发生在距离的计算上。在每个迭代的过程中,需要n*k个距离的计算,其中n为对象总数,k为簇的总数。很明显,一个对象与中心距离计算跟其它对象与中心距离计算是无关的。因此,不同对象到中心距离的计算可以并行执行。每个迭代中,用来进行下一轮迭代计算的新中心需要更新,所以迭代过程必须串行执行。

2.2 基于MapReduce的PKMeans

如上文分析,PKMeans算法需要一种MapReduce job。map函数分配每个样本到最近的中心,reduce函数负责聚类中心的更新。为了减少网络负责,需要combiner函数来处理同一个map同一个key的中间结果的部分合并。 

Map函数. 输入数据集存储在分布式文件系统HDFS中,作为<key,value>的序列文件,每个<key,value>代表数据集的一条记录。Key为该记录对应于数据文件起始位置的偏移量,value为该条记录的内容。数据集被分割并传给所有的map,因此距离的计算就会并行执行。对于每个map任务,PKMeans构造了一个全局变量centers,centers是一个包含所有聚类中心信息的数组。如果给定该信息,map函数就可以计算判断某个样本到哪个中心最近。然后,中间结果由两部分组成:最近中心的索引和样本信息。Map函数的伪代码见算法1。

算法1.  map(key,value)


输入:全局变量centers,偏移量key,样本value 
输出:<key’,value>对,其中key’是最近中心的索引,value’是样本信息的字符串

  1. value构造样本的instance
  2. minDis=Double.MAX_VALUE
  3. Index=-1;
  4. For i=0 to centers.length do 
    dis=ComputeDist(instance,centers[i]); 
    If dis<minDis
    minDis=dis
    index=i
    }
  5. End For
  6. index作为key’
  7. 把不同维度的values构造成value’
  8. 输出<key’,value’>对;
  9. End
注意这里的Step 2和Step 3初始化了辅助变量minDisindex;Step 4通过计算找出了与样本最近的中心点,函数ComputeDist(instance,centers[i])返回样本和中心点centers[i]的距离;Step 8输出了用来进行下一个过程(combiner)的中间数据。 
Combine函数. 每个map任务完成之后,我们用combiner去合并同一个map任务的中间结果。因为中间结果是存储在结点的本地磁盘上,所以这个过程不会耗费网络传输的代价。在combine函数中,我们把属于相同簇的values求和。为了计算每个簇的对象的平均值,我们需要记录每个map的每个簇中样本的总数。Combine函数的伪代码见算法2.

算法2. combine(key,V)


输入:key为簇的索引,V为属于该簇的样本列表 
输出:<key’,value’>对,key’为簇的索引,value’是由属于同一类的所有样本总和以及样本数所组成的字符串。

  1. 初始化一个数组,用来记录同一类的所有样本的每个维度的总和,样本是V中的元素;
  2. 初始化一个计数器num为0来记录属于同一类的样本总数;
  3. While(V.hasNext()){ 
    V.next()构造样本实例instance; 
    instance的不同维度值相加到数组 
    num++;
  4. }
  5. key作为key’
  6. 构造value’:不同维度的求和结果+num
  7. 输出<key’,value’>对;
  8. End

Reduce函数. Reduce函数的输入数据由每个结点的combine函数获得。如combine函数所描述,输入数据包括部分样本(同一类)的求和以及对应样本数。在reduce函数中,我们可以把同一类的所有样本求和并且计算出对应的样本数。因此,我们可以得到用于下一轮迭代的新中心。Reduce函数的伪代码见算法3。


算法3. Reduce(key,V)


输入:key为簇的索引,V为来自不同结点的部分总和的样本列表 
输出:<key’,value’>对,key’为簇的索引,value’是代表新的聚类中心的字符串

  1. 初始化一个数组,用来记录同一类的所有样本的每个维度的总和,样本是V中的元素;
  2. 初始化一个计数器NUM为0来记录属于同一类的样本总数;
  3. While(V.hasNext()){ 
    V.next()构造样本实例instance; 
    instance的不同维度值相加到数组 
    NUM+=num;
  4. }
  5. 数组的每个元素除以NUM来获得新的中心坐标;
  6. key作为key’
  7. 构造value’为所有中心坐标的字符串;
  8. 输出<key’,value’>对;
  9. End

发表于 2019-06-26 11:07:28 回复(0)

利用map()函数将每一个样本划归到它的聚类中心的簇中

利用combiner()函数将同一簇样本聚集到一起

利用reduce()函数将同一簇样本的聚类中计算出来,以供下一次迭代使用

编辑于 2019-04-22 10:27:58 回复(0)