正确使用Celery中的revoke来取消任务

取消一个在排队的Celery的Task,操作是简单的,用revoke就行。 但是,如果要很好地取消一个正在运行的Task,则没那么容易。

取消操作

Celery(本文基于5.1.2版本)有两种取消操作。

一是通过Control.revoke

from celery.app.control.Control import revoke

ctrl = Control()
ctrl.revoke(task_id)
# Or terminate
ctrl.revoke(task_id, terminate=True)
ctrl.terminate(task_id)

当然,使用Control最简单的办法是直接利用Celery的app对象:

app.control.revoke(task_id)

二是通过AsyncResult.revoke

from celery.result import AsyncResult

result = AsyncResult(task_id)
result.revoke()
# Or terminate
result.revoke(terminate=True)

二者基本是等价的,只是后者常用于复杂的操作场景,前者只是单纯地取消。 更多关于revoke的内容,可以参考Revoking tasks

以下基于AsyncResult.revoke进行介绍,省略写task_id

取消任务

并非所有任务状态,都可以取消。对于已经结束的任务,取消是无效操作。

stateDiagram
[*] --> PENDING
PENDING --> STARTED
STARTED --> SUCCESS
STARTED --> FAILURE
STARTED --> RETRY
RETRY --> STARTED
SUCCESS --> [*]
FAILURE --> [*]
STARTED --> REVOKED
REVOKED --> [*]

可以被取消的只有:

取消PENDING

这代表Task还在排队中,没有真正运行。只需要使用普通的取消即可。

result.revoke()

它的原理是,发广播通知所有worker,更新revoked列表。 当worker执行到revoked列表中的任务时,会直接跳过。

如果一个任务已经在运行,则以上操作无效。 因此,它只适用于取消未运行任务的场景,业务能容忍、或要求已运行任务自然地执行完毕。

取消STARTED

如果一个任务已经在运行,通常处于STARTED状态。 取消这个任务,意味着中断这个运行过程。

result.revoke(terminate=True)
# Or with signal
import signal
result.revoke(terminate=True, signal=signal.SIGTERM)

要注意的是,terminate=True意味着使用给定的signal,结束任务进程。 这里的signal,可以是signal模块的任意值。 默认的signal=None,即等价于signal=signal.SIGTERM

在结束任务进程时,任务自己是不知道的。 因此,这只适用于粗暴终止的场景。

然而,SIGTERM有时候并没有粗暴到底,会等一会儿,往往不是预期行为。 因此,如果要以终止的方式取消正在运行的任务,也可以使用SIGQUIT

利用SIGUSR1

尽管所有的signal都是可用的,但是有四个是Celery预留的特殊signal,详见Process Signals

Signal 原文 备注
SIGTERM Warm shutdown, wait for tasks to complete. 热停止,默认行为。
SIGQUIT Cold shutdown, terminate ASAP 冷停止。
SIGUSR1 Dump traceback for all active threads. 发一个SoftTimeLimitExceeded异常。
SIGUSR2 Remote debug, see celery.contrib.rdb. 调试专用。

其中,SIGTERM就是默认的Signal,很粗暴;SIGQUIT则比它更粗暴。 至于不在上面的SIGKILLSIGINT等,也不见得好多少。 如果想要让被终止的任务知道自己被终止,目前只能使用SIGUSR1

它会在任务正在执行的位置,抛出一个SoftTimeLimitExceeded。 因此,只需要在最外层用try ... except捕获这个异常,就可以进行一些后处理。 但是,由于它的现象和任务超时是类似的,因此需要自行区分。

结论

对于只取消没有在运行的任务,只需要revoke即可; 对于已经在运行的任务,则需要terminate=True。 默认的signal=signal.SIGTERM,有时结束得不够快,有时过于粗暴,缺少一些后处理。 因此,对不关心其运行情况的业务,也可考虑使用SIGQUIT,甚至SIGKILL。 对于需要做取消前做一些处理的,可以用SIGUSR1


相关笔记