在Python多处理库中,是否有支持多个参数的pool.map变体?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

当前回答

对我来说,以下是一个简单明了的解决方案:

from multiprocessing.pool import ThreadPool
from functools import partial
from time import sleep
from random import randint

def dosomething(var,s):
    sleep(randint(1,5))
    print(var)
    return var + s

array = ["a", "b", "c", "d", "e"]
with ThreadPool(processes=5) as pool:
    resp_ = pool.map(partial(dosomething,s="2"), array)
    print(resp_)

输出:

a
b
d
e
c
['a2', 'b2', 'c2', 'd2', 'e2']

其他回答

有一个叫做pathos的多处理分支(注意:使用GitHub上的版本),它不需要starmap——map函数镜像Python map的API,因此map可以接受多个参数。

使用pathos,您通常也可以在解释器中执行多处理,而不是陷入__main__块。Pathos将在经过一些轻微的更新后发布——主要是转换为Python3.x。

  Python 2.7.5 (default, Sep 30 2013, 20:15:49)
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

pathos有几种方法可以让你得到星图的精确行为。

>>> def add(*x):
...   return sum(x)
...
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>>

您可以使用以下两个函数,以避免为每个新函数编写包装器:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

将函数函数与参数arg_0、arg_1和arg_2的列表一起使用,如下所示:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()

对于Python 2,可以使用此技巧

def fun(a, b):
    return a + b

pool = multiprocessing.Pool(processes=6)
b = 233
pool.map(lambda x:fun(x, b), range(1000))

另一种方法是将列表列表传递给单参数例程:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

然后可以用自己喜欢的方法构造一个参数列表。

更好的方法是使用修饰符,而不是手工编写包装函数。特别是当您有很多函数要映射时,装饰器将通过避免为每个函数编写包装器来节省时间。通常,修饰函数是不可选择的,但是我们可以使用functools来解决它。更多讨论可以在这里找到。

以下是示例:

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

然后你可以用压缩的参数来映射它:

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

当然,您可能总是在Python3中使用Pool.starmap(>=3.3),正如其他答案中提到的那样。