Making a Celery task execute one at a time

Background

I recently had a requirement like this: a certain task must be executed strictly one at a time, never concurrently.
The simplest approach is to start the Celery worker with a single process: "-c 1". However, this forces every other task to run in a single process as well.
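For reference, this is all that approach takes (a minimal sketch, assuming the app module is named tasks as in the rest of this post):

celery -A tasks worker -l info -c 1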

After Googling around and looking at many examples, the most common approach follows the official documentation. The code looks like this:

from celery import task
from celery.five import monotonic
from celery.utils.log import get_task_logger
from contextlib import contextmanager
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    # add() returns False if lock_id already exists and True if it does not.
    # The same idea can be implemented with Redis, e.g. using setnx.
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)


@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.
    feed_url_hexdigest = md5(feed_url).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
    logger.debug('Importing feed: %s', feed_url)
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return Feed.objects.import_feed(feed_url).url
    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)

However, the logic above simply ignores new tasks while one is already running. For example, if an import_feed task is still running and has not finished, calling apply_async again does nothing at all.
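To make the limitation concrete, here is a minimal sketch using the task above (the feed URL is just a placeholder): while the first call still holds the lock, a second call only logs a message and returns, and its work is dropped rather than deferred.

import_feed.delay('https://example.com/feed.xml')  # acquires the lock and imports the feed
import_feed.delay('https://example.com/feed.xml')  # lock is busy: logs and returns, the import is skipped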

So the code above needs some modification.

Practice

Here I use a toilet analogy. Suppose there is a public toilet: while someone is using it, nobody else can, and everyone else has to line up next to it and wait.
Only one person can go in at a time. Enough talk, here is the code:

@app.task(bind=True, base=ShitTask, max_retries=10)
def shit_task(self, toilet_id):
    lock_id = '{0}-lock'.format(toilet_id)
    shit_queue = '{0}-queue'.format(self.name)
    # Redis is used here to hold the waiting queue
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # Once inside the toilet, lock the door first (so nobody else can get in), then go
            print('Oh yes, Lock the door and it is my time to shit. ')
            time.sleep(5)
            return 'I finished shit'
        else:
            # Somebody is using the toilet, so we have to wait outside
            print('Oops, somebody engaged the toilet, I have to queue up')
            # Put the waiting caller's arguments into Redis to queue up
            rdb.lpush(shit_queue, json.dumps(list(self.request.args)))
            # Ignore makes the worker drop this task without recording a result
            raise Ignore()

The code above is the main task-handling logic. In addition, the Task class's after_return method has to be overridden, so that once the currently running task (the person using the toilet) finishes successfully, the next queued task (the next person in line) gets executed.

The complete code is as follows:

# coding=u8
from celery import Celery, Task
from celery.exceptions import Ignore
from celery import task
import redis
import pickle
import json
from celery.five import monotonic
from celery.utils.log import get_task_logger
from contextlib import contextmanager
import time

app = Celery('tasks', broker='redis://localhost:6379/10')
logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes

rdb = redis.Redis(db=11)


class ShitTask(Task):
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        print(status)
        if retval:
            # retval is whatever the task returned
            print('somebody finished shit, calling the next one to shit')
            shit_queue = '{0}-queue'.format(self.name)
            task_args = rdb.rpop(shit_queue)
            if task_args:
                task_args = json.loads(task_args)
                self.delay(*task_args)
        else:
            pass


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # status = cache.add(lock_id, oid, LOCK_EXPIRE)  # returns False if lock_id exists, True if it does not
    status = rdb.setnx(lock_id, oid)
    rdb.expire(lock_id, LOCK_EXPIRE)
    try:
        yield status
    finally:
        if monotonic() < timeout_at and status:
            # Enforce a time limit: nobody may occupy the toilet for too long,
            # and only the occupant can unlock the toilet door
            print('release the lock and open the door of the toilet %s' % lock_id)
            rdb.delete(lock_id)


@app.task(bind=True, base=ShitTask, max_retries=10)
def shit_task(self):
    print('task name %s' % self.name)
    lock_id = '{0}-lock'.format(self.name)
    shit_queue = '{0}-queue'.format(self.name)
    print(lock_id)
    with memcache_lock(lock_id, self.app.oid) as acquired:
        print('acquired', acquired)
        if acquired:
            print('Oh yes, Lock the door and it is my time to shit. ')
            time.sleep(5)
            return 'I finished shit'
        else:
            print('Oops, somebody engaged the toilet, I have to queue up')
            # pending_task = pickle.dumps(self)
            # rdb.lpush(shit_queue, pending_task)
            # Don't pickle the task object itself: when it is loaded back in after_return,
            # something very odd happens - the loaded task turns out to be the first task
            # that acquired the lock. Serialize the arguments with json instead.
            rdb.lpush(shit_queue, json.dumps(list(self.request.args)))
            raise Ignore()
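One caveat worth noting about memcache_lock above: setnx followed by expire are two separate Redis commands, so if the process dies between them the lock key can be left without a TTL. redis-py's set() can do both in a single atomic call; a minimal sketch of that variant (a drop-in for those two lines, not part of the original code):

# acquire the lock and set its TTL in one atomic command;
# with nx=True the call returns True when the key was created and None when it already exists
status = rdb.set(lock_id, oid, nx=True, ex=LOCK_EXPIRE)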

Testing

You can create a test_celery project to verify this. Create a directory named test_celery,
then create a tasks.py file containing the code above.
Start the Celery worker with the command below; here 8 processes are used. Setting this higher increases the number of tasks that can run in parallel.

celery -A tasks worker -l info -c 8

Then start ipython to call the task.

Quickly fire off a few task.delay calls, as in the sketch below.
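A minimal sketch of the ipython session (shit_task takes no arguments in the final version of the code):

from tasks import shit_task

shit_task.delay()  # acquires the lock and starts using the toilet
shit_task.delay()  # finds the lock taken, pushes its arguments to Redis, and is re-submitted by after_return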

In the Celery log you can see that the two shit_tasks run one after the other, not in parallel.

The code is available at:
https://github.com/mikolaje/celery_toys/tree/master/test_lock

Reference

http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time