496 Stimmen

Wie kann ich eine einfache Python-Schleife parallelisieren?

Dies ist wahrscheinlich eine triviale Frage, aber wie kann ich die folgende Schleife in Python parallelisieren?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Ich weiß, wie man einzelne Threads in Python startet, aber ich weiß nicht, wie man die Ergebnisse "sammelt".

Mehrere Prozesse wären auch in Ordnung - was immer in diesem Fall am einfachsten ist. Ich verwende derzeit Linux, aber der Code sollte auch auf Windows und Mac laufen.

Wie lässt sich dieser Code am einfachsten parallelisieren?

28voto

Robert Nishihara Punkte 2816

Es gibt eine Reihe von Vorteilen bei der Verwendung von Ray :

  • Sie können nicht nur auf mehreren Kernen, sondern auch auf mehreren Rechnern parallelisieren (mit demselben Code).
  • Effizienter Umgang mit numerischen Daten durch gemeinsamen Speicher (und Null-Kopie-Serialisierung).
  • Hoher Aufgabendurchsatz mit verteilter Terminplanung.
  • Fehlertoleranz.

In Ihrem Fall könnten Sie Ray starten und eine Remote-Funktion definieren

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

und rufen es dann parallel dazu auf

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

Um dasselbe Beispiel auf einem Cluster laufen zu lassen, wäre die einzige Zeile, die sich ändern würde, der Aufruf von ray.init(). Die entsprechende Dokumentation finden Sie unter aquí .

Beachten Sie, dass ich bei der Entwicklung von Ray mitwirke.

10voto

miuxu Punkte 173

Ich fand joblib ist für mich sehr nützlich. Bitte sehen Sie das folgende Beispiel:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs=-1: alle verfügbaren Kerne verwenden

10voto

itwasthekix Punkte 535

Dask futures; ich bin überrascht, dass es noch niemand erwähnt hat.

from dask.distributed import Client

client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = <code to execute in the for loop here>
    return output

futures = []

for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()

7voto

Felipe de Macêdo Punkte 194

Danke @iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count

def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'

6voto

w-m Punkte 9671

En gleichzeitig Umhüllungen durch den tqdm-Bibliothek sind eine gute Möglichkeit, länger laufenden Code zu parallelisieren. tqdm gibt über eine intelligente Fortschrittsanzeige Rückmeldung über den aktuellen Fortschritt und die verbleibende Zeit, was ich bei langen Berechnungen sehr nützlich finde.

Schleifen können so umgeschrieben werden, dass sie als gleichzeitige Threads durch einen einfachen Aufruf von thread_map oder als gleichzeitige Multiprozesse durch einen einfachen Aufruf von process_map :

from tqdm.contrib.concurrent import thread_map, process_map

def calc_stuff(num, multiplier):
    import time

    time.sleep(1)

    return num, num * multiplier

if __name__ == "__main__":

    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]

    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)

    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)

    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X