Celery dynamic queue creation and routing(Celery 动态队列创建和路由)
问题描述
我正在尝试调用一个任务并为该任务创建一个队列,如果它不存在,则立即将调用的任务插入该队列.我有以下代码:
I'm trying to call a task and create a queue for that task if it doesn't exist then immediately insert to that queue the called task. I have the following code:
@task
def greet(name):
return "Hello %s!" % name
def run():
result = greet.delay(args=['marc'], queue='greet.1',
routing_key='greet.1')
print result.ready()
然后我有一个自定义路由器:
then I have a custom router:
class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task == 'tasks.greet':
return {'queue': kwargs['queue'],
'exchange': 'greet',
'exchange_type': 'direct',
'routing_key': kwargs['routing_key']}
return None
这将创建一个名为 greet.1
的交换和一个名为 greet.1
的队列,但该队列为空.交换器应该只称为 greet
,它知道如何将路由键(如 greet.1
)路由到名为 greet.1
的队列.
this creates an exchange called greet.1
and a queue called greet.1
but the queue is empty. The exchange should be just called greet
which knows how to route a routing key like greet.1
to the queue called greet.1
.
有什么想法吗?
推荐答案
当您执行以下操作时:
task.apply_async(queue='foo', routing_key='foobar')
然后 Celery 将从 CELERY_QUEUES 中的 'foo' 队列中获取默认值,或者如果它不存在,则使用 (queue=foo, exchange=foo, routing_key=foo) 自动创建它
Then Celery will take default values from the 'foo' queue in CELERY_QUEUES, or if it does not exist then automatically create it using (queue=foo, exchange=foo, routing_key=foo)
因此,如果 CELERY_QUEUES 中不存在foo",您最终会得到:
So if 'foo' does not exist in CELERY_QUEUES you will end up with:
queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')
然后生产者将声明该队列,但是由于您覆盖了 routing_key,实际使用 routing_key = 'foobar'
The producer will then declare that queue, but since you override the routing_key,
actually send the message using routing_key = 'foobar'
这可能看起来很奇怪,但这种行为实际上对主题交换很有用,发布到不同主题的地方.
This may seem strange but the behavior is actually useful for topic exchanges, where you publish to different topics.
虽然很难做你想做的事,你可以自己创建队列并声明它,但这不适用于自动消息发布重试.如果 apply_async 的 queue 参数可以支持会更好一个自定义的 kombu.Queue
将被声明并用作目的地.也许你可以在 http://github.com/celery/celery/issues
It's harder to do what you want though, you can create the queue yourself
and declare it, but that won't work well with automatic message publish retries.
It would be better if the queue argument to apply_async could support
a custom kombu.Queue
instead that will be both declared and used as the destination.
Maybe you could open an issue for that at http://github.com/celery/celery/issues
这篇关于Celery 动态队列创建和路由的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Celery 动态队列创建和路由


- 沿轴计算直方图 2022-01-01
- 使用Heroku上托管的Selenium登录Instagram时,找不到元素';用户名'; 2022-01-01
- pytorch 中的自适应池是如何工作的? 2022-07-12
- 如何在 Python 的元组列表中对每个元组中的第一个值求和? 2022-01-01
- padding='same' 转换为 PyTorch padding=# 2022-01-01
- python check_output 失败,退出状态为 1,但 Popen 适用于相同的命令 2022-01-01
- 分析异常:路径不存在:dbfs:/databricks/python/lib/python3.7/site-packages/sampleFolder/data; 2022-01-01
- python-m http.server 443--使用SSL? 2022-01-01
- 如何在 python3 中将 OrderedDict 转换为常规字典 2022-01-01
- 如何将一个类的函数分成多个文件? 2022-01-01