895 Stimmen

Wie man multiprocessing pool.map mit mehreren Argumenten verwendet

In der Python multiprocessing Bibliothek, gibt es eine Variante von pool.map die mehrere Argumente unterstützt?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

12 Stimmen

Zu meiner Überraschung konnte ich weder die partial noch lambda dies tun. Ich denke, es hat mit der seltsamen Art und Weise zu tun, wie Funktionen an die Unterprozesse übergeben werden (über pickle ).

15 Stimmen

@senderle: Dies ist ein Fehler in Python 2.6, der aber mit 2.7 behoben wurde: bugs.python.org/issue5228

3 Stimmen

Ersetzen Sie einfach pool.map(harvester(text,case),case, 1) von: pool.apply_async(harvester(text,case),case, 1)

9voto

M. Toya Punkte 585

Um nicht für jede neue Funktion einen Wrapper schreiben zu müssen, können Sie die beiden folgenden Funktionen verwenden:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

Verwenden Sie die Funktion function mit den Listen der Argumente arg_0 , arg_1 y arg_2 wie folgt:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()

9voto

Alex Klibisz Punkte 1254

Eine andere einfache Alternative besteht darin, die Funktionsparameter in ein Tupel zu verpacken und dann die Parameter, die übergeben werden sollen, ebenfalls in Tupel zu verpacken. Dies ist vielleicht nicht ideal, wenn man mit großen Datenmengen zu tun hat. Ich glaube, es würde Kopien für jedes Tupel erstellen.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

Gibt die Ausgabe in einer zufälligen Reihenfolge aus:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

0 Stimmen

In der Tat, ich suche immer noch nach einem besseren Weg :(

7voto

cdahms Punkte 2904

Hier ist eine weitere Möglichkeit, die IMHO einfacher und eleganter ist als alle anderen Antworten.

Dieses Programm hat eine Funktion, die zwei Parameter annimmt, sie ausgibt und auch die Summe ausgibt:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

Ausgabe ist:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

Weitere Informationen finden Sie in den Python-Dokumenten:

https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

Informieren Sie sich insbesondere über die starmap Funktion.

Ich benutze Python 3.6, ich bin nicht sicher, ob dies mit älteren Python-Versionen funktionieren wird

Ich weiß nicht, warum es in den Unterlagen kein einfaches Beispiel wie dieses gibt.

4voto

Tung Nguyen Punkte 1368

Ab Python 3.4.4 können Sie multiprocessing.get_context() verwenden, um ein Kontextobjekt für die Verwendung mehrerer Startmethoden zu erhalten:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

Oder Sie ersetzen einfach nur

pool.map(harvester(text, case), case, 1)

mit:

pool.apply_async(harvester(text, case), case, 1)

3voto

roj4s Punkte 211

In der offiziellen Dokumentation heißt es, dass es nur ein iterables Argument unterstützt. Ich mag es, apply_async in solchen Fällen zu verwenden. In Ihrem Fall würde ich tun:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res

def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()

0 Stimmen

Sie meinen c 代わりに case hier, richtig? res = pool.apply_async(harvester, (text, case, q = None))

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X