如何检索队列中尚未处理的任务列表?


当前回答

如果你正在使用芹菜+Django,最简单的方法是在虚拟环境中直接从终端使用命令检查任务,或者使用芹菜的完整路径:

道格:http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke inspecting-workers

$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled

另外,如果你正在使用芹菜+RabbitMQ,你可以使用下面的命令检查队列列表:

更多信息:https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues

其他回答

我认为获取正在等待的任务的唯一方法是保留一个已启动任务的列表,并让任务在启动时将自己从列表中删除。

通过rabbitmqctl和list_queues,你可以了解有多少任务正在等待,而不是任务本身:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

如果你想要的包括正在处理的任务,但还没有完成,你可以保留一个任务列表,并检查它们的状态:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

或者让芹菜使用CELERY_RESULT_BACKEND存储结果,并检查哪些任务不在其中。

要获得队列上的任务数,你可以使用flower库,下面是一个简化的例子:

from flower.utils.broker import Broker
from django.conf import settings

def get_queue_length(queue):
    broker = Broker(settings.CELERY_BROKER_URL)
    queues_result = broker.queues([queue])
    return queues_result.result()[0]['messages']

这在我的申请中很奏效:

def get_celery_queue_active_jobs(queue_name):
    connection = <CELERY_APP_INSTANCE>.connection()

    try:
        channel = connection.channel()
        name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
        active_jobs = []

        def dump_message(message):
            active_jobs.append(message.properties['application_headers']['task'])

        channel.basic_consume(queue=queue_name, callback=dump_message)

        for job in range(jobs):
            connection.drain_events()

        return active_jobs
    finally:
        connection.close()

Active_jobs将是一个字符串列表,对应于队列中的任务。

不要忘记将CELERY_APP_INSTANCE替换为您自己的。

感谢@ashish在这里为我指出了正确的方向:https://stackoverflow.com/a/19465670/9843399

from celery.task.control import inspect
def key_in_list(k, l):
    return bool([True for i in l if k in i.values()])

def check_task(task_id):
    task_value_dict = inspect().active().values()
    for task_list in task_value_dict:
        if self.key_in_list(task_id, task_list):
             return True
    return False

据我所知,芹菜没有提供API来检查队列中等待的任务。这是特定于代理的。例如,如果你使用Redis作为代理,那么检查在芹菜(默认)队列中等待的任务就像这样简单:

连接到代理 在芹菜列表中列出项目(以LRANGE命令为例)

请记住,这些任务等待可用的员工来挑选。您的集群可能有一些正在运行的任务——这些任务不会在这个列表中,因为它们已经被选中了。

检索特定队列中的任务的过程是特定于代理的。