在Python多处理库中,是否有支持多个参数的pool.map变体?
import multiprocessing
text = "test"
def harvester(text, case):
X = case[0]
text + str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text, case), case, 1)
pool.close()
pool.join()
对我来说,以下是一个简单明了的解决方案:
from multiprocessing.pool import ThreadPool
from functools import partial
from time import sleep
from random import randint
def dosomething(var,s):
sleep(randint(1,5))
print(var)
return var + s
array = ["a", "b", "c", "d", "e"]
with ThreadPool(processes=5) as pool:
resp_ = pool.map(partial(dosomething,s="2"), array)
print(resp_)
输出:
a
b
d
e
c
['a2', 'b2', 'c2', 'd2', 'e2']
在官方文档中,它只支持一个可迭代的参数。在这种情况下,我喜欢使用apply_async。如果是你,我会:
from multiprocessing import Process, Pool, Manager
text = "test"
def harvester(text, case, q = None):
X = case[0]
res = text+ str(X)
if q:
q.put(res)
return res
def block_until(q, results_queue, until_counter=0):
i = 0
while i < until_counter:
results_queue.put(q.get())
i+=1
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
m = Manager()
q = m.Queue()
results_queue = m.Queue() # when it completes results will reside in this queue
blocking_process = Process(block_until, (q, results_queue, len(case)))
blocking_process.start()
for c in case:
try:
res = pool.apply_async(harvester, (text, case, q = None))
res.get(timeout=0.1)
except:
pass
blocking_process.join()
另一种方法是将列表列表传递给单参数例程:
import os
from multiprocessing import Pool
def task(args):
print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]
pool = Pool()
pool.map(task, [
[1,2],
[3,4],
[5,6],
[7,8]
])
然后可以用自己喜欢的方法构造一个参数列表。
从Python 3.4.4中,您可以使用multiprocessing.get_context()获取上下文对象,以使用多个启动方法:
import multiprocessing as mp
def foo(q, h, w):
q.put(h + ' ' + w)
print(h + ' ' + w)
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,'hello', 'world'))
p.start()
print(q.get())
p.join()
或者你只是简单地替换
pool.map(harvester(text, case), case, 1)
具有:
pool.apply_async(harvester(text, case), case, 1)
更好的方法是使用修饰符,而不是手工编写包装函数。特别是当您有很多函数要映射时,装饰器将通过避免为每个函数编写包装器来节省时间。通常,修饰函数是不可选择的,但是我们可以使用functools来解决它。更多讨论可以在这里找到。
以下是示例:
def unpack_args(func):
from functools import wraps
@wraps(func)
def wrapper(args):
if isinstance(args, dict):
return func(**args)
else:
return func(*args)
return wrapper
@unpack_args
def func(x, y):
return x + y
然后你可以用压缩的参数来映射它:
np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()
当然,您可能总是在Python3中使用Pool.starmap(>=3.3),正如其他答案中提到的那样。
您可以使用以下两个函数,以避免为每个新函数编写包装器:
import itertools
from multiprocessing import Pool
def universal_worker(input_pair):
function, args = input_pair
return function(*args)
def pool_args(function, *args):
return zip(itertools.repeat(function), zip(*args))
将函数函数与参数arg_0、arg_1和arg_2的列表一起使用,如下所示:
pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()