4 Stimmen

Ist es möglich, Multiprocessing innerhalb einer Schleife in Python 3.2 zu tun?

Ich versuche, Python (3.2) auf Multiprozess (Ubuntu) zu verwenden, um ein massives Suchproblem zu lösen. Im Grunde möchte ich eine Liste nehmen, das erste Element herausnehmen, alle anderen Elemente finden, die die gleichen Eigenschaften wie das Objekt haben, die gefundenen Elemente und das Zielelement in einer Liste zusammenführen, sie aus der ursprünglichen Liste entfernen und (Schleife) alles wiederholen. Das Multiprocessing dient dazu, die Arbeit auf die Prozessoren zu verteilen. Der Code wird ohne Probleme einmal ausgeführt. Er wird sogar in einer Schleife ausgeführt, da die Ausnahme ignoriert wird, und scheint seine Arbeit gut zu machen. Aber innerhalb von 30 Sekunden hat er fast meine gesamten 16 GB RAM verbraucht.

Meine zwei Bedenken bisher sind 1) Ich bekomme "Exception AssertionError: AssertionError('can only test a child process',) in ignored", sobald ich eine Schleife durchlaufe (und ich bekomme eine Menge davon). Zusammen mit diesem ist massive Menge an RAM-Nutzung (die, ich denke, könnte im Zusammenhang stehen, nicht sicher). UND 2) Es scheint nicht einmal die Suche parallel auszuführen, wenn ich einen größeren Datensatz verwende.

Mein Code sieht wie folgt aus:

class triangleListWorker(multiprocessing.Process):
    def __init__(self, work_queue, target, results,start):
        super().__init__()
        self.work_queue = work_queue
        self.results = results
        self.target = target
        self.startIndex = start
    def run(self):
        while True:
            try:
                searching = self.work_queue.get()
                self.do_search(searching)

            finally:
                self.work_queue.task_done()

    def do_search(self,searching):
        for x in range(len(searching)):
            if self.target.same_plane(searching[x]):
                self.results.append(self.startIndex+x)

Was ich hier versuche, ist Manager().list() zu verwenden, um alle Indizes zu speichern, bei denen das Zielobjekt und das gesuchte Objekt auf derselben Ebene liegen.

    def do_multi_find_connections(self, target,searchList):
        work_queue = multiprocessing.JoinableQueue()
        #results= multiprocessing.Queue()

        cpu_count = multiprocessing.cpu_count()
        results = multiprocessing.Manager().list()
        range_per_process = len(searchList) // cpu_count
        start,end = 0, range_per_process + (len(searchList) % cpu_count)
        for i in range(cpu_count):
            worker = triangleListWorker(work_queue,target,results,start)
            worker.daemon = True
            worker.start()
        for x in range(cpu_count):
            searchsub = [searchList[x] for x in range(start,end)]
            work_queue.put(searchList[start:end])
            #work_queue.put(searchList[start:end])
            start,end = end, end + range_per_process
            print(start,end)

        work_queue.join()
        print( "can continue...")

        return results

    def find_connections(self, triangle_list,doMultiProcessing):
        tlist = [x for x in triangle_list]
        print("len tlist", len(tlist))
        results = []
        self.byPlane = []
        if doMultiProcessing:
            while len(tlist) > 0:
                results = []
                target = tlist[0]
                #print("target",tcopy[0])
                self.do_multi_find_connections(target,tlist)

                results = self.do_multi_find_connections(target,tlist)#list of indexes
                plane = []

                print(len(results))
                print(results)
                for x in results:
                    plane.append(tlist[x])
                new_tlist = [tlist[x] for x in range(len(tlist)) if not x in results]
                print(len(new_tlist))
                tlist = new_tlist

                self.byPlane.append(plane)

##                self.byPlane.append(plane)
##                tlist = []

Dieser Code (vielleicht ein wenig hässlich) soll Schleife, um die nächste Ebene zu finden, und erschöpfen alles andere, die in der Ebene durch den Aufruf der Funktion über sie (die die Multiprocessing tut) ist.

Läuft auf Ubuntu 11.04 64, python 3.2.

1voto

Ross Patterson Punkte 5533

Anstatt eine Schleife zu verwenden, denke ich, dass das beabsichtigte Muster für die multiprocessing Modul ist die Erstellung eines Pool und verwenden Sie die Pool.map_async Methode. D.h., konvertieren Sie Ihre Schleife in eine Art Iterator (wahrscheinlich eine Generator Methode). Dann übergeben Sie das Äquivalent Ihrer do_search Methode als Funktion und Ihren Iterator auf map_async .

0voto

Michael Fayad Punkte 1069

Sie können die Pool-Klasse im Multiprocessing verwenden:

from multiprocessing import Pool
pool = Pool(processes=5)
valuesProcessed = pool.map(someFunction, valuesToProcess)

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X