Process vs. Thread with regards to using Queue()/deque() and class variable for communication and quot;poison pillquot;(关于使用 Queue()/deque() 和类变量进行通信和“毒丸的进程与线程)
问题描述
我想创建一个在 While True 循环中永远运行的线程或进程.
I would like to create either a Thread or a Process which runs forever in a While True loop.
我需要以队列的形式向工作人员发送和接收数据,无论是 multiprocessing.Queue() 还是 collections.deque().我更喜欢使用 collections.deque(),因为它明显更快.
I need to send and receive data to the worker in the form for queues, either a multiprocessing.Queue() or a collections.deque(). I prefer to use collections.deque() as it is significantly faster.
我还需要最终能够杀死工作人员(因为它在 while True 循环中运行.这是我整理的一些测试代码,以尝试了解线程、进程、队列和 deque 之间的区别..
I also need to be able to kill the worker eventually (as it runs in a while True loop. Here is some test code I've put together to try and understand the differences between Threads, Processes, Queues, and deque ..
import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque
class ThreadingTest(Thread):
def __init__(self, q):
super(ThreadingTest, self).__init__()
self.q = q
self.toRun = False
def run(self):
print("Started Thread")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Thread deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Thread Queue: " + str(i))
def stop(self):
print("Trying to stop Thread")
self.toRun = False
while self.isAlive():
time.sleep(0.1)
print("Stopped Thread")
class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.toRun = False
self.ctr = 0
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Process Queue: " + str(i))
def stop(self):
print("Trying to stop Process")
self.toRun = False
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
if __name__ == '__main__':
q = Queue()
t1 = ProcessTest(q)
t1.start()
for i in range(10):
if type(q) == type(deque()):
q.append(i)
elif type(q) == type(Queue()):
q.put_nowait(i)
time.sleep(1)
t1.stop()
t1.join()
if type(q) == type(deque()):
print(q)
elif type(q) == type(Queue()):
while q.qsize() > 0:
print(str(q.get_nowait()))
如您所见,t1 可以是 ThreadingTest 或 ProcessTest.此外,传递给它的队列可以是 multiprocessing.Queue 或 collections.deque.
As you can see, t1 can either be ThreadingTest, or ProcessTest. Also, the queue passed to it can either be a multiprocessing.Queue or a collections.deque.
ThreadingTest 与 Queue 或 deque() 一起使用.当调用 stop() 方法时,它也会正确终止 run().
ThreadingTest works with a Queue or deque(). It also kills run() properly when the stop() method is called.
Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])
ProcessTest 只有在队列类型为 multiprocessing.Queue 时才能从队列中读取.它不适用于 collections.deque.此外,我无法使用 stop() 终止进程.
ProcessTest is only able to read from the queue if it is of type multiprocessing.Queue. It doesn't work with collections.deque. Furthermore, I am unable to kill the process using stop().
Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process
我想知道为什么?另外,在进程中使用双端队列的最佳方法是什么?而且,我将如何使用某种 stop() 方法来终止进程.
I'm trying to figure out why? Also, what would be the best way to use deque with a process? And, how would I go about killing the process using some sort of stop() method.
推荐答案
你不能使用 collections.deque
在两个 multiprocessing.Process
实例之间传递数据,因为 collections.deque
不是进程感知的.multiprocessing.Queue
在内部将其内容写入 multiprocessing.Pipe
,这意味着其中的数据可以在一个进程中排队并在另一个进程中检索.collections.deque
没有那种管道,所以它不会工作.当您在一个进程中写入 deque
时,另一个进程中的 deque
实例完全不会受到影响;它们是完全独立的实例.
You can't use a collections.deque
to pass data between two multiprocessing.Process
instances, because collections.deque
is not process-aware. multiprocessing.Queue
writes its contents to a multiprocessing.Pipe
internally, which means that data in it can be enqueued in once process and retrieved in another. collections.deque
doesn't have that kind of plumbing, so it won't work. When you write to the deque
in one process, the deque
instance in the other process won't be affected at all; they're completely separate instances.
您的 stop()
方法也发生了类似的问题.您正在主进程中更改 toRun
的值,但这根本不会影响子进程.它们是完全独立的实例.结束孩子的最好方法是向 Queue
发送一些哨兵.当你在child中获得哨兵时,跳出无限循环:
A similar issue is happening to your stop()
method. You're changing the value of toRun
in the main process, but this won't affect the child at all. They're completely separate instances. The best way to end the child would be to send some sentinel to the Queue
. When you get the sentinel in the child, break out of the infinite loop:
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
if i is None:
break # Got sentinel, so break
print("Process Queue: " + str(i))
def stop(self):
print("Trying to stop Process")
self.q.put(None) # Send sentinel
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
如果你确实需要两个进程之间的 deque
语义,你可以使用 自定义 multiprocessing.Manager()
以在 Manager
进程中创建共享 deque
,以及您的每个 Process
实例都会获得一个 Proxy
:
If you actually do need deque
semantics between two process, you can use a custom multiprocessing.Manager()
to create a shared deque
in a Manager
process, and each of your Process
instances will get a Proxy
to it:
import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from collections import deque
SyncManager.register('deque', deque)
def Manager():
m = SyncManager()
m.start()
return m
class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.ctr = 0
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if self.q._getvalue():
i = self.q.popleft()
if i is None:
break
print("Process deque: " + str(i))
def stop(self):
print("Trying to stop Process")
self.q.append(None)
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
if __name__ == '__main__':
m = Manager()
q = m.deque()
t1 = ProcessTest(q)
t1.start()
for i in range(10):
q.append(i)
time.sleep(1)
t1.stop()
t1.join()
print(q)
请注意,这可能不会比 multiprocessing.Queue
快,因为每次访问 deque
都会产生 IPC 成本.对于以您的方式传递消息来说,它也是一种不太自然的数据结构.
Note that this probably isn't going to be faster than a multiprocessing.Queue
, though, since there's an IPC cost for every time you access the deque
. It's also a much less natural data structure for passing messages the way you are.
这篇关于关于使用 Queue()/deque() 和类变量进行通信和“毒丸"的进程与线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:关于使用 Queue()/deque() 和类变量进行通信和“毒丸"的进程与线程
- 计算测试数量的Python单元测试 2022-01-01
- 我如何卸载 PyTorch? 2022-01-01
- 使用 Cython 将 Python 链接到共享库 2022-01-01
- ";find_element_by_name(';name';)";和&QOOT;FIND_ELEMENT(BY NAME,';NAME';)";之间有什么区别? 2022-01-01
- 我如何透明地重定向一个Python导入? 2022-01-01
- CTR 中的 AES 如何用于 Python 和 PyCrypto? 2022-01-01
- 检查具有纬度和经度的地理点是否在 shapefile 中 2022-01-01
- 如何使用PYSPARK从Spark获得批次行 2022-01-01
- YouTube API v3 返回截断的观看记录 2022-01-01
- 使用公司代理使Python3.x Slack(松弛客户端) 2022-01-01