A Brief Look at Spark's combineByKey Operator

Understanding how combineByKey works

Spark's combineByKey operator is similar to aggregate. Let's start with the official documentation:

  • combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash>)

  • Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C. Note that V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

  • Users provide three functions:
    createCombiner, which turns a V into a C (e.g., creates a one-element list)
    mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
    mergeCombiners, to combine two C’s into a single one.

Like aggregate, it turns data of type RDD[(K, V)] into RDD[(K, C)]. Here V and C may be the same type or different types.

combineByKey takes three functions as its main arguments: createCombiner, mergeValue, and mergeCombiners.

These three functions capture exactly what the operator does; understand them and you understand combineByKey. (A short sketch follows the list below.)

1. createCombiner, which turns a V into a C:
V -> C
2. mergeValue, which merges a V into a C:
C, V -> C
3. mergeCombiners, which merges two C's into a single one:
C, C -> C
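
To make the three signatures concrete, here is a minimal sketch (my own example, not from the Spark docs) that reproduces the grouping mentioned in the documentation quote, turning an RDD of (Int, Int) into an RDD of (Int, List[Int]):

# Minimal sketch: group values per key into a list, assuming `sc` is the
# SparkContext provided by the pyspark shell.
pairs = sc.parallelize([(1, 10), (1, 20), (2, 30)])
grouped = pairs.combineByKey(
    lambda v: [v],           # createCombiner: V -> C, start a one-element list
    lambda c, v: c + [v],    # mergeValue:     C, V -> C, append a value
    lambda c1, c2: c1 + c2)  # mergeCombiners: C, C -> C, concatenate lists

grouped.collect()
# [(1, [10, 20]), (2, [30])]  (key order may vary)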

Start Spark's Python shell by running IPYTHON=1 pyspark --master local[*] at the command line. The --master local[*] flag runs Spark in local mode, which makes it convenient to debug with print.

Below is an example that computes the average value for each key:

data = [
        ('A', 2.), ('A', 4.), ('A', 9.), 
        ('B', 10.), ('B', 20.), 
        ('Z', 3.), ('Z', 5.), ('Z', 8.), ('Z', 12.) 
       ]
rdd = sc.parallelize( data )

def mergeValue(x, value):
    # x is the running (sum, count) accumulator for this key
    print('what is the x', x)
    return x[0] + value, x[1] + 1

sumCount = rdd.combineByKey(lambda value: (value, 1),               # createCombiner
                            mergeValue,                             # mergeValue (defined above)
                            lambda x, y: (x[0] + y[0], x[1] + y[1]))  # mergeCombiners

sumCount.collect()
#[('A', (15.0, 3)), ('B', (30.0, 2)), ('Z', (28.0, 4))]

averageByKey = sumCount.map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))  # (key, sum / count)

averageByKey.collectAsMap()
#{'A': 5.0, 'B': 15.0, 'Z': 7.0}

Let's walk through it step by step:
1. Create a Combiner
lambda value: (value, 1)
This step defines the data structure of C, in this case (sum, count).

When a key is encountered for the first time, createCombiner() builds the initial value of that key's accumulator. (Note: this happens the first time the key appears in each partition, not the first time it appears in the whole RDD.) The quick sketch below demonstrates this.
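
A quick sketch (my own, reusing the data defined above): force two partitions and print from createCombiner to watch when it fires.

def createCombiner(value):
    print('createCombiner called with', value)
    return (value, 1)

rdd2 = sc.parallelize(data, 2)  # force exactly 2 partitions
rdd2.combineByKey(createCombiner,
                  lambda c, v: (c[0] + v, c[1] + 1),
                  lambda a, b: (a[0] + b[0], a[1] + b[1])).collect()
# In local mode the prints show createCombiner firing for A and B in
# partition 1, and again for B (plus Z) in partition 2 -- once per key
# per partition.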

2. Merge a Value
lambda x, value: (x[0] + value, x[1] + 1)
This function tells combineByKey what to do when a new value arrives for a key that already has a combiner. Its arguments are the current combiner and the new value; the combiner's structure, (sum, count), was defined in the previous step.

3. Merge two Combiners
lambda x, y: (x[0] + y[0], x[1] + y[1])
This function tells combineByKey how to merge two combiners for the same key, built independently on different partitions.
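
As noted at the start, combineByKey is similar to aggregate. For comparison, here is a sketch of the same (sum, count) computation written with aggregateByKey, where an explicit zero value takes the place of createCombiner:

# Same average computation with aggregateByKey: the zero value (0.0, 0)
# plays the role that createCombiner plays in combineByKey.
sumCount2 = rdd.aggregateByKey(
    (0.0, 0),                                 # zeroValue: initial (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),  # seqFunc: fold one value in
    lambda a, b: (a[0] + b[0], a[1] + b[1]))  # combFunc: merge two accumulators

sumCount2.collect()
# [('A', (15.0, 3)), ('B', (30.0, 2)), ('Z', (28.0, 4))]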

Internally, the flow looks like this:

data = [
        ("A", 2.), ("A", 4.), ("A", 9.), 
        ("B", 10.), ("B", 20.), 
        ("Z", 3.), ("Z", 5.), ("Z", 8.), ("Z", 12.) 
       ]

Partition 1: ("A", 2.), ("A", 4.), ("A", 9.), ("B", 10.)
Partition 2: ("B", 20.), ("Z", 3.), ("Z", 5.), ("Z", 8.), ("Z", 12.) 


Partition 1 
("A", 2.), ("A", 4.), ("A", 9.), ("B", 10.)

A=2. --> createCombiner(2.) ==> accumulator[A] = (2., 1)
A=4. --> mergeValue(accumulator[A], 4.) ==> accumulator[A] = (2. + 4., 1 + 1) = (6., 2)
A=9. --> mergeValue(accumulator[A], 9.) ==> accumulator[A] = (6. + 9., 2 + 1) = (15., 3)
B=10. --> createCombiner(10.) ==> accumulator[B] = (10., 1)

Partition 2
("B", 20.), ("Z", 3.), ("Z", 5.), ("Z", 8.), ("Z", 12.) 

B=20. --> createCombiner(20.) ==> accumulator[B] = (20., 1)
Z=3. --> createCombiner(3.) ==> accumulator[Z] = (3., 1)
Z=5. --> mergeValue(accumulator[Z], 5.) ==> accumulator[Z] = (3. + 5., 1 + 1) = (8., 2)
Z=8. --> mergeValue(accumulator[Z], 8.) ==> accumulator[Z] = (8. + 8., 2 + 1) = (16., 3)
Z=12. --> mergeValue(accumulator[Z], 12.) ==> accumulator[Z] = (16. + 12., 3 + 1) = (28., 4)

Merge partitions together
A ==> (15., 3)
B ==> mergeCombiner((10., 1), (20., 1)) ==> (10. + 20., 1 + 1) = (30., 2)
Z ==> (28., 4)

The final result:
[('A', (15.0, 3)), ('B', (30.0, 2)), ('Z', (28.0, 4))]
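
The two-partition split assumed in the trace can be checked directly with glom(), which returns the contents of each partition as a separate list; with this data the split typically matches the trace above:

# Inspect the actual partition layout.
rdd2 = sc.parallelize(data, 2)
rdd2.glom().collect()
# [[('A', 2.0), ('A', 4.0), ('A', 9.0), ('B', 10.0)],
#  [('B', 20.0), ('Z', 3.0), ('Z', 5.0), ('Z', 8.0), ('Z', 12.0)]]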