如何检索队列中尚未处理的任务列表?
当前回答
Redis json序列化的复制粘贴解决方案:
def get_celery_queue_items(queue_name):
import base64
import json
# Get a configured instance of a celery app:
from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True) as conn:
tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
decoded_tasks = []
for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)
return decoded_tasks
它与Django一起工作。只是别忘了改变你的项目。
其他回答
Redis json序列化的复制粘贴解决方案:
def get_celery_queue_items(queue_name):
import base64
import json
# Get a configured instance of a celery app:
from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True) as conn:
tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
decoded_tasks = []
for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)
return decoded_tasks
它与Django一起工作。只是别忘了改变你的项目。
如果您控制任务的代码,那么您可以通过让任务在第一次执行时触发一个微不足道的重试来解决这个问题,然后检查inspect().reserved()。重试将任务注册到结果后端,芹菜可以看到这一点。任务必须接受self或context作为第一个参数,这样我们才能访问重试计数。
@task(bind=True)
def mytask(self):
if self.request.retries == 0:
raise self.retry(exc=MyTrivialError(), countdown=1)
...
这个解决方案与代理无关。你不必担心你是用RabbitMQ还是Redis来存储任务。
编辑:经过测试,我发现这只是一个部分的解决方案。预留的大小受限于worker的预取设置。
如果你正在使用芹菜+Django,最简单的方法是在虚拟环境中直接从终端使用命令检查任务,或者使用芹菜的完整路径:
道格:http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke inspecting-workers
$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled
另外,如果你正在使用芹菜+RabbitMQ,你可以使用下面的命令检查队列列表:
更多信息:https://linux.die.net/man/1/rabbitmqctl
$ sudo rabbitmqctl list_queues
要从后端检索任务,使用这个
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
这在我的申请中很奏效:
def get_celery_queue_active_jobs(queue_name):
connection = <CELERY_APP_INSTANCE>.connection()
try:
channel = connection.channel()
name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
active_jobs = []
def dump_message(message):
active_jobs.append(message.properties['application_headers']['task'])
channel.basic_consume(queue=queue_name, callback=dump_message)
for job in range(jobs):
connection.drain_events()
return active_jobs
finally:
connection.close()
Active_jobs将是一个字符串列表,对应于队列中的任务。
不要忘记将CELERY_APP_INSTANCE替换为您自己的。
感谢@ashish在这里为我指出了正确的方向:https://stackoverflow.com/a/19465670/9843399
推荐文章
- 证书验证失败:无法获得本地颁发者证书
- 当使用pip3安装包时,“Python中的ssl模块不可用”
- 无法切换Python与pyenv
- Python if not == vs if !=
- 如何从scikit-learn决策树中提取决策规则?
- 为什么在Mac OS X v10.9 (Mavericks)的终端中apt-get功能不起作用?
- 将旋转的xtick标签与各自的xtick对齐
- 为什么元组可以包含可变项?
- 如何合并字典的字典?
- 如何创建类属性?
- 不区分大小写的“in”
- 在Python中获取迭代器中的元素个数
- 解析日期字符串并更改格式
- 使用try和。Python中的if
- 如何在Python中获得所有直接子目录