Synchronize asyncio Queue(同步异步队列)
问题描述
我计划使用基于异步队列的生产者-消费者实现来处理实时数据,其中以正确的时间顺序发送数据至关重要.所以这里是它的代码片段:
I am planning to have an asyncio Queue based producer-consumer implementation for a processing of realtime data where sending out data in correct time order is vital. So here is the code snippet of it :
async def produce(Q, n_jobs):
for i in range(n_jobs):
print(f"Producing :{i}")
await Q.put(i)
async def consume(Q):
while True:
n = await Q.get()
print(f"Consumed :{n}")
x = do_sometask_and_return_the_result(n)
print(f"Finished :{n} and Result: {x}")
async def main(loop):
Q = asyncio.Queue(loop=loop, maxsize=3)
await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
print("Done")
这里生产者产生数据并将其放入异步队列.我有多个消费者来消费和处理数据.在查看输出时,在打印Consumed :{n}"时保持顺序.(如 1,2,3,4... 等等),这完全没问题.但是,由于函数 do_sometask_and_return_the_result(n) 需要可变时间来返回结果,因此在 n Finished :{n}"的下一次打印中不会保持顺序.(如 2,1,4,3,5,...).
Here the producer produces data and puts it into the asyncio Queue. I have multiple consumers to consume and process the data. While seeing the outputs, the order is maintained while printing "Consumed :{n}" (as in 1,2,3,4... and so on) , this is completely fine. but, since the function do_sometask_and_return_the_result(n) takes variable time to return the result, the order is not maintained in the next print of n "Finished :{n}" (as in 2,1,4,3,5,...).
有什么方法可以同步这些数据,因为我需要保持打印结果的顺序?即使在 do_sometask_and_return_the_result(n) 之后,我也希望看到 1,2,3,4,.. 的 'n' 连续打印.
Is there any way to synchronize this data as I need to maintain the order of printing the results? I want to see 1,2,3,4,.. sequential prints for 'n' even after do_sometask_and_return_the_result(n).
推荐答案
您可以使用优先队列系统(使用 python heapq
库)在作业完成后重新排序作业.像这样.
You could use a priority queue system (using the python heapq
library) to reorder your jobs after they are complete. Something like this.
# add these variables at class/global scope
priority_queue = []
current_job_id = 1
job_id_dict = {}
async def produce(Q, n_jobs):
# same as above
async def consume(Q):
while True:
n = await Q.get()
print(f"Consumed :{n}")
x = do_sometask_and_return_the_result(n)
await process_result(n, x)
async def process_result(n, x):
heappush(priority_queue, n)
job_id_dict[n] = x
while current_job_id == priority_queue[0]:
job_id = heappop(priority_queue)
print(f"Finished :{job_id} and Result: {job_id_dict[job_id]}")
current_job_id += 1
async def main(loop):
Q = asyncio.Queue(loop=loop, maxsize=3)
await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
print("Done")
有关 heapq
模块的更多信息:https://docs.python.org/3/library/heapq.html
For more information on the heapq
module: https://docs.python.org/3/library/heapq.html
这篇关于同步异步队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:同步异步队列
- YouTube API v3 返回截断的观看记录 2022-01-01
- 我如何卸载 PyTorch? 2022-01-01
- 使用公司代理使Python3.x Slack(松弛客户端) 2022-01-01
- CTR 中的 AES 如何用于 Python 和 PyCrypto? 2022-01-01
- 使用 Cython 将 Python 链接到共享库 2022-01-01
- 我如何透明地重定向一个Python导入? 2022-01-01
- ";find_element_by_name(';name';)";和&QOOT;FIND_ELEMENT(BY NAME,';NAME';)";之间有什么区别? 2022-01-01
- 检查具有纬度和经度的地理点是否在 shapefile 中 2022-01-01
- 计算测试数量的Python单元测试 2022-01-01
- 如何使用PYSPARK从Spark获得批次行 2022-01-01