扭曲与多处理事件和队列不兼容?

is twisted incompatible with multiprocessing events and queues?(扭曲与多处理事件和队列不兼容?)

本文介绍了扭曲与多处理事件和队列不兼容?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试模拟使用 twisted 运行的应用程序网络.作为我模拟的一部分,我想同步某些事件并能够为每个进程提供大量数据.我决定使用多处理事件和队列.但是,我的进程正在挂起.

I am trying to simulate a network of applications that run using twisted. As part of my simulation I would like to synchronize certain events and be able to feed each process large amounts of data. I decided to use multiprocessing Events and Queues. However, my processes are getting hung.

我编写了下面的示例代码来说明问题.具体来说,(大约 95% 的时间在我的沙桥机器上),'run_in_thread' 函数完成,但是直到我按下 Ctrl-C 后才会调用 'print_done' 回调.

I wrote the example code below to illustrate the problem. Specifically, (about 95% of the time on my sandy bridge machine), the 'run_in_thread' function finishes, however the 'print_done' callback is not called until after I press Ctrl-C.

此外,我可以更改示例代码中的一些内容以使这项工作更可靠,例如:减少衍生进程的数量、从 reactor_ready 调用 self.ready.set 或更改 deferLater 的延迟.

Additionally, I can change several things in the example code to make this work more reliably such as: reducing the number of spawned processes, calling self.ready.set from reactor_ready, or changing the delay of deferLater.

我猜在扭曲反应器和阻塞多处理调用(例如 Queue.get() 或 Event.wait())之间存在竞争条件?

I am guessing there is a race condition somewhere between the twisted reactor and blocking multiprocessing calls such as Queue.get() or Event.wait()?

我遇到的具体问题是什么?我的代码中是否有我遗漏的错误?我可以解决这个问题还是扭曲与多处理事件/队列不兼容?

What exactly is the problem I am running into? Is there a bug in my code that I am missing? Can I fix this or is twisted incompatible with multiprocessing events/queues?

其次,像 spawnProcess 或 Ampoule 这样的东西会是推荐的替代品吗?(如 Mix Python Twisted 与多处理? 中的建议)

Secondly, would something like spawnProcess or Ampoule be the recommended alternative? (as suggested in Mix Python Twisted with multiprocessing?)

编辑(按要求):

我尝试过 glib2reactor selectreactor、pollreactor 和 epollreactor 的所有反应器都遇到了问题.epollreactor 似乎给出了最好的结果,并且对于下面给出的示例似乎工作正常,但在我的应用程序中仍然给我同样(或类似)的问题.我会继续调查.

I've run into problems with all the reactors I've tried glib2reactor selectreactor, pollreactor, and epollreactor. The epollreactor seems to give the best results and seems to work fine for the example given below but still gives me the same (or a similar) problem in my application. I will continue investigating.

我正在运行 Gentoo Linux 内核 3.3 和 3.4、python 2.7,并且我尝试过 Twisted 10.2.0、11.0.0、11.1.0、12.0.0 和 12.1.0.

I'm running Gentoo Linux kernel 3.3 and 3.4, python 2.7, and I've tried Twisted 10.2.0, 11.0.0, 11.1.0, 12.0.0, and 12.1.0.

除了我的沙桥机器,我在我的双核 amd 机器上也看到了同样的问题.

In addition to my sandy bridge machine, I see the same issue on my dual core amd machine.

#!/usr/bin/python
# -*- coding: utf-8 *-*

from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet import task

from multiprocessing import Process
from multiprocessing import Event

class TestA(Process):
    def __init__(self):
        super(TestA, self).__init__()
        self.ready = Event()
        self.ready.clear()
        self.start()

    def run(self):
        reactor.callWhenRunning(self.reactor_ready)
        reactor.run()

    def reactor_ready(self, *args):
        task.deferLater(reactor, 1, self.node_ready)
        return args

    def node_ready(self, *args):
        print 'node_ready'
        self.ready.set()
        return args

def reactor_running():
    print 'reactor_running'
    df = threads.deferToThread(run_in_thread)
    df.addCallback(print_done)

def run_in_thread():
    print 'run_in_thread'
    for n in processes:
        n.ready.wait()

def print_done(dfResult=None):
    print 'print_done'
    reactor.stop()

if __name__ == '__main__':
    processes = [TestA() for i in range(8)]
    reactor.callWhenRunning(reactor_running)
    reactor.run()

推荐答案

简短的回答是肯定的,Twisted 和 multiprocessing 彼此不兼容,你不能像你尝试的那样可靠地使用它们.

The short answer is yes, Twisted and multiprocessing are not compatible with each other, and you cannot reliably use them as you are attempting to.

在所有 POSIX 平台上,子进程管理与 SIGCHLD 处理密切相关.POSIX 信号处理程序是进程全局的,每种信号类型只能有一个.

On all POSIX platforms, child process management is closely tied to SIGCHLD handling. POSIX signal handlers are process-global, and there can be only one per signal type.

Twisted 和 stdlib multiprocessing 不能同时安装 SIGCHLD 处理程序.只有其中一个可以.这意味着其中只有一个可以可靠地管理子进程.您的示例应用程序无法控制它们中的哪一个将赢得该能力,因此我预计它的行为会因该事实而产生一些不确定性.

Twisted and stdlib multiprocessing cannot both have a SIGCHLD handler installed. Only one of them can. That means only one of them can reliably manage child processes. Your example application doesn't control which of them will win that ability, so I would expect there to be some non-determinism in its behavior arising from that fact.

但是,您的示例更直接的问题是您在父进程中加载​​ Twisted,然后使用 multiprocessing 派生 而不是 exec 所有子进程.Twisted 不支持这样使用.如果你 fork 然后 exec,没有问题.但是,缺少新进程(可能是使用 Twisted 的 Python 进程)的 exec 会导致 Twisted 无法解释的各种额外共享状态.在您的特定情况下,导致此问题的共享状态是用于实现 deferToThread 的内部waker fd".由于 fd 在父级和所有子级之间共享,当父级试图唤醒主线程以传递 deferToThread 调用的结果时,它很可能会唤醒 其中一个子级而是处理.子进程没有什么有用的事情可做,所以这只是浪费时间.同时,父线程中的主线程永远不会醒来,也不会注意到您的线程任务已完成.

However, the more immediate problem with your example is that you load Twisted in the parent process and then use multiprocessing to fork and not exec all of the child processes. Twisted does not support being used like this. If you fork and then exec, there's no problem. However, the lack of an exec of a new process (perhaps a Python process using Twisted) leads to all kinds of extra shared state which Twisted does not account for. In your particular case, the shared state that causes this problem is the internal "waker fd" which is used to implement deferToThread. With the fd shared between the parent and all the children, when the parent tries to wake up the main thread to deliver the result of the deferToThread call, it most likely wakes up one of the child processes instead. The child process has nothing useful to do, so that's just a waste of time. Meanwhile the main thread in the parent never wakes up and never notices your threaded task is done.

您可以通过在创建子进程之前不加载任何 Twisted 来避免此问题.就 Twisted 而言,这会将您的使用转变为单进程用例(在每个进程中,它最初会被加载,然后该进程将不会继续分叉,所以毫无疑问 fork 和 Twisted 是如何交互的).这意味着在创建子进程之前甚至不导入 Twisted.

It's possible you can avoid this issue by not loading any of Twisted until you've already created the child processes. This would turn your usage into a single-process use case as far as Twisted is concerned (in each process, it would be initially loaded, and then that process would not go on to fork at all, so there's no question of how fork and Twisted interact anymore). This means not even importing Twisted until after you've created the child processes.

当然,这只对 Twisted 有帮助.您使用的任何其他库都可能遇到类似的问题(您提到了 glib2,这是另一个库的一个很好的例子,如果您尝试像这样使用它会完全窒息).

Of course, this only helps you out as far as Twisted goes. Any other libraries you use could run into similar trouble (you mentioned glib2, that's a great example of another library that will totally choke if you try to use it like this).

我强烈建议不要使用 multiprocessing 模块.相反,使用任何涉及 fork exec 的多进程方法,而不是单独使用 fork.安瓿属于这一类.

I highly recommend not using the multiprocessing module at all. Instead, use any multi-process approach that involves fork and exec, not fork alone. Ampoule falls into that category.

这篇关于扭曲与多处理事件和队列不兼容?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:扭曲与多处理事件和队列不兼容?