如何检索队列中尚未处理的任务列表?


当前回答

据我所知,芹菜没有提供API来检查队列中等待的任务。这是特定于代理的。例如,如果你使用Redis作为代理,那么检查在芹菜(默认)队列中等待的任务就像这样简单:

连接到代理 在芹菜列表中列出项目(以LRANGE命令为例)

请记住,这些任务等待可用的员工来挑选。您的集群可能有一些正在运行的任务——这些任务不会在这个列表中,因为它们已经被选中了。

检索特定队列中的任务的过程是特定于代理的。

其他回答

我得出的结论是,获得队列上的作业数量的最佳方法是使用rabbitmqctl,正如这里多次建议的那样。为了允许任何选择的用户使用sudo运行命令,我遵循了这里的说明(我跳过了编辑配置文件部分,因为我不介意在命令之前键入sudo)。

我还获取了jamesc的grep和cut代码片段,并将其封装在子进程调用中。

from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))
from celery.task.control import inspect
def key_in_list(k, l):
    return bool([True for i in l if k in i.values()])

def check_task(task_id):
    task_value_dict = inspect().active().values()
    for task_list in task_value_dict:
        if self.key_in_list(task_id, task_list):
             return True
    return False

要从后端检索任务,使用这个

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)

这在我的申请中很奏效:

def get_celery_queue_active_jobs(queue_name):
    connection = <CELERY_APP_INSTANCE>.connection()

    try:
        channel = connection.channel()
        name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
        active_jobs = []

        def dump_message(message):
            active_jobs.append(message.properties['application_headers']['task'])

        channel.basic_consume(queue=queue_name, callback=dump_message)

        for job in range(jobs):
            connection.drain_events()

        return active_jobs
    finally:
        connection.close()

Active_jobs将是一个字符串列表,对应于队列中的任务。

不要忘记将CELERY_APP_INSTANCE替换为您自己的。

感谢@ashish在这里为我指出了正确的方向:https://stackoverflow.com/a/19465670/9843399

如果你不使用优先级任务,这其实很简单,如果你使用的是Redis。获取任务计数:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

但是,优先级任务在redis中使用不同的键,所以整体情况稍微复杂一些。总的来说,您需要为任务的每个优先级查询redis。在python中(以及在Flower项目中),它看起来像:

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])

如果你想要获得一个实际的任务,你可以使用以下方法:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

从那里,您必须反序列化返回的列表。以我为例,我可以通过以下方法来实现:

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))

请注意,反序列化可能需要一些时间,您需要调整上面的命令以处理不同的优先级。