Ich versuche, Python (3.2) auf Multiprozess (Ubuntu) zu verwenden, um ein massives Suchproblem zu lösen. Im Grunde möchte ich eine Liste nehmen, das erste Element herausnehmen, alle anderen Elemente finden, die die gleichen Eigenschaften wie das Objekt haben, die gefundenen Elemente und das Zielelement in einer Liste zusammenführen, sie aus der ursprünglichen Liste entfernen und (Schleife) alles wiederholen. Das Multiprocessing dient dazu, die Arbeit auf die Prozessoren zu verteilen. Der Code wird ohne Probleme einmal ausgeführt. Er wird sogar in einer Schleife ausgeführt, da die Ausnahme ignoriert wird, und scheint seine Arbeit gut zu machen. Aber innerhalb von 30 Sekunden hat er fast meine gesamten 16 GB RAM verbraucht.
Meine zwei Bedenken bisher sind 1) Ich bekomme "Exception AssertionError: AssertionError('can only test a child process',) in ignored", sobald ich eine Schleife durchlaufe (und ich bekomme eine Menge davon). Zusammen mit diesem ist massive Menge an RAM-Nutzung (die, ich denke, könnte im Zusammenhang stehen, nicht sicher). UND 2) Es scheint nicht einmal die Suche parallel auszuführen, wenn ich einen größeren Datensatz verwende.
Mein Code sieht wie folgt aus:
class triangleListWorker(multiprocessing.Process):
def __init__(self, work_queue, target, results,start):
super().__init__()
self.work_queue = work_queue
self.results = results
self.target = target
self.startIndex = start
def run(self):
while True:
try:
searching = self.work_queue.get()
self.do_search(searching)
finally:
self.work_queue.task_done()
def do_search(self,searching):
for x in range(len(searching)):
if self.target.same_plane(searching[x]):
self.results.append(self.startIndex+x)
Was ich hier versuche, ist Manager().list() zu verwenden, um alle Indizes zu speichern, bei denen das Zielobjekt und das gesuchte Objekt auf derselben Ebene liegen.
def do_multi_find_connections(self, target,searchList):
work_queue = multiprocessing.JoinableQueue()
#results= multiprocessing.Queue()
cpu_count = multiprocessing.cpu_count()
results = multiprocessing.Manager().list()
range_per_process = len(searchList) // cpu_count
start,end = 0, range_per_process + (len(searchList) % cpu_count)
for i in range(cpu_count):
worker = triangleListWorker(work_queue,target,results,start)
worker.daemon = True
worker.start()
for x in range(cpu_count):
searchsub = [searchList[x] for x in range(start,end)]
work_queue.put(searchList[start:end])
#work_queue.put(searchList[start:end])
start,end = end, end + range_per_process
print(start,end)
work_queue.join()
print( "can continue...")
return results
def find_connections(self, triangle_list,doMultiProcessing):
tlist = [x for x in triangle_list]
print("len tlist", len(tlist))
results = []
self.byPlane = []
if doMultiProcessing:
while len(tlist) > 0:
results = []
target = tlist[0]
#print("target",tcopy[0])
self.do_multi_find_connections(target,tlist)
results = self.do_multi_find_connections(target,tlist)#list of indexes
plane = []
print(len(results))
print(results)
for x in results:
plane.append(tlist[x])
new_tlist = [tlist[x] for x in range(len(tlist)) if not x in results]
print(len(new_tlist))
tlist = new_tlist
self.byPlane.append(plane)
## self.byPlane.append(plane)
## tlist = []
Dieser Code (vielleicht ein wenig hässlich) soll Schleife, um die nächste Ebene zu finden, und erschöpfen alles andere, die in der Ebene durch den Aufruf der Funktion über sie (die die Multiprocessing tut) ist.
Läuft auf Ubuntu 11.04 64, python 3.2.