Equivalent of asyncio.Queues with worker quot;threadsquot;(等效于带有工作“线程的 asyncio.Queues)
问题描述
我正在尝试弄清楚如何移植线程程序以使用 asyncio
.我有很多代码可以围绕几个标准库 Queues
进行同步,基本上是这样的:
I'm trying to figure out how to port a threaded program to use asyncio
. I have a lot of code which synchronizes around a few standard library Queues
, basically like this:
import queue, random, threading, time
q = queue.Queue()
def produce():
while True:
time.sleep(0.5 + random.random()) # sleep for .5 - 1.5 seconds
q.put(random.random())
def consume():
while True:
value = q.get(block=True)
print("Consumed", value)
threading.Thread(target=produce).start()
threading.Thread(target=consume).start()
一个线程创建值(可能是用户输入),另一个线程对它们进行处理.关键是这些线程在有新数据之前处于空闲状态,此时它们会醒来并对其进行处理.
One thread creates values (possibly user input), and another thread does something with them. The point is that these threads are idle until there's new data, at which point they wake up and do something with it.
我正在尝试使用 asyncio 来实现这种模式,但我似乎无法弄清楚如何让它运行".
I'm trying to implement this pattern using asyncio, but I can't seem to figure out how to make it "go".
我的尝试或多或少看起来像这样(并且根本不做任何事情).
My attempts look more or less like this (and don't do anything at all).
import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
while True:
q.put(random.random())
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
print("Consumed", value)
# do something here to start the coroutines. asyncio.Task()?
loop = asyncio.get_event_loop()
loop.run_forever()
我尝试过使用协程的变体,不使用协程,在任务中包装东西,试图让它们创建或返回期货等.
I've tried variations on using coroutines, not using them, wrapping stuff in Tasks, trying to make them create or return futures, etc.
我开始认为我对应该如何使用 asyncio 有错误的想法(也许这种模式应该以我不知道的不同方式实现).任何指针将不胜感激.
I'm starting to think that I have the wrong idea about how I should be using asyncio (maybe this pattern should be implemented in a different way that I'm not aware of). Any pointers would be appreciated.
推荐答案
是的,没错.任务是你的朋友:
Yes, exactly. Tasks are your friends:
import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
while True:
yield from q.put(random.random())
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
print("Consumed", value)
loop = asyncio.get_event_loop()
loop.create_task(produce())
loop.create_task(consume())
loop.run_forever()
asyncio.ensure_future
也可用于创建任务.
请记住:q.put()
是一个协程,所以你应该使用 yield from q.put(value)代码>.
And please keep in mind: q.put()
is a coroutine, so you should to use yield from q.put(value)
.
UPD
从 asyncio.Task()
/asyncio.async()
切换到新品牌 API loop.create_task()
和 示例中的 asyncio.ensure_future()
.
Switched from asyncio.Task()
/asyncio.async()
to new brand API loop.create_task()
and asyncio.ensure_future()
in example.
这篇关于等效于带有工作“线程"的 asyncio.Queues的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:等效于带有工作“线程"的 asyncio.Queues
- 检查具有纬度和经度的地理点是否在 shapefile 中 2022-01-01
- 使用公司代理使Python3.x Slack(松弛客户端) 2022-01-01
- 我如何卸载 PyTorch? 2022-01-01
- 计算测试数量的Python单元测试 2022-01-01
- ";find_element_by_name(';name';)";和&QOOT;FIND_ELEMENT(BY NAME,';NAME';)";之间有什么区别? 2022-01-01
- 如何使用PYSPARK从Spark获得批次行 2022-01-01
- CTR 中的 AES 如何用于 Python 和 PyCrypto? 2022-01-01
- 使用 Cython 将 Python 链接到共享库 2022-01-01
- YouTube API v3 返回截断的观看记录 2022-01-01
- 我如何透明地重定向一个Python导入? 2022-01-01