496 Stimmen

Wie kann ich eine einfache Python-Schleife parallelisieren?

Dies ist wahrscheinlich eine triviale Frage, aber wie kann ich die folgende Schleife in Python parallelisieren?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Ich weiß, wie man einzelne Threads in Python startet, aber ich weiß nicht, wie man die Ergebnisse "sammelt".

Mehrere Prozesse wären auch in Ordnung - was immer in diesem Fall am einfachsten ist. Ich verwende derzeit Linux, aber der Code sollte auch auf Windows und Mac laufen.

Wie lässt sich dieser Code am einfachsten parallelisieren?

301voto

Sven Marnach Punkte 525472

Die Verwendung mehrerer Threads unter CPython wird Ihnen aufgrund der globalen Interpretersperre (GIL) keine bessere Leistung für reinen Python-Code bringen. Ich empfehle die Verwendung der multiprocessing Modul stattdessen:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Beachten Sie, dass dies im interaktiven Interpreter nicht funktioniert.

Um den üblichen FUD rund um die GIL zu vermeiden: Die Verwendung von Threads hätte für dieses Beispiel ohnehin keinen Vorteil. Sie wollen hier Prozesse und nicht Threads zu verwenden, weil sie eine ganze Reihe von Problemen vermeiden.

189voto

tyrex Punkte 7105
from joblib import Parallel, delayed
def process(i):
    return i * i

results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results)  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Auf meinem Rechner (Ubuntu, Paket joblib war vorinstalliert, kann aber auch über pip install joblib ).

Entnommen aus https://blog.dominodatalab.com/simple-parallelization/


Bearbeiten am 31. März 2021: Auf joblib , multiprocessing , threading y asyncio

  • joblib im obigen Code verwendet import multiprocessing unter der Haube (und damit mehrere Prozesse, was in der Regel der beste Weg ist, um CPU-Arbeit über mehrere Kerne hinweg auszuführen - wegen der GIL)
  • Sie können die joblib mehrere Threads anstelle von mehreren Prozessen verwenden, aber dies (oder die Verwendung von import threading direkt) ist nur dann von Vorteil, wenn die Threads viel Zeit mit E/A verbringen (z. B. Lesen/Schreiben auf der Festplatte, Senden einer HTTP-Anfrage). Bei E/A-Arbeiten blockiert die GIL nicht die Ausführung eines anderen Threads
  • Seit Python 3.7 kann als Alternative zu threading können Sie die Arbeit parallelisieren mit asyncio aber es gelten die gleichen Ratschläge wie für import threading (im Gegensatz zu letzterem wird jedoch nur 1 Faden verwendet; das ist ein Vorteil, asyncio hat eine Menge netter Funktionen, die für die asynchrone Programmierung hilfreich sind)
  • Die Verwendung mehrerer Prozesse verursacht Overhead. Denken Sie darüber nach: Normalerweise muss jeder Prozess alles initialisieren/laden, was Sie für die Ausführung Ihrer Berechnung benötigen. Prüfen Sie selbst, ob der obige Codeschnipsel Ihre Rechenzeit verbessert. Hier ist ein weiteres Beispiel, bei dem ich bestätigt habe, dass joblib führt zu besseren Ergebnissen:

    import time from joblib import Parallel, delayed

    def countdown(n): while n>0: n -= 1 return n

    t = time.time() for _ in range(20): print(countdown(10**7), end=" ") print(time.time() - t)

    takes ~10.5 seconds on medium sized Macbook Pro

    t = time.time() results = Parallel(njobs=2)(delayed(countdown)(10**7) for in range(20)) print(results) print(time.time() - t)

    takes ~6.3 seconds on medium sized Macbook Pro

128voto

Hamza Punkte 4236

Das ist die einfachste Art, es zu tun!

Sie können verwenden asyncio . (Die Dokumentation ist zu finden unter aquí ). Es wird als Grundlage für mehrere asynchrone Python-Frameworks verwendet, die leistungsstarke Netzwerk- und Webserver, Bibliotheken für Datenbankverbindungen, verteilte Aufgabenwarteschlangen usw. bereitstellen. Außerdem verfügt es sowohl über High-Level- als auch Low-Level-APIs, um jede Art von Problem zu bewältigen.

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

Jetzt wird diese Funktion bei jedem Aufruf parallel ausgeführt, ohne dass das Hauptprogramm in den Wartezustand versetzt wird. Sie können sie auch zur Parallelisierung einer for-Schleife verwenden. Wenn sie für eine for-Schleife aufgerufen wird, ist die Schleife zwar sequentiell, aber jede Iteration läuft parallel zum Hauptprogramm, sobald der Interpreter dort ankommt.

1. Schleife parallel zum Hauptthread ohne Wartezeit abfeuern

enter image description here

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))

for i in range(10):
    your_function(i)

print('loop finished')

Dies führt zu folgender Ausgabe:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

Aktualisierung: Mai 2022

Obwohl dies die ursprüngliche Frage beantwortet, gibt es Möglichkeiten, wo wir warten können, bis Schleifen zu beenden, wie von upvoted Kommentare angefordert. Daher fügen wir sie hier ebenfalls hinzu. Die Schlüssel zu den Implementierungen sind: asyncio.gather() & run_until_complete() . Betrachten Sie die folgenden Funktionen:

import asyncio
import time

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument, other_argument): # Added another argument
    time.sleep(5)
    print(f"function finished for {argument=} and {other_argument=}")

def code_to_run_before():
    print('This runs Before Loop!')

def code_to_run_after():
    print('This runs After Loop!')

2. Parallel laufen, aber auf das Ende warten

enter image description here

code_to_run_before()                                                         # Anything you want to run before, run here!

loop = asyncio.get_event_loop()                                              # Have a new event loop

looper = asyncio.gather(*[your_function(i, 1) for i in range(1, 5)])         # Run the loop

results = loop.run_until_complete(looper)                                    # Wait until finish

code_to_run_after()                                                          # Anything you want to run after, run here!

Dies führt zu folgender Ausgabe:

This runs Before Loop!
function finished for argument=2 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=1 and other_argument=1
function finished for argument=4 and other_argument=1
This runs After Loop!

3. Mehrere Schleifen parallel laufen lassen und auf das Ende warten

enter image description here

code_to_run_before()                                                         # Anything you want to run before, run here!   

loop = asyncio.get_event_loop()                                              # Have a new event loop

group1 = asyncio.gather(*[your_function(i, 1) for i in range(1, 2)])         # Run all the loops you want
group2 = asyncio.gather(*[your_function(i, 2) for i in range(3, 5)])         # Run all the loops you want
group3 = asyncio.gather(*[your_function(i, 3) for i in range(6, 9)])         # Run all the loops you want

all_groups = asyncio.gather(group1, group2, group3)                          # Gather them all                                    
results = loop.run_until_complete(all_groups)                                # Wait until finish

code_to_run_after()                                                          # Anything you want to run after, run here!

Dies führt zu folgender Ausgabe:

This runs Before Loop!
function finished for argument=3 and other_argument=2
function finished for argument=1 and other_argument=1
function finished for argument=6 and other_argument=3
function finished for argument=4 and other_argument=2
function finished for argument=7 and other_argument=3
function finished for argument=8 and other_argument=3
This runs After Loop!

4. Sequentiell ablaufende Schleifen, wobei die Iterationen der einzelnen Schleifen parallel zueinander ablaufen

enter image description here

code_to_run_before()                                                               # Anything you want to run before, run here!

for loop_number in range(3):

    loop = asyncio.get_event_loop()                                                # Have a new event loop

    looper = asyncio.gather(*[your_function(i, loop_number) for i in range(1, 5)]) # Run the loop

    results = loop.run_until_complete(looper)                                      # Wait until finish

    print(f"finished for {loop_number=}")       

code_to_run_after()                                                                # Anything you want to run after, run here!

Dies führt zu folgender Ausgabe:

This runs Before Loop!
function finished for argument=3 and other_argument=0
function finished for argument=4 and other_argument=0
function finished for argument=1 and other_argument=0
function finished for argument=2 and other_argument=0
finished for loop_number=0
function finished for argument=4 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=2 and other_argument=1
function finished for argument=1 and other_argument=1
finished for loop_number=1
function finished for argument=1 and other_argument=2
function finished for argument=4 and other_argument=2
function finished for argument=3 and other_argument=2
function finished for argument=2 and other_argument=2
finished for loop_number=2
This runs After Loop!

Aktualisierung: Juni 2022

In seiner jetzigen Form läuft es möglicherweise nicht auf einigen Versionen von jupyter notebook. Der Grund dafür ist, dass jupyter notebook eine Ereignisschleife verwendet. Damit es auf solchen Jupyter-Versionen funktioniert, nest_asyncio (die die Ereignisschleife verschachteln würde, wie aus dem Namen ersichtlich) ist der richtige Weg. Importieren Sie einfach und wenden Sie es am oberen Rand der Zelle als an:

import nest_asyncio
nest_asyncio.apply()

Und alle oben genannten Funktionen sollten auch in einer Notebook-Umgebung verfügbar sein.

106voto

Gael Varoquaux Punkte 2346

Zur Parallelisierung einer einfachen for-Schleife, Joblib bringt der rohen Nutzung von Multiprocessing einen großen Nutzen. Nicht nur die kurze Syntax, sondern auch Dinge wie die transparente Bündelung von Iterationen, wenn sie sehr schnell sind (um den Overhead zu entfernen) oder die Erfassung des Tracebacks des Kindprozesses, um eine bessere Fehlerberichterstattung zu haben.

Haftungsausschluss: Ich bin der ursprüngliche Autor von joblib.

78voto

Wie lässt sich dieser Code am einfachsten parallelisieren?

Verwenden Sie einen PoolExecutor von concurrent.futures . Vergleichen Sie den ursprünglichen Code mit diesem, Seite an Seite. Erstens ist die prägnanteste Art, dies zu tun, mit executor.map :

...
with ProcessPoolExecutor() as executor:
    for out1, out2, out3 in executor.map(calc_stuff, parameters):
        ...

oder aufgeschlüsselt, indem jede Aufforderung einzeln eingereicht wird:

...
with ThreadPoolExecutor() as executor:
    futures = []
    for parameter in parameters:
        futures.append(executor.submit(calc_stuff, parameter))

    for future in futures:
        out1, out2, out3 = future.result() # this will block
        ...

Das Verlassen des Kontexts signalisiert dem Executor, dass er Ressourcen freigeben soll

Sie können Threads oder Prozesse verwenden und genau dieselbe Schnittstelle nutzen.

Ein praktisches Beispiel

Hier finden Sie einen funktionierenden Beispielcode, der den Wert von :

Geben Sie dies in eine Datei ein - futuretest.py:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection

def processor_intensive(arg):
    def fib(n): # recursive, processor intensive calculation (avoid n > 36)
        return fib(n-1) + fib(n-2) if n > 1 else n
    start = time()
    result = fib(arg)
    return time() - start, result

def io_bound(arg):
    start = time()
    con = HTTPSConnection(arg)
    con.request('GET', '/')
    result = con.getresponse().getcode()
    return time() - start, result

def manager(PoolExecutor, calc_stuff):
    if calc_stuff is io_bound:
        inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
                  'noaa.gov', 'parler.com', 'aaronhall.dev')
    else:
        inputs = range(25, 32)
    timings, results = list(), list()
    start = time()
    with PoolExecutor() as executor:
        for timing, result in executor.map(calc_stuff, inputs):
            # put results into correct output list:
            timings.append(timing), results.append(result)
    finish = time()
    print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
    print(f'wall time to execute: {finish-start}')
    print(f'total of timings for each call: {sum(timings)}')
    print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
    print(dict(zip(inputs, results)), end = '\n\n')

def main():
    for computation in (processor_intensive, io_bound):
        for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
            manager(pool_executor, calc_stuff=computation)

if __name__ == '__main__':
    main()

Und hier ist die Ausgabe für einen Lauf von python -m futuretest :

processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

Prozessorintensive Analyse

Wenn Sie in Python prozessorintensive Berechnungen durchführen, sollten Sie die ProcessPoolExecutor leistungsfähiger zu sein als die ThreadPoolExecutor .

Aufgrund der Global Interpreter Lock (auch bekannt als GIL) können Threads nicht mehrere Prozessoren verwenden, so dass die Zeit für die einzelnen Berechnungen und die Wall-Time (verstrichene Echtzeit) größer sein werden.

IO-gebundene Analyse

Andererseits ist bei der Durchführung von IO-gebundenen Operationen zu erwarten, dass ThreadPoolExecutor leistungsfähiger zu sein als ProcessPoolExecutor .

Die Threads von Python sind echte Betriebssystem-Threads. Sie können vom Betriebssystem in den Schlaf versetzt und wieder aufgeweckt werden, wenn ihre Informationen eintreffen.

Abschließende Überlegungen

Ich vermute, dass Multiprocessing unter Windows langsamer sein wird, da Windows kein Forking unterstützt, so dass jeder neue Prozess Zeit braucht, um zu starten.

Sie können mehrere Threads innerhalb mehrerer Prozesse verschachteln, aber es wird empfohlen, nicht mehrere Threads zu verwenden, um mehrere Prozesse abzuspalten.

Wenn man in Python mit einem schweren Verarbeitungsproblem konfrontiert wird, kann man trivialerweise mit zusätzlichen Prozessen skalieren - aber nicht so sehr mit Threading.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X