How can I implement a pub/sub pattern using multiprocessing?(如何使用多处理实现发布/订阅模式?)
本文介绍了如何使用多处理实现发布/订阅模式?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
有没有办法使用multiprocessing
数据结构创建发布/订阅模式?换句话说,我希望拥有类似队列的东西,只是发布者可以同时向多个工作进程发送单个命令。
推荐答案
您可以创建自己的数据结构,以使用multiprocessing.Queue
包装器实现简单的发布/订阅模式:
import os
import multiprocessing
from functools import wraps
def ensure_parent(func):
@wraps(func)
def inner(self, *args, **kwargs):
if os.getpid() != self._creator_pid:
raise RuntimeError("{} can only be called in the "
"parent.".format(func.__name__))
return func(self, *args, **kwargs)
return inner
class PublishQueue(object):
def __init__(self):
self._queues = []
self._creator_pid = os.getpid()
def __getstate__(self):
self_dict = self.__dict__
self_dict['_queues'] = []
return self_dict
def __setstate__(self, state):
self.__dict__.update(state)
@ensure_parent
def register(self):
q = multiprocessing.Queue()
self._queues.append(q)
return q
@ensure_parent
def publish(self, val):
for q in self._queues:
q.put(val)
def worker(q):
for item in iter(q.get, None):
print("got item {} in process {}".format(item, os.getpid()))
if __name__ == "__main__":
q = PublishQueue()
processes = []
for _ in range(3):
p = multiprocessing.Process(target=worker, args=(q.register(),))
p.start()
processes.append(p)
q.publish('1')
q.publish(2)
q.publish(None) # Shut down workers
for p in processes:
p.join()
输出:
got item 1 in process 4383
got item 2 in process 4383
got item 1 in process 4381
got item 2 in process 4381
got item 1 in process 4382
got item 2 in process 4382
只要父进程是唯一执行发布的进程,并且为父进程中的每个工作进程注册一个订阅队列,然后使用其multiprocessing.Process
构造函数将订阅队列传递给工作进程,则此模式就可以很好地工作。这些限制是由于multiprocessing.Queue
不可拾取。如果需要将订阅队列传递给已在运行的工作进程,则需要调整实现以使用multiprocessing.Manager.Queue
。
这篇关于如何使用多处理实现发布/订阅模式?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:如何使用多处理实现发布/订阅模式?
猜你喜欢
- 使用公司代理使Python3.x Slack(松弛客户端) 2022-01-01
- 使用 Cython 将 Python 链接到共享库 2022-01-01
- 计算测试数量的Python单元测试 2022-01-01
- ";find_element_by_name(';name';)";和&QOOT;FIND_ELEMENT(BY NAME,';NAME';)";之间有什么区别? 2022-01-01
- YouTube API v3 返回截断的观看记录 2022-01-01
- CTR 中的 AES 如何用于 Python 和 PyCrypto? 2022-01-01
- 我如何透明地重定向一个Python导入? 2022-01-01
- 我如何卸载 PyTorch? 2022-01-01
- 如何使用PYSPARK从Spark获得批次行 2022-01-01
- 检查具有纬度和经度的地理点是否在 shapefile 中 2022-01-01