How to utilize all cores with python multiprocessing
Problem description
I have been fiddling with Python's multiprocessing functionality for upwards of an hour now, trying to parallelize a rather complex graph traversal function using multiprocessing.Process and multiprocessing.Manager:
import networkx as nx
import csv
import time
from operator import itemgetter
import os
import multiprocessing as mp

cutoff = 1
exclusionlist = ["cpd:C00024"]
DG = nx.read_gml("KeggComplete.gml", relabel=True)
for exclusion in exclusionlist:
    DG.remove_node(exclusion)

# checks if the 'memorizedPaths' directory exists, and if not, creates it
fn = os.path.join(os.path.dirname(__file__),
                  'memorizedPaths' + str(cutoff+1))
if not os.path.exists(fn):
    os.makedirs(fn)

manager = mp.Manager()
memorizedPaths = manager.dict()
filepaths = manager.dict()
degreelist = sorted(DG.degree_iter(),
                    key=itemgetter(1),
                    reverse=True)

def _all_simple_paths_graph(item, DG, cutoff, memorizedPaths, filepaths):
    source = item[0]
    uniqueTreePaths = []
    if cutoff < 1:
        return
    visited = [source]
    stack = [iter(DG[source])]
    while stack:
        children = stack[-1]
        child = next(children, None)
        if child is None:
            stack.pop()
            visited.pop()
        elif child in memorizedPaths:
            for path in memorizedPaths[child]:
                newPath = (tuple(visited) + tuple(path))
                if ((len(newPath) <= cutoff) and
                        (len(set(visited) & set(path)) == 0)):
                    uniqueTreePaths.append(newPath)
            continue
        elif len(visited) < cutoff:
            if child not in visited:
                visited.append(child)
                stack.append(iter(DG[child]))
                if visited not in uniqueTreePaths:
                    uniqueTreePaths.append(tuple(visited))
        else: # len(visited) == cutoff:
            if ((visited not in uniqueTreePaths) and
                    (child not in visited)):
                uniqueTreePaths.append(tuple(visited + [child]))
            stack.pop()
            visited.pop()
    # writes the absolute path of the node path file into the hash table
    filepaths[source] = str(fn) + "/" + str(source) + "path.txt"
    with open(filepaths[source], "wb") as csvfile2:
        writer = csv.writer(csvfile2, delimiter=" ", quotechar="|")
        for path in uniqueTreePaths:
            writer.writerow(path)
    memorizedPaths[source] = uniqueTreePaths

############################################################################
if __name__ == '__main__':
    start = time.clock()
    for item in degreelist:
        # argument order must match the function signature (item first)
        test = mp.Process(target=_all_simple_paths_graph,
                          args=(item, DG, cutoff, memorizedPaths, filepaths))
        test.start()
        test.join()
    end = time.clock()
    print (end-start)
Currently - through luck and magic - it works (sort of). My problem is I'm only using 12 of my 24 cores.
Can someone explain why this might be the case? Perhaps my code isn't the best multiprocessing solution, or is it a feature of my architecture (Intel Xeon CPU E5-2640 @ 2.50GHz x18, running Ubuntu 13.04 x64)?
I managed to get:
p = mp.Pool()
for item in degreelist:
    # argument order must match the function signature (item first)
    p.apply_async(_all_simple_paths_graph,
                  args=(item, DG, cutoff, memorizedPaths, filepaths))
p.close()
p.join()
working; however, it's VERY SLOW! So I assume I'm using the wrong function for the job. Hopefully it helps clarify exactly what I'm trying to accomplish!
.map attempt:
from functools import partial

partialfunc = partial(_all_simple_paths_graph,
                      DG=DG,
                      cutoff=cutoff,
                      memorizedPaths=memorizedPaths,
                      filepaths=filepaths)
p = mp.Pool()
for item in processList:
    processVar = p.map(partialfunc, xrange(len(processList)))
p.close()
p.join()
Works, but is slower than single-core. Time to optimize!
Recommended answer
Too much piling up here to address in comments, so, where mp is multiprocessing:
mp.cpu_count() should return the number of processors. But test it. Some platforms are funky, and this info isn't always easy to get. Python does the best it can.
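A quick way to "test it" might look like the sketch below (written for Python 3). The helper name usable_cpu_count is made up for illustration. It compares the machine-wide count with the set of CPUs the current process is actually allowed to run on, which can be smaller under taskset, cgroups, or containers. os.sched_getaffinity is only available on some platforms (Linux, for example), so the sketch falls back to mp.cpu_count() elsewhere.

```python
import multiprocessing as mp
import os


def usable_cpu_count():
    """Return how many CPUs this process may actually be scheduled on."""
    # os.sched_getaffinity is not available everywhere (it is on Linux).
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # Assumption: elsewhere, the machine-wide count is the best guess.
    return mp.cpu_count()


if __name__ == "__main__":
    print("mp.cpu_count():", mp.cpu_count())
    print("usable by this process:", usable_cpu_count())
```

If the two numbers differ, something outside Python is restricting the process, and no amount of multiprocessing code will use the missing cores.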
If you start 24 processes, they'll do exactly what you tell them to do ;-) Looks like mp.Pool() would be most convenient for you. You pass the number of processes you want to create to its constructor. mp.Pool(processes=None) will use mp.cpu_count() for the number of processes.
Then you can use, for example, .imap_unordered(...) on your Pool instance to spread your degreelist across processes. Or maybe some other Pool method would work better for you - experiment.
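As a rough illustration of the Pool route (Python 3), here is a minimal sketch. The function node_degree_squared and the fake degreelist are placeholders invented for the example, not the real traversal. The point is only the shape: one picklable function of one item, fed through imap_unordered with a chunksize.

```python
import multiprocessing as mp


def node_degree_squared(item):
    """Hypothetical stand-in for the real per-node work."""
    node, degree = item
    return node, degree * degree


def run(degreelist, processes=None, chunksize=50):
    # processes=None lets Pool choose the process count itself.
    with mp.Pool(processes=processes) as pool:
        # Results come back in completion order, not input order,
        # so they are keyed by node rather than by position.
        return dict(pool.imap_unordered(node_degree_squared,
                                        degreelist,
                                        chunksize=chunksize))


if __name__ == "__main__":
    fake_degreelist = [("n%d" % i, i % 7) for i in range(1000)]
    results = run(fake_degreelist)
    print(len(results), "nodes processed")
```

A larger chunksize means fewer, bigger messages between the parent and the workers, which usually matters more than the choice of Pool method.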
If you can't bash the problem into Pool's view of the world, you could instead create an mp.Queue to serve as a work queue, .put()'ing nodes (or slices of nodes, to reduce overhead) to work on in the main program, and write the workers to .get() work items off that queue. Ask if you need examples. Note that you need to put sentinel values (one per process) on the queue, after all the "real" work items, so that worker processes can test for the sentinel to know when they're done.
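The queue approach described above could be sketched roughly like this (Python 3). Everything here is illustrative: the names worker, chunks and main, the use of None as the sentinel, and the placeholder computation are assumptions, not the asker's real code. Note that the results are drained before the workers are joined; joining first can deadlock if a worker still has data buffered for the result queue.

```python
import multiprocessing as mp

SENTINEL = None  # assumption: None never occurs as a real work item


def worker(tasks, results):
    """Pull slices of nodes off the queue until the sentinel arrives."""
    while True:
        chunk = tasks.get()
        if chunk is SENTINEL:
            break
        # Hypothetical placeholder for the real traversal.
        results.put([(node, len(str(node))) for node in chunk])


def chunks(seq, size):
    """Yield successive slices of seq, each at most `size` long."""
    for start in range(0, len(seq), size):
        yield seq[start:start + size]


def main(nodes, nprocs=4, size=100):
    tasks, results = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(tasks, results))
             for _ in range(nprocs)]
    for p in procs:
        p.start()
    nchunks = 0
    for chunk in chunks(nodes, size):
        tasks.put(chunk)
        nchunks += 1
    for _ in procs:
        tasks.put(SENTINEL)  # one sentinel per worker, after the real items
    out = []
    for _ in range(nchunks):  # drain results before joining
        out.extend(results.get())
    for p in procs:
        p.join()
    return out


if __name__ == "__main__":
    print(len(main(list(range(1000)))), "results")
```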
FYI, I like queues because they're more explicit. Many others like Pools better because they're more magical ;-)
Here's an executable prototype for you. This shows one way to use imap_unordered with Pool and chunksize that doesn't require changing any function signatures. Of course you'll have to plug in your real code ;-) Note that the init_worker approach allows passing "most of" the arguments only once per process, not once for every item in your degreelist. Cutting the amount of inter-process communication can be crucial for speed.
import multiprocessing as mp

def init_worker(mps, fps, cut):
    # runs once in each worker process; stashes the shared arguments in globals
    global memorizedPaths, filepaths, cutoff
    global DG
    print("process initializing", mp.current_process())
    memorizedPaths, filepaths, cutoff = mps, fps, cut
    DG = 1##nx.read_gml("KeggComplete.gml", relabel = True)

def work(item):
    _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
    pass # print("doing " + str(item))

if __name__ == "__main__":
    m = mp.Manager()
    memorizedPaths = m.dict()
    filepaths = m.dict()
    cutoff = 1 ##
    # use all available CPUs
    p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
                                                   filepaths,
                                                   cutoff))
    degreelist = range(100000) ##
    for _ in p.imap_unordered(work, degreelist, chunksize=500):
        pass
    p.close()
    p.join()
I strongly advise running this exactly as-is, so you can see that it's blazing fast. Then add things to it a bit at a time, to see how that affects the time. For example, just adding
    memorizedPaths[item] = item
to _all_simple_paths_graph() slows it down enormously. Why? Because the dict gets bigger and bigger with each addition, and this process-safe dict has to be synchronized (under the covers) among all the processes. The unit of synchronization is "the entire dict" - there's no internal structure the mp machinery can exploit to do incremental updates to the shared dict.
If you can't afford this expense, then you can't use a Manager.dict() for this. Opportunities for cleverness abound ;-)
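One possible bit of cleverness, sketched under assumptions (Python 3): have each worker return its result instead of writing it into a shared dict, and collect everything into an ordinary dict in the parent. The names init_worker, neighbours_of and collect, and the toy graph, are invented for the example. The trade-off is real, though: workers can no longer see each other's memoized paths, so this only fits if the cross-process lookup can be dropped or done in a second pass.

```python
import multiprocessing as mp


def init_worker(graph):
    # Runs once per worker process: keep the read-only data in a global
    # so it is not re-sent with every work item.
    global GRAPH
    GRAPH = graph


def neighbours_of(node):
    """Hypothetical stand-in for the path search: it returns its result."""
    return node, sorted(GRAPH.get(node, ()))


def collect(graph, chunksize=100):
    paths = {}  # plain dict that lives only in the parent process
    with mp.Pool(initializer=init_worker, initargs=(graph,)) as pool:
        for node, found in pool.imap_unordered(neighbours_of,
                                               list(graph),
                                               chunksize=chunksize):
            paths[node] = found
    return paths


if __name__ == "__main__":
    toy_graph = {"a": ["c", "b"], "b": ["a"], "c": []}
    print(collect(toy_graph, chunksize=1))
```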