Common Spark Transformation Operators (Part 3)
Initializing the data
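All of the examples in this post assume a live SparkContext named sc plus the RDD and HashPartitioner imports. A minimal local setup sketch (the app name and master are placeholders, not from the original post):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Minimal local setup assumed by every snippet below
val conf = new SparkConf().setAppName("TransformationDemo").setMaster("local[*]")
val sc = new SparkContext(conf)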
println("======================= 原始数据 ===========================")
val data1: RDD[Int] = sc.parallelize(1 to 10, 3)
println(s"原始数据为:${data1.collect.toBuffer}")
val data2: RDD[Int] = sc.parallelize(5 to 15, 2)
println(s"原始数据为:${data2.collect.toBuffer}")
val data3: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 5, 4, 3, 2, 1))
println(s"原数数据为:${data3.collect.toBuffer}")
Run result (screenshot omitted)
distinct
Removes duplicate elements. An RDD may contain duplicates; distinct returns a new RDD with each element kept once. Because it shuffles the data, this method does not preserve element order and is expensive.
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
// First overload: takes numPartitions, which acts like a modulus under the default HashPartitioner. Elements whose value divides evenly by numPartitions land first in the collected output, then those with remainder 1, and so on: unordered within each partition, ordered across partitions.
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
// Second overload: delegates to the first, defaulting numPartitions to the current partition count
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
Scala version
println("======================= distinct-1 ===========================")
// 如果没有指定numPartitions参数,则为创建数据时的分区数量
val value1: RDD[Int] = data3.distinct()
println(s"经过distinct处理后的数据为:${value1.collect.toBuffer}")
println("======================= distinct-2 ===========================")
// 局部无序,整体有序。以传入的参数numPartitions作为因子,所有的元素除以numPartitions,模为0的排在第一位,之后排模为1的,以此类推
val value2: RDD[Int] = data3.distinct(2)
println(s"经过distinct处理后的数据为:${value2.collect.toBuffer}")
// Returns
// (4, 2, 1, 3, 5)
// 4, 2 ==> remainder 0
// 1, 3, 5 ==> remainder 1
Run result (screenshot omitted)
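Since distinct is just the map → reduceByKey → map pipeline shown in the source above, it can be reproduced by hand. A sketch against the same data3 (a hand-rolled illustration, not how you would normally write it):

// Hand-rolled equivalent of data3.distinct(2), mirroring the source
val manualDistinct: RDD[Int] = data3
  .map(x => (x, null))          // pair each element with a dummy value
  .reduceByKey((x, _) => x, 2)  // collapse duplicate keys into 2 partitions
  .map(_._1)                    // drop the dummy value
println(s"Manual distinct: ${manualDistinct.collect.toBuffer}")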
union
Merges two RDDs into one; duplicates are not removed.
/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
*/
// Returns the union of this RDD and the other without deduplication; the inputs are concatenated in order
def union(other: RDD[T]): RDD[T] = withScope {
sc.union(this, other)
}
Scala version
println("======================= union ===========================")
val value: RDD[Int] = data1.union(data2)
println(s"经过union处理后的数据为:${value3.collect.toBuffer}")
Run result (screenshot omitted)
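Because union simply concatenates the inputs' partitions (when they don't share a partitioner), the result here should have 3 + 2 = 5 partitions, and the overlapping values 5 through 10 appear twice. A small sketch, chaining .distinct() as the scaladoc suggests:

val unioned: RDD[Int] = data1.union(data2)
println(s"Partitions after union: ${unioned.getNumPartitions}")      // expected 5 = 3 + 2
println(s"union + distinct: ${unioned.distinct().collect.toBuffer}") // duplicates removed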
intersection
Computes the intersection of two RDDs, with duplicates removed. The result comes back unordered, and the operation is expensive (it shuffles internally).
/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
* elements, even if the input RDDs did.
*
* @note This method performs a shuffle internally.
*/
// First overload: one parameter; returns the intersection of this RDD and the other, with no duplicate elements
// The result is again unordered within partitions but ordered across them; the partition count is the larger of the two input RDDs'
def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}
/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
* elements, even if the input RDDs did.
*
* @note This method performs a shuffle internally.
*
* @param partitioner Partitioner to use for the resulting RDD
*/
// Second overload: two parameters, the other RDD and an explicit Partitioner
def intersection(
other: RDD[T],
partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}
/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
* elements, even if the input RDDs did. Performs a hash partition across the cluster
*
* @note This method performs a shuffle internally.
*
* @param numPartitions How many partitions to use in the resulting RDD
*/
// Third overload: two parameters, the second being numPartitions; internally calls the second overload with the default HashPartitioner(numPartitions), so the result follows the same unordered-within, ordered-across rule as distinct
def intersection(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
intersection(other, new HashPartitioner(numPartitions))
}
Scala version
println("======================= intersection-1 ===========================")
val value1: RDD[Int] = data1.intersection(data2)
println(s"分区数量为:${value1.getNumPartitions}")
println(s"经过intersection处理后的数据为:${value1.collect.toBuffer}")
println("======================= intersection-2 ===========================")
val value2: RDD[Int] = data1.intersection(data2, new HashPartitioner(4))
println(s"分区数量为:${value2.getNumPartitions}")
println(s"经过intersection处理后的数据为:${value2.collect.toBuffer}")
println("======================= intersection-3 ===========================")
val value3: RDD[Int] = data1.intersection(data2, 5)
println(s"分区数量为:${value3.getNumPartitions}")
println(s"经过intersection处理后的数据为:${value3.collect.toBuffer}")
Run result (screenshot omitted)
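The cogroup-based implementation above can also be written out directly. A sketch mirroring the first overload with data1 and data2:

// Hand-rolled equivalent of data1.intersection(data2), mirroring the source
val manualIntersection: RDD[Int] = data1.map(v => (v, null))
  .cogroup(data2.map(v => (v, null)))                                      // group the dummy pairs from both sides by key
  .filter { case (_, (left, right)) => left.nonEmpty && right.nonEmpty }   // keep keys seen on both sides
  .keys
println(s"Manual intersection: ${manualIntersection.collect.toBuffer}")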
subtract
RDD1.subtract(RDD2) returns the elements that appear in RDD1 but not in RDD2.
/**
* Return an RDD with the elements from `this` that are not in `other`.
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
// First overload: one parameter, delegating to the third overload
// Uses this RDD's partitioner if it has one, otherwise a HashPartitioner with this RDD's partition count (so the result keeps this RDD's partitioning, as the scaladoc notes)
def subtract(other: RDD[T]): RDD[T] = withScope {
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
// Second overload: delegates to the third, using the default HashPartitioner(numPartitions); the result follows the same unordered-within, ordered-across rule as distinct
def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
subtract(other, new HashPartitioner(numPartitions))
}
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
// Third overload: two parameters, the second being an explicit Partitioner
def subtract(
other: RDD[T],
p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
if (partitioner == Some(p)) {
// Our partitioner knows how to handle T (which, since we have a partitioner, is
// really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
val p2 = new Partitioner() {
override def numPartitions: Int = p.numPartitions
override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
}
// Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
// anyway, and when calling .keys, will not have a partitioner set, even though
// the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
// partitioned by the right/real keys (e.g. p).
this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
} else {
this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
}
}
Scala version
println("======================= subtract-1 ===========================")
val value1: RDD[Int] = data1.subtract(data2)
println(s"分区数量为:${value1.getNumPartitions}")
println(s"经过subtract处理后的数据为:${value1.collect.toBuffer}")
println("======================= subtract-2 ===========================")
val value2: RDD[Int] = data1.subtract(data2, new HashPartitioner(4))
println(s"分区数量为:${value2.getNumPartitions}")
println(s"经过subtract处理后的数据为:${value2.collect.toBuffer}")
println("======================= subtract-3 ===========================")
val value3: RDD[Int] = data1.subtract(data2, 5)
println(s"分区数量为:${value3.getNumPartitions}")
println(s"经过subtract处理后的数据为:${value3.collect.toBuffer}")
Run result (screenshot omitted)
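Note that, unlike intersection, the one-argument subtract keeps this RDD's partitioning, so data1.subtract(data2) should come back with data1's 3 partitions. The subtractByKey pipeline can likewise be spelled out; a sketch:

// Hand-rolled equivalent of data1.subtract(data2), mirroring the source
val manualSubtract: RDD[Int] = data1
  .map(x => (x, null))                 // fake (element, null) pairs, as in the source
  .subtractByKey(data2.map((_, null))) // drop keys that also appear in data2
  .keys
println(s"Manual subtract: ${manualSubtract.collect.toBuffer}")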
cartesian
Returns the Cartesian product of two RDDs. The cost grows with the product of their sizes, so this is very expensive.
/**
* Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
* elements (a, b) where a is in `this` and b is in `other`.
*/
// The partition count of the result is the product of the two input RDDs' partition counts
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
new CartesianRDD(sc, this, other)
}
Scala version
println("======================= cartesian ===========================")
val value1: RDD[(Int, Int)] = data1.cartesian(data2)
println(s"分区数量为:${value1.getNumPartitions}")
println(s"经过cartesian处理后的数据为:${value1.collect.toBuffer}")
Run result (screenshot omitted)
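With data1 holding 10 elements in 3 partitions and data2 holding 11 elements in 2, the product should contain 10 × 11 = 110 pairs spread over 3 × 2 = 6 partitions. A quick check:

val product: RDD[(Int, Int)] = data1.cartesian(data2)
println(s"Pair count: ${product.count}")            // expected 110 = 10 * 11
println(s"Partitions: ${product.getNumPartitions}") // expected 6 = 3 * 2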
sample
Sampling operation: draws a subset of the data.
/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be greater
* than or equal to 0
* @param seed seed for the random number generator
*
* @note This is NOT guaranteed to provide exactly the fraction of the count
* of the given [[RDD]].
*/
// Returns a sampled subset of this RDD
// withReplacement: whether an element can be drawn more than once
// fraction: when withReplacement is false, the probability that each element is chosen, in [0, 1]
// fraction: when withReplacement is true, the expected number of times each element is chosen, >= 0
// seed: seed for the random number generator; normally left unspecified unless reproducibility matters
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = {
require(fraction >= 0,
s"Fraction must be nonnegative, but got ${fraction}")
withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}
}
Scala version
println("======================= sample-1 ===========================")
val value1: RDD[Int] = data1.sample(withReplacement = false, 0.5)
println(s"分区数量为:${value1.getNumPartitions}")
println(s"经过sample抽样的结果为:${value1.collect.toBuffer}")
println("======================= sample-2 ===========================")
val data4: RDD[Int] = data1.repartition(2)
val value2: RDD[Int] = data4.sample(withReplacement = false, 0.5)
println(s"分区数量为:${value2.getNumPartitions}")
println(s"经过sample抽样的结果为:${value2.collect.toBuffer}")
Run result (screenshot omitted)
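Two details worth demonstrating: with withReplacement = true the fraction may exceed 1 (it is the expected count per element, so some elements can show up several times), and fixing the seed makes a sample reproducible across runs. A sketch:

println("======================= sample-3 ===========================")
// With replacement, fraction 2.0 means each element is expected roughly twice;
// the fixed seed (an arbitrary value chosen for this sketch) makes the output repeatable
val value3: RDD[Int] = data1.sample(withReplacement = true, 2.0, seed = 42L)
println(s"Sample with replacement: ${value3.collect.toBuffer}")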