Understanding how combineByKey works
Spark's combineByKey operator is similar to aggregate. First, let's look at the official documentation:
combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash>)
Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C. Note that V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
Users provide three functions:
createCombiner, which turns a V into a C (e.g., creates a one-element list)
mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
mergeCombiners, to combine two C’s into a single one.
In other words, it turns data of type RDD[(K, V)] into RDD[(K, C)], where V and C may be the same type or different types.
combineByKey takes three functions as its main parameters: createCombiner, mergeValue, and mergeCombiners.
These three functions are enough to explain what it does; once you understand them, you understand combineByKey.
1. createCombiner, which turns a V into a C:
V -> C
2. mergeValue, which merges a V into an existing C:
C, V -> C
3. mergeCombiners, which merges two C's into one:
C, C -> C
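To make these three signatures concrete, here is a minimal pure-Python sketch of the contract. Note that `combine_by_key` and its explicit partition layout are hypothetical, for illustration only; Spark's real implementation accumulates per partition and then shuffles, but the three functions play exactly these roles.

```python
# Hypothetical pure-Python model of the combineByKey contract (no Spark needed).
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    per_partition = []
    for part in partitions:                      # phase 1: one accumulator dict per partition
        acc = {}
        for k, v in part:
            if k not in acc:
                acc[k] = create_combiner(v)      # first V for this key in this partition
            else:
                acc[k] = merge_value(acc[k], v)  # fold another V into the combiner
        per_partition.append(acc)
    merged = {}
    for acc in per_partition:                    # phase 2: merge per-partition combiners
        for k, c in acc.items():
            merged[k] = merge_combiners(merged[k], c) if k in merged else c
    return merged

# Group (Int, Int) pairs into (Int, list): V is an int, C is a list, so V != C,
# matching the (Int, Int) -> (Int, List[Int]) example in the docs.
partitions = [[(1, 10), (2, 20)], [(1, 30)]]
grouped = combine_by_key(partitions,
                         lambda v: [v],          # createCombiner: V -> C
                         lambda c, v: c + [v],   # mergeValue: (C, V) -> C
                         lambda a, b: a + b)     # mergeCombiners: (C, C) -> C
print(grouped)  # {1: [10, 30], 2: [20]}
```

Note how key 1 appears in both partitions: createCombiner runs once per partition for it, and mergeCombiners stitches the two partial lists together.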
Start Spark's Python shell by running IPYTHON=1 pyspark --master local[1] at the command line. The --master local[1] flag runs Spark in local mode, which makes debugging with print statements easier.
Here is an example that computes the per-key average:
data = [
    ('A', 2.), ('A', 4.), ('A', 9.),
    ('B', 10.), ('B', 20.),
    ('Z', 3.), ('Z', 5.), ('Z', 8.), ('Z', 12.)
]
rdd = sc.parallelize(data)
def mergeValue(x, value):
    print('what is the x', x)
    return (x[0] + value, x[1] + 1)

sumCount = rdd.combineByKey(lambda value: (value, 1),
                            mergeValue,
                            lambda x, y: (x[0] + y[0], x[1] + y[1]))
sumCount.collect()
#[('A', (15.0, 3)), ('B', (30.0, 2)), ('Z', (28.0, 4))]
averageByKey = sumCount.map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
averageByKey.collectAsMap()
# {'A': 5.0, 'B': 15.0, 'Z': 7.0}
Let's walk through it step by step:
1.Create a Combiner
lambda value:(value,1)
This step defines the data structure of C, namely (sum, count).
When combineByKey encounters a new key, it calls createCombiner() to create the initial value of that key's accumulator. (Note: this happens the first time each key is seen in each partition, not just the first time the key appears in the whole RDD.)
2.Merge a Value
lambda x, value: (x[0] + value, x[1] + 1)
This function tells combineByKey what to do when it receives a new value for a key that already has a combiner. Its arguments are the existing combiner and the new value; the combiner's data structure, (sum, count), was defined by the previous function.
3.Merge two Combiners
lambda x, y: (x[0] + y[0], x[1] + y[1])
This function tells combineByKey how to merge two combiners that were built for the same key on different partitions.
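Concretely, this lambda works on two already-built (sum, count) combiners, such as the two partial results key 'B' produces in the trace below. A standalone snippet, independent of Spark:

```python
# Two partial combiners for the same key, built on different partitions.
merge_combiners = lambda x, y: (x[0] + y[0], x[1] + y[1])
print(merge_combiners((10., 1), (20., 1)))  # (30.0, 2)

# Spark gives no guarantee about the order in which partition results are
# merged, so mergeCombiners should be commutative and associative.
assert merge_combiners((10., 1), (20., 1)) == merge_combiners((20., 1), (10., 1))
```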
The internal flow is as follows:
data = [
("A", 2.), ("A", 4.), ("A", 9.),
("B", 10.), ("B", 20.),
("Z", 3.), ("Z", 5.), ("Z", 8.), ("Z", 12.)
]
Partition 1: ("A", 2.), ("A", 4.), ("A", 9.), ("B", 10.)
Partition 2: ("B", 20.), ("Z", 3.), ("Z", 5.), ("Z", 8.), ("Z", 12.)
Partition 1
("A", 2.), ("A", 4.), ("A", 9.), ("B", 10.)
A=2. --> createCombiner(2.) ==> accumulator[A] = (2., 1)
A=4. --> mergeValue(accumulator[A], 4.) ==> accumulator[A] = (2. + 4., 1 + 1) = (6., 2)
A=9. --> mergeValue(accumulator[A], 9.) ==> accumulator[A] = (6. + 9., 2 + 1) = (15., 3)
B=10. --> createCombiner(10.) ==> accumulator[B] = (10., 1)
Partition 2
("B", 20.), ("Z", 3.), ("Z", 5.), ("Z", 8.), ("Z", 12.)
B=20. --> createCombiner(20.) ==> accumulator[B] = (20., 1)
Z=3. --> createCombiner(3.) ==> accumulator[Z] = (3., 1)
Z=5. --> mergeValue(accumulator[Z], 5.) ==> accumulator[Z] = (3. + 5., 1 + 1) = (8., 2)
Z=8. --> mergeValue(accumulator[Z], 8.) ==> accumulator[Z] = (8. + 8., 2 + 1) = (16., 3)
Z=12. --> mergeValue(accumulator[Z], 12.) ==> accumulator[Z] = (16. + 12., 3 + 1) = (28., 4)
Merge partitions together
A ==> (15., 3)
B ==> mergeCombiners((10., 1), (20., 1)) ==> (10. + 20., 1 + 1) = (30., 2)
Z ==> (28., 4)
The final result is:
[('A', (15., 3)), ('B', (30., 2)), ('Z', (28., 4))]
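The whole walkthrough can be replayed in plain Python to check the numbers. Here `run_partition` is a hypothetical helper standing in for Spark's per-partition accumulation loop:

```python
# The three functions from the averaging example above.
create_combiner = lambda v: (v, 1)
merge_value = lambda c, v: (c[0] + v, c[1] + 1)
merge_combiners = lambda x, y: (x[0] + y[0], x[1] + y[1])

partition1 = [('A', 2.), ('A', 4.), ('A', 9.), ('B', 10.)]
partition2 = [('B', 20.), ('Z', 3.), ('Z', 5.), ('Z', 8.), ('Z', 12.)]

def run_partition(records):
    # Build one (sum, count) accumulator per key within a single partition.
    acc = {}
    for k, v in records:
        acc[k] = merge_value(acc[k], v) if k in acc else create_combiner(v)
    return acc

acc1, acc2 = run_partition(partition1), run_partition(partition2)

# Merge the two partitions' accumulators, as Spark does after the shuffle.
merged = dict(acc1)
for k, c in acc2.items():
    merged[k] = merge_combiners(merged[k], c) if k in merged else c

print(merged)    # {'A': (15.0, 3), 'B': (30.0, 2), 'Z': (28.0, 4)}
averages = {k: s / n for k, (s, n) in merged.items()}
print(averages)  # {'A': 5.0, 'B': 15.0, 'Z': 7.0}
```

The printed results match both the `sumCount.collect()` output and the `averageByKey.collectAsMap()` output from the pyspark session.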