如何检索队列中尚未处理的任务列表?


当前回答

这在我的申请中很奏效:

def get_celery_queue_active_jobs(queue_name):
    connection = <CELERY_APP_INSTANCE>.connection()

    try:
        channel = connection.channel()
        name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
        active_jobs = []

        def dump_message(message):
            active_jobs.append(message.properties['application_headers']['task'])

        channel.basic_consume(queue=queue_name, callback=dump_message)

        for job in range(jobs):
            connection.drain_events()

        return active_jobs
    finally:
        connection.close()

Active_jobs将是一个字符串列表,对应于队列中的任务。

不要忘记将CELERY_APP_INSTANCE替换为您自己的。

感谢@ashish在这里为我指出了正确的方向:https://stackoverflow.com/a/19465670/9843399

其他回答

我认为获取正在等待的任务的唯一方法是保留一个已启动任务的列表,并让任务在启动时将自己从列表中删除。

通过rabbitmqctl和list_queues,你可以了解有多少任务正在等待,而不是任务本身:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

如果你想要的包括正在处理的任务,但还没有完成,你可以保留一个任务列表,并检查它们的状态:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

或者让芹菜使用CELERY_RESULT_BACKEND存储结果,并检查哪些任务不在其中。

如果您控制任务的代码,那么您可以通过让任务在第一次执行时触发一个微不足道的重试来解决这个问题,然后检查inspect().reserved()。重试将任务注册到结果后端,芹菜可以看到这一点。任务必须接受self或context作为第一个参数,这样我们才能访问重试计数。

@task(bind=True)
def mytask(self):
    if self.request.retries == 0:
        raise self.retry(exc=MyTrivialError(), countdown=1)
    ...

这个解决方案与代理无关。你不必担心你是用RabbitMQ还是Redis来存储任务。

编辑:经过测试,我发现这只是一个部分的解决方案。预留的大小受限于worker的预取设置。

芹菜检查模块似乎只知道从工作人员的角度来看的任务。如果你想查看队列中的消息(还没有被worker提取),我建议使用pyrabbit,它可以与rabbitmq http api接口,从队列中检索各种信息。

一个例子可以在这里找到: 使用芹菜检索队列长度(RabbitMQ, Django)

EDIT:查看获取队列中任务列表的其他答案。

你应该看这里: 芹菜指南-检查工人

基本上是这样的:

my_app = Celery(...)

# Inspect all nodes.
i = my_app.control.inspect()

# Show the items that have an ETA or are scheduled for later processing
i.scheduled()

# Show tasks that are currently active.
i.active()

# Show tasks that have been claimed by workers
i.reserved()

这取决于你想要什么

要从后端检索任务,使用这个

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)