我如何删除所有挂起的任务,而不知道每个任务的task_id ?


当前回答

从文档中可以看出:

$ celery -A proj purge

or

from proj.celery import app
app.control.purge()

(编辑:更新与当前的方法。)

其他回答

对于芹菜3.0+:

$ celery purge

清除特定队列:

$ celery -Q queue_name purge

1. 要正确清除等待任务的队列,您必须停止所有的worker (http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):

$ sudo rabbitmqctl stop

或者(如果RabbitMQ/消息代理由Supervisor管理):

$ sudo supervisorctl stop all

2. ...然后清除特定队列中的任务:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. RabbitMQ开始:

$ sudo rabbitmqctl start

或者(如果RabbitMQ由Supervisor管理):

$ sudo supervisorctl start all

从文档中可以看出:

$ celery -A proj purge

or

from proj.celery import app
app.control.purge()

(编辑:更新与当前的方法。)

芹菜2。X和3.x:

例如,当使用带-Q参数的worker来定义队列时

celery worker -Q queue1,queue2,queue3

然后芹菜清除将不会工作,因为您不能传递队列参数给它。它只会删除默认队列。 解决方案是用——purge参数启动你的worker,就像这样:

celery worker -Q queue1,queue2,queue3 --purge

然而,这将运行worker。

另一个选择是使用芹菜的amqp子命令

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3

如果你想删除所有挂起的任务,以及活动和保留的任务,以完全停止Celery,这对我来说是有效的:

from proj.celery import app
from celery.task.control import inspect, revoke

# remove pending tasks
app.control.purge()

# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)