简单谈谈SPARK 广播变量的用法
首先我们看看官网的文档对Broadcast的描述:
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
意思就是说广播变量让你可以在每个节点机器上缓存一个只读的变量,而减少了在每个任务中复制一份的繁琐。
- Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。
- Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。
import pandas as pd
table=pd.read_sql("select goods_id,goods_model,brand_id from goods",engine)
#比如我们要先读取一个比较大的mysql表以供map、filter这些变型中查询数据。我们可以用广播变量将这个公共的变量在每个节点中都保留一份
table_broadcast=sc.broadcast(table)
#调用的时候很简单,只需要在对象.value就可以
def parse(tup):
result=table_broadcast.value[table_broadcast.value['goods_id]==100]['brand_id']
return result
rdd.map(parse)
在创建了广播变量之后,在集群上的所有函数中应该使用它来替代使用v.这样v就不会不止一次地在节点之间传输了。另外,为了确保所有的节点获得相同的变量,对象v在被广播之后就不应该再修改。
如果不用broadcast的话,table会从主节点为每个任务发送一个这样的数据,就会代价很大,而且再调用table的时候,还需要向每个节点再发送一遍。