Spark什么时候用 persist

前言

最近在用Spark做一些数据统计,有个任务要跑几个小时,所以需要优化一下。首先想到的是用 persist或者cache(persist的其中一种方式)

正文

场景一

首先看看在Stackoverflow的一个回答

Spark很多惰性算子,它并不会立即执行,persist就是惰性的。只有当被action trigger的时候,叫 lineage的RDD 链才会被执行。
如果是线性的lineage的话,persist是没用的。但如果RDD的lineage被分流出去的话,那么persist就有用了。

举个例子:

1
2
3
4
5
6
7
8
9
10
11
# 比如这种情况,有个两个action算子count;
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
# 可以在公用的RDD上加个cache,让这个flatMap只计算一次
val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

另外一个案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def make_url(value):
url = 'http://domain.com/' + str(value)
if value == '1134021255504498689':
print(value) # 这里print出来可以作为一个标记,知道 这个udf执行了几次
return url
url_udf = udf(make_url, StringType())
df = spark.read.csv('file:///Users/xiaofeilong/Documents/twitter_tweet_keyword.csv')
df = df.withColumn('url', url_udf('_c0'))
# df.cache()
df1 = df.filter(df._c1.isin(['petcare']))
df2 = df.filter(df._c1.isin(['hound']))
df_union = df1.union(df2)
df_union = df_union.orderBy('url') # 这里加order by url的原因是,要让它执行url_udf。不然如果这个字段没用上的话,Spark并不会执行url_udf
print(df_union.count())

上述例子中,如果不加cache的话。log中可以看到打印了两次 1134021255504498689。而且会看到有两条这样的信息:

1
2
3
4
5
INFO FileScanRDD: Reading File path: file:///Users/xiaofeilong/Documents/twitter_tweet_keyword.csv, range: 0-2315578, partition values: [empty row]
.
.
.
FileScanRDD: Reading File path: file:///Users/xiaofeilong/Documents/twitter_tweet_keyword.csv, range: 0-2315578, partition values: [empty row]

测试结论!

  • 说明csv文件被load了两次!!!

  • 如果用了cache的话,只会出现一次!!!

场景二

Reference

https://stackoverflow.com/questions/28981359/why-do-we-need-to-call-cache-or-persist-on-a-rdd
https://blog.csdn.net/ainidong2005/article/details/53152605