如何检索队列中尚未处理的任务列表?
当前回答
如果您控制任务的代码,那么您可以通过让任务在第一次执行时触发一个微不足道的重试来解决这个问题,然后检查inspect().reserved()。重试将任务注册到结果后端,芹菜可以看到这一点。任务必须接受self或context作为第一个参数,这样我们才能访问重试计数。
@task(bind=True)
def mytask(self):
if self.request.retries == 0:
raise self.retry(exc=MyTrivialError(), countdown=1)
...
这个解决方案与代理无关。你不必担心你是用RabbitMQ还是Redis来存储任务。
编辑:经过测试,我发现这只是一个部分的解决方案。预留的大小受限于worker的预取设置。
其他回答
要获得队列上的任务数,你可以使用flower库,下面是一个简化的例子:
from flower.utils.broker import Broker
from django.conf import settings
def get_queue_length(queue):
broker = Broker(settings.CELERY_BROKER_URL)
queues_result = broker.queues([queue])
return queues_result.result()[0]['messages']
subprocess.run:
import subprocess
import re
active_process_txt = subprocess.run(['celery', '-A', 'my_proj', 'inspect', 'active'],
stdout=subprocess.PIPE).stdout.decode('utf-8')
return len(re.findall(r'worker_pid', active_process_txt))
注意使用your_proj更改my_proj
如果您控制任务的代码,那么您可以通过让任务在第一次执行时触发一个微不足道的重试来解决这个问题,然后检查inspect().reserved()。重试将任务注册到结果后端,芹菜可以看到这一点。任务必须接受self或context作为第一个参数,这样我们才能访问重试计数。
@task(bind=True)
def mytask(self):
if self.request.retries == 0:
raise self.retry(exc=MyTrivialError(), countdown=1)
...
这个解决方案与代理无关。你不必担心你是用RabbitMQ还是Redis来存储任务。
编辑:经过测试,我发现这只是一个部分的解决方案。预留的大小受限于worker的预取设置。
这在我的申请中很奏效:
def get_celery_queue_active_jobs(queue_name):
connection = <CELERY_APP_INSTANCE>.connection()
try:
channel = connection.channel()
name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
active_jobs = []
def dump_message(message):
active_jobs.append(message.properties['application_headers']['task'])
channel.basic_consume(queue=queue_name, callback=dump_message)
for job in range(jobs):
connection.drain_events()
return active_jobs
finally:
connection.close()
Active_jobs将是一个字符串列表,对应于队列中的任务。
不要忘记将CELERY_APP_INSTANCE替换为您自己的。
感谢@ashish在这里为我指出了正确的方向:https://stackoverflow.com/a/19465670/9843399
我认为获取正在等待的任务的唯一方法是保留一个已启动任务的列表,并让任务在启动时将自己从列表中删除。
通过rabbitmqctl和list_queues,你可以了解有多少任务正在等待,而不是任务本身:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
如果你想要的包括正在处理的任务,但还没有完成,你可以保留一个任务列表,并检查它们的状态:
from tasks import add
result = add.delay(4, 4)
result.ready() # True if finished
或者让芹菜使用CELERY_RESULT_BACKEND存储结果,并检查哪些任务不在其中。
推荐文章
- 证书验证失败:无法获得本地颁发者证书
- 当使用pip3安装包时,“Python中的ssl模块不可用”
- 无法切换Python与pyenv
- Python if not == vs if !=
- 如何从scikit-learn决策树中提取决策规则?
- 为什么在Mac OS X v10.9 (Mavericks)的终端中apt-get功能不起作用?
- 将旋转的xtick标签与各自的xtick对齐
- 为什么元组可以包含可变项?
- 如何合并字典的字典?
- 如何创建类属性?
- 不区分大小写的“in”
- 在Python中获取迭代器中的元素个数
- 解析日期字符串并更改格式
- 使用try和。Python中的if
- 如何在Python中获得所有直接子目录