前言
最近上头说我写的ETL工具从MySQL导出为CSV的速度太慢了,需要性能优化。的确,有部分数据因为在MySQL里面做了
分表分库,而我目前的导出实现是一个一个对小表进行导出。其实,这一步完全是可以并发多个表同时导出的。
理论上如果网络IO没有瓶颈的话,多个表同时从MySQL里dump可以大大提升效率。
正题
查阅文档
为了实现并发地使用sqlalchemy我花了不少时间在网上找资料,也在StackOverflow寻求帮助。
其实刚开始我是想用threading结合SQLAlchemy来实现多线程导出的。去SQLAlchemy官网一看,没有找到相关的实现文档,
却找到了multiprocessing与SQLAlchemy的实践:
|
|
实践
于是,我就打算用multiprocessing试试了。multiprocessing有个比较坑爹的地方就是它会用pickle来序列化一些数据,
因为要把数据复制到新spawn出来的进程。
关于pickle
首先我们要了解下pickle,虽然pickle挺坑的。哪些内容可以pickle呢,官网上说:
比如, 我们举个简单的例子:
这段代码我的目的是想用多个进程多个connector连接到MySQL,然后各自进程同时去查询,这样便可以实现并行处理,
提升效率。然而,实际上这段代码不能正常地执行,而且没有任何报错提示。这是为何呢?
有些时候可能会看到这样的报错:
https://stackoverflow.com/questions/58022926/cant-pickle-the-sqlalchemy-engine-in-the-class
下面我把上面的代码修改下:
我们看到,这段代码有一些变化:
增加了getstate, getstate这两个魔术方法。为什么要加呢?
123 首先,我们 self.pool.apply_async(self.run_in_process) 可以看出apply_async调用的是实例的方法,所以Python需要pickle整个Client对象,包括它的所有实例变量。在第一个代码片段中我们可以看到它的实例变量有pool, connection, engine等等。然而这些对象都是不可以被pickle的,所以代码执行的时候会有问题。所以就有了__getstate__, __getstate__ 这两个东西。
|
|
multiprocessing的一些细节
传多个参数在target函数
有时候当我们想在apply_async 的 target函数上传指定参数的时候, 可以用kwds传进去,比如:
map和apply的区别
map执行的顺序是和参数的顺序一致的,apply_async的顺序是随机的
map一般用来切分参数执行在同一个方法上。而apply_async可以调用不同的方法。
此外,
map相当于 map_async().get()
apply相当于 apply_async().get()
子进程报错没有提示
有个很坑的地方,有些时候逻辑明明没有执行,但又没有任何报错!
比如这个,如果我不get() 一下 apply_async后的返回的话,看不到任何报错信息,解决办法就是用get()后才
能得知报错的信息。
加装饰器后报错
比如我们在run_in_process
方法上了个装饰器,然后就报错了。
报错说:
解决办法:
比较麻烦,不能用@了。可以以这种办法代替装饰器:
关于处理ctrl-c 的报错:
有时候我们用multiprocessing处理一些任务,当我们想终止任务时候,用Ctrl+C 然后会看到一堆的报错,有时候还得连续按很多CTRL+C完全终止掉。
下面是最佳解决方案:
解决方案代码:
Reference
https://docs.python.org/3/library/pickle.html
https://stackoverflow.com/questions/25382455/python-notimplementederror-pool-objects-cannot-be-passed-between-processes/25385582#25385582