Celery重复执行一个Task的解决方案

问题

在系统负载较高,消息队列里堵了太多东西的情况下,Celery容易出现重复执行一个Task,甚至不止一次的情况。

这个问题在RabbitMQ情况下也会出现,但本文仅针对Redis,其它Broker仅供参考。

原因

见《Using Redis》:

If a task isn’t acknowledged within the Visibility Timeout the task will be redelivered to another worker and executed.

This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.

So you have to increase the visibility timeout to match the time of the longest ETA you’re planning to use.

Note that Celery will redeliver messages at worker shutdown, so having a long visibility timeout will only delay the redelivery of ‘lost’ tasks in the event of a power failure or forcefully terminated workers.

Periodic tasks won’t be affected by the visibility timeout, as this is a concept separate from ETA/countdown.

这其实不是一个Bug,而是Celery的一个Feature。 在Message超过1小时未被消费的情况下,Celery会重新发一个一模一样的(task_id相同)。 当然,在很多业务场景下,这就是一个Bug。

解决方案

幂等性

如果限定在Task层面解决这个问题,则需要保证Task执行内容的幂等性。 即,多次执行后,近第一次执行真正生效。 最简单的设计就是,在Task运行的一开始,就通过Redis检查一个全局标记。

@celery.task(bind=True)
def idempotent_task(self):
    r = redis.Redis(host='localhost')
    key = f'idempotent_{self.request.id}'
    if r.get(key):
        return

    r.set(key, True, ex=7 * 24 * 3600)
    # do the task

这个设计利用了Redis的全局性,但是仍然有两个问题:

  1. 线程不安全。 在并发、尤其是分布式并发的情况下,不能排除有多个相同Task同时执行到if r.get(key):这句。
  2. Redis过期时间ex必须设,但又没有万全的值。 必须设ex,是因为这个值只是为了保证幂等性,本身就是临时的,没有持久化的需要。 而且在大量业务的冲击下,Redis有可能被撑爆。 而如果设ex,无论它有多长,都不能保证两个相同Task在队列中低于这个时限。

这两种情况都能导致这个幂等性方案失效,但是在设置无误的情况下,它们发生的概率都很低。 如果有必要,可以使用Redlock或ZooKeeper等技术,给读写操作加上分布式锁。

时效性

提高visibility_timeout,并且设置expires

如果任务执行时间较长,Message在队列中可能等待超过1小时,则应该提高visibility_timeout。 同时,为了避免治标不治本,如果业务场景能接受,建议设置expiresexpires是设置Message过期的秒数,如果过期,Task会被标记为REVOKED,这个状态等同于调用方主动取消。 只要expires < visibility_timeout,就可以保证Message不重复(代价是,Message没了)。

# In celery.py
APP = Celery()
APP.conf.update(
    broker_transport_options={'visibility_timeout': YOUR_VISIBILITY},
)

# In tasks.py
@APP.Task(expires=YOUR_EXPIRES)
def your_task():
    pass

# Or in where your task is called
your_task(expires=YOUR_EXPIRES)

设置Redis的visibility_timeout的方法只有一种,但设置expires的方法有两种。 前者是在tasks.py中,静态设置;后者是在调用位置,动态设置;后者覆盖前者。

注意CELERY_EVENT_QUEUE_EXPIRESCELERY_EVENT_QUEUE_TTL配置,只对RabbitMQ(amqp)生效,对Redis不生效。

结论

前述两种方案,多少都有些隐患。 第一种光靠Redis实现读写操作,仍然有竞争问题,而如果加分布式锁又太复杂。 第二种改变了业务交互方式,增加了一个自动取消的概念。

其实还有一种方案,单纯提高visibility_timeout到一个极高的值。

参考


相关笔记