496 Stimmen

Wie kann ich eine einfache Python-Schleife parallelisieren?

Dies ist wahrscheinlich eine triviale Frage, aber wie kann ich die folgende Schleife in Python parallelisieren?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Ich weiß, wie man einzelne Threads in Python startet, aber ich weiß nicht, wie man die Ergebnisse "sammelt".

Mehrere Prozesse wären auch in Ordnung - was immer in diesem Fall am einfachsten ist. Ich verwende derzeit Linux, aber der Code sollte auch auf Windows und Mac laufen.

Wie lässt sich dieser Code am einfachsten parallelisieren?

5voto

jackdoe Punkte 1816

Warum verwenden Sie nicht Threads und eine Mutex, um eine globale Liste zu schützen?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   

threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

denken Sie daran, dass Sie so schnell sind wie Ihr langsamster Faden

3voto

Amit Teli Punkte 845

Nehmen wir an, wir haben eine asynchrone Funktion

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

Das muss auf einem großen Array ausgeführt werden. Einige Attribute werden an das Programm übergeben und einige werden von der Eigenschaft des Wörterbuch-Elements im Array verwendet.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))

2voto

TEe Punkte 49

Dies könnte bei der Implementierung von Multiprocessing und parallelem/verteiltem Rechnen in Python nützlich sein.

YouTube-Tutorial zur Verwendung des Pakets techila

Techila ist eine Middleware für verteiltes Rechnen, die sich über das Paket techila direkt in Python integrieren lässt. Die Peach-Funktion in diesem Paket kann bei der Parallelisierung von Schleifenstrukturen nützlich sein. (Der folgende Codeschnipsel stammt aus der Techila Gemeinschaftsforen )

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )

1voto

MerreM Punkte 83

Schauen Sie sich das an;

http://docs.python.org/library/queue.html

Das ist vielleicht nicht der richtige Weg, aber ich würde es so machen;

Aktueller Code;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

Ich hoffe, das hilft.

-6voto

Adil Warsi Punkte 485

Ein sehr einfaches Beispiel für Parallelverarbeitung ist

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=pa.yourfunction, args=('bob',))
    p.start()
    p.join()

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X