在Python多处理库中,是否有支持多个参数的pool.map变体?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

当前回答

import time
from multiprocessing import Pool


def f1(args):
    vfirst, vsecond, vthird = args[0] , args[1] , args[2]
    print(f'First Param: {vfirst}, Second value: {vsecond} and finally third value is: {vthird}')
    pass


if __name__ == '__main__':
    p = Pool()
    result = p.map(f1, [['Dog','Cat','Mouse']])
    p.close()
    p.join()
    print(result)

其他回答

import time
from multiprocessing import Pool


def f1(args):
    vfirst, vsecond, vthird = args[0] , args[1] , args[2]
    print(f'First Param: {vfirst}, Second value: {vsecond} and finally third value is: {vthird}')
    pass


if __name__ == '__main__':
    p = Pool()
    result = p.map(f1, [['Dog','Cat','Mouse']])
    p.close()
    p.join()
    print(result)

另一种方法是将列表列表传递给单参数例程:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

然后可以用自己喜欢的方法构造一个参数列表。

从Python 3.4.4中,您可以使用multiprocessing.get_context()获取上下文对象,以使用多个启动方法:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

或者你只是简单地替换

pool.map(harvester(text, case), case, 1)

具有:

pool.apply_async(harvester(text, case), case, 1)

将所有参数存储为元组数组。

该示例表示,通常调用函数为:

def mainImage(fragCoord: vec2, iResolution: vec3, iTime: float) -> vec3:

而是传递一个元组并解压缩参数:

def mainImage(package_iter) -> vec3:
    fragCoord = package_iter[0]
    iResolution = package_iter[1]
    iTime = package_iter[2]

预先使用循环构建元组:

package_iter = []
iResolution = vec3(nx, ny, 1)
for j in range((ny-1), -1, -1):
    for i in range(0, nx, 1):
        fragCoord: vec2 = vec2(i, j)
        time_elapsed_seconds = 10
        package_iter.append((fragCoord, iResolution, time_elapsed_seconds))

然后通过传递元组数组来执行所有using map:

array_rgb_values = []

with concurrent.futures.ProcessPoolExecutor() as executor:
    for val in executor.map(mainImage, package_iter):
        fragColor = val
        ir = clip(int(255* fragColor.r), 0, 255)
        ig = clip(int(255* fragColor.g), 0, 255)
        ib = clip(int(255* fragColor.b), 0, 255)

        array_rgb_values.append((ir, ig, ib))

我知道Python有*和**用于开箱,但我还没有尝试过。

使用高级库并发期货也比使用低级多处理库更好。

在J.F.Sebastian的回答中了解了itertools之后,我决定更进一步,编写一个关注并行化的parmap包,在Python 2.7和Python 3.2(以及更高版本)中提供可以接受任意数量位置参数的map和starmap函数。

安装

pip install parmap

如何并行化:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

我已经将parmap上传到PyPI和GitHub存储库。

例如,问题的答案如下:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)