Combining asyncio with a multi-worker ProcessPoolExecutor and for async(将异步与多工作器ProcessPoolExecutor结合使用和用于异步)
本文介绍了将异步与多工作器ProcessPoolExecutor结合使用和用于异步的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我的问题与Combining asyncio with a multi-worker ProcessPoolExecutor非常相似,但稍有更改(我相信是async for
),我就不会使用那里的优秀答案。
我正在尝试以下MWE:
import concurrent.futures
import asyncio
import time
async def mygen(u: int = 2):
i = 0
while i < u:
yield i
i += 1
def blocking(delay):
time.sleep(delay+1)
return('EXECUTOR: Completed blocking task number ' + str(delay+1))
async def non_blocking(loop):
with concurrent.futures.ProcessPoolExecutor() as executor:
async for i in mygen():
print('MASTER: Sending to executor blocking task number ' + str(i+1))
result = await loop.run_in_executor(executor, blocking, i)
print(result)
print('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i+1))
loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))
不出所料,它的输出不是异步的:
MASTER: Sending to executor blocking task number 1
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2
我想调整代码,使任务在两个并发进程中运行,并在输出可用时打印输出。所需输出为:
MASTER: Sending to executor blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2
我从Combining asyncio with a multi-worker ProcessPoolExecutor了解到,就目前情况而言,我的await loop.run_in_executor()
语法是阻塞的。我不知道如何替换它,以允许async for
移动到下一个生成的值,同时等待执行器完成他们的工作。注意:我没有像他们的示例中那样使用asyncio.gather
。
推荐答案
如果希望最多有两个进程运行任务,最简单的方法是使用max_workers=2
创建执行器。然后,您可以尽可能快地提交任务,即继续async for
的下一个迭代,而无需等待前一个任务完成。您可以在结束时收集所有任务的结果,以确保异常不会被忽略(并可能获得实际结果)。
以下代码生成预期的输出:
from concurrent.futures import ProcessPoolExecutor
import asyncio
import time
async def mygen(u: int = 2):
i = 0
while i < u:
yield i
i += 1
def blocking(delay):
time.sleep(delay+1)
return('EXECUTOR: Completed blocking task number ' + str(delay+1))
async def run_blocking(executor, task_no, delay):
print('MASTER: Sending to executor blocking task number '
+ str(task_no))
result = await loop.run_in_executor(executor, blocking, delay)
print(result)
print('MASTER: Well done executor - you seem to have completed '
'blocking task number ' + str(task_no))
async def non_blocking(loop):
tasks = []
with ProcessPoolExecutor(max_workers=2) as executor:
async for i in mygen():
# spawn the task and let it run in the background
tasks.append(asyncio.create_task(
run_blocking(executor, i + 1, i)))
# if there was an exception, retrieve it now
await asyncio.gather(*tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))
这篇关于将异步与多工作器ProcessPoolExecutor结合使用和用于异步的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:将异步与多工作器ProcessPoolExecutor结合使用和用于异步


猜你喜欢
- 分析异常:路径不存在:dbfs:/databricks/python/lib/python3.7/site-packages/sampleFolder/data; 2022-01-01
- python-m http.server 443--使用SSL? 2022-01-01
- python check_output 失败,退出状态为 1,但 Popen 适用于相同的命令 2022-01-01
- 如何将一个类的函数分成多个文件? 2022-01-01
- pytorch 中的自适应池是如何工作的? 2022-07-12
- 如何在 Python 的元组列表中对每个元组中的第一个值求和? 2022-01-01
- 使用Heroku上托管的Selenium登录Instagram时,找不到元素';用户名'; 2022-01-01
- padding='same' 转换为 PyTorch padding=# 2022-01-01
- 如何在 python3 中将 OrderedDict 转换为常规字典 2022-01-01
- 沿轴计算直方图 2022-01-01