浅谈SPARK 的 aggregate 算子

理解aggregate的原理

刚开始我觉得SPARK的aggregate算子比较难理解的。首先我们看看官网的example

>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
(10, 4)
>>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
(0, 0)    

看了后,我就一脸懵逼了,两个lambda函数中的x,y各自代表什么东西呢?

我们先看看官网的aggregate 用法说明:

Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral “zero value.”

The functions op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U

zero value:初始值,就是你要结果的类型。上面例子的zero value是(0,0)

seqOp:对RDD里每个partition里要实施的操作。

combOp:对所有的partition汇总的操作

我们再进一步解读官网的那个例子,仔细说说那些x,y分别代表什么
这个例子的意图是算出一个列表里所有的元素的和,还有列表的长度。

在pyspark里,我们执行以下代码

#我们创建一个有4个元素的list,并且分为2个partition
listRDD = sc.parallelize([1,2,3,4], 2)

#然后我们定义seqOp
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
#上面你可以更直观的看到x,y分别代表什么了。local_result也就是初始值(0,0);list_element就是每个元素1,2,3,4..

#然后是combOp:
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
#从变量的名字你应该可以直观的看出x,y分别代表什么了吧

#然后aggregate一下:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

第一个分区的子列表是[1,2],我们实施seqOp的时候会对这个子列表产生一个本地(可以认为这是一个节点服务器上)的result,result也就是(sum,length)。第一个分区的local result 是(3,2)。

是这么算的:
(0+1,0+1)=> (1,1)
(1+2,1+1)=> (3,2)

第二个分区的子列表是[3,4]
同理也可以算出第二个分区的local result是(7,2)

然后是到combOp将两个本地结果汇总。计算过程是:
(3+7,2+2) =>(10,4)

这里有个特别注意的地方

如果你的zero value不是(0,0),而是(1,0)结果会有点出乎意料,这个例子中,结果并不是(12,4),而是(13,4)。如果你的partition为3的话结果是(14,4)。这应该是aggregate 会根据分区数在多运算几次

总结

aggregate 可以用来先计算每个partition的本地结果,然后再汇总。