How to add multiprocessing to consumer with pika (RabbitMQ) in python(如何在 python 中使用 pika (RabbitMQ) 向消费者添加多处理)
问题描述
我在 python 中使用 pika 框架编写了非常基本的生产者-消费者代码.问题是 - 消费者端在队列中的消息上运行太慢.我进行了一些测试,发现我可以通过多处理将工作流程加快 27 倍.问题是 - 我不知道向我的代码添加多处理功能的正确方法是什么.
I have very basic producer-consumer code written with pika framework in python. The problem is - consumer side runs too slow on messages in queue. I ran some tests and found out that i can speed up the workflow up to 27 times with multiprocessing. The problem is - I don't know what is the right way to add multiprocessing functionality to my code.
import pika
import json
from datetime import datetime
from functions import download_xmls
def callback(ch, method, properties, body):
print('Got something')
body = json.loads(body)
type = body[-1]['Type']
print('Object type in work currently ' + type)
cnums = [x['cadnum'] for x in body[:-1]]
print('Got {} cnums to work with'.format(len(cnums)))
date_start = datetime.now()
download_xmls(type,cnums)
date_end = datetime.now()
ch.basic_ack(delivery_tag=method.delivery_tag)
print('Download complete in {} seconds'.format((date_end-date_start).total_seconds()))
def consume(queue_name = 'bot-test'):
parameters = pika.URLParameters('server@address')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='bot-test')
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
如何从这里开始添加多处理功能?
How do I start with adding multiprocessing functionality from here?
推荐答案
Pika 有广泛的 示例代码,我建议您查看.请注意,此代码仅供 示例 使用.在处理线程的情况下,您将不得不使用更智能的方式来管理您的线程.
Pika has extensive example code that I recommend you check out. Note that this code is for example use only. In the case of doing work on threads, you will have to use a more intelligent way to manage your threads.
目标是不阻塞运行 Pika IO 循环的线程,并从您的工作线程正确回调到 IO 循环.这就是 add_callback_threadsafe
存在并在该代码中使用的原因.
The goal is to not block the thread that runs Pika's IO loop, and to call back into the IO loop correctly from your worker threads. That's why add_callback_threadsafe
exists and is used in that code.
注意: RabbitMQ 团队监控 rabbitmq-users
邮件列表,并且有时只回答 StackOverflow 上的问题.
NOTE: the RabbitMQ team monitors the rabbitmq-users
mailing list and only sometimes answers questions on StackOverflow.
这篇关于如何在 python 中使用 pika (RabbitMQ) 向消费者添加多处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:如何在 python 中使用 pika (RabbitMQ) 向消费者添加多


- pytorch 中的自适应池是如何工作的? 2022-07-12
- padding='same' 转换为 PyTorch padding=# 2022-01-01
- python check_output 失败,退出状态为 1,但 Popen 适用于相同的命令 2022-01-01
- 沿轴计算直方图 2022-01-01
- python-m http.server 443--使用SSL? 2022-01-01
- 使用Heroku上托管的Selenium登录Instagram时,找不到元素';用户名'; 2022-01-01
- 如何将一个类的函数分成多个文件? 2022-01-01
- 如何在 python3 中将 OrderedDict 转换为常规字典 2022-01-01
- 如何在 Python 的元组列表中对每个元组中的第一个值求和? 2022-01-01
- 分析异常:路径不存在:dbfs:/databricks/python/lib/python3.7/site-packages/sampleFolder/data; 2022-01-01