Celery重复执行一个Task的解决方案
2021-01-19 19:16:01 +08 字数:1394 标签: Python问题 ¶
在系统负载较高,消息队列里堵了太多东西的情况下,Celery容易出现重复执行一个Task,甚至不止一次的情况。
这个问题在RabbitMQ情况下也会出现,但本文仅针对Redis,其它Broker仅供参考。
原因 ¶
见《Using Redis》:
If a task isn’t acknowledged within the Visibility Timeout the task will be redelivered to another worker and executed.
This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.
So you have to increase the visibility timeout to match the time of the longest ETA you’re planning to use.
Note that Celery will redeliver messages at worker shutdown, so having a long visibility timeout will only delay the redelivery of ‘lost’ tasks in the event of a power failure or forcefully terminated workers.
Periodic tasks won’t be affected by the visibility timeout, as this is a concept separate from ETA/countdown.
这其实不是一个Bug,而是Celery的一个Feature。
在Message超过1小时未被消费的情况下,Celery会重新发一个一模一样的(task_id
相同)。
当然,在很多业务场景下,这就是一个Bug。
解决方案 ¶
幂等性 ¶
如果限定在Task层面解决这个问题,则需要保证Task执行内容的幂等性。 即,多次执行后,近第一次执行真正生效。 最简单的设计就是,在Task运行的一开始,就通过Redis检查一个全局标记。
@celery.task(bind=True)
def idempotent_task(self):
r = redis.Redis(host='localhost')
key = f'idempotent_{self.request.id}'
if r.get(key):
return
r.set(key, True, ex=7 * 24 * 3600)
# do the task
这个设计利用了Redis的全局性,但是仍然有两个问题:
- 线程不安全。
在并发、尤其是分布式并发的情况下,不能排除有多个相同Task同时执行到
if r.get(key):
这句。 - Redis过期时间
ex
必须设,但又没有万全的值。 必须设ex
,是因为这个值只是为了保证幂等性,本身就是临时的,没有持久化的需要。 而且在大量业务的冲击下,Redis有可能被撑爆。 而如果设ex
,无论它有多长,都不能保证两个相同Task在队列中低于这个时限。
这两种情况都能导致这个幂等性方案失效,但是在设置无误的情况下,它们发生的概率都很低。 如果有必要,可以使用Redlock或ZooKeeper等技术,给读写操作加上分布式锁。
时效性 ¶
提高visibility_timeout
,并且设置expires
。
如果任务执行时间较长,Message在队列中可能等待超过1小时,则应该提高visibility_timeout
。
同时,为了避免治标不治本,如果业务场景能接受,建议设置expires
。
expires
是设置Message过期的秒数,如果过期,Task会被标记为REVOKED
,这个状态等同于调用方主动取消。
只要expires < visibility_timeout
,就可以保证Message不重复(代价是,Message没了)。
# In celery.py
APP = Celery()
APP.conf.update(
broker_transport_options={'visibility_timeout': YOUR_VISIBILITY},
)
# In tasks.py
@APP.Task(expires=YOUR_EXPIRES)
def your_task():
pass
# Or in where your task is called
your_task(expires=YOUR_EXPIRES)
设置Redis的visibility_timeout
的方法只有一种,但设置expires
的方法有两种。
前者是在tasks.py
中,静态设置;后者是在调用位置,动态设置;后者覆盖前者。
注意:CELERY_EVENT_QUEUE_EXPIRES
和CELERY_EVENT_QUEUE_TTL
配置,只对RabbitMQ(amqp
)生效,对Redis不生效。
结论 ¶
前述两种方案,多少都有些隐患。 第一种光靠Redis实现读写操作,仍然有竞争问题,而如果加分布式锁又太复杂。 第二种改变了业务交互方式,增加了一个自动取消的概念。
其实还有一种方案,单纯提高visibility_timeout
到一个极高的值。
参考 ¶
- duplicate task in each work. · Issue #3270 · celery/celery
- Long running jobs redelivering after broker visibility timeout with celery and redis · Issue #5935 · celery/celery
- Using Redis — Celery 5.0.5 documentation#visibility-timeout
- Calling Tasks — Celery 5.0.5 documentation#expiration
- Configuration and defaults — Celery 5.0.5 documentation