5 Stimmen

Probleme bei der Verwendung eines Schlosses mit multiprocessing.Pool: Pickling-Fehler

Ich baue ein Python-Modul, um Tags aus einem großen Textkorpus zu extrahieren, und obwohl die Ergebnisse von hoher Qualität sind, wird es sehr langsam ausgeführt. Ich versuche, den Prozess zu beschleunigen, indem ich Multiprocessing verwende, und das hat auch funktioniert, bis ich versucht habe, ein Sperre einzuführen, damit nur ein Prozess gleichzeitig eine Verbindung zu unserer Datenbank herstellt. Ich kann einfach nicht herausfinden, wie ich das zum Laufen bringen kann - trotz intensiver Suche und Anpassung erhalte ich immer noch einen PicklingError: Can't pickle : attribute lookup thread.lock failed. Hier ist der problematische Code - er hat gut funktioniert, bis ich versucht habe, ein Sperr-Objekt als Argument für f zu übergeben.

def make_network(initial_tag, max_tags = 2, max_iter = 3):
    manager = Manager()
    lock = manager.Lock()
    pool = manager.Pool(8)

    # dies ist eine sehr teure Funktion, die ich parallelisieren möchte 
    # über eine Liste von Tags. Es beinhaltet einen (relativ günstigen) Aufruf zu einer externen
    # Datenbank, die einen Sperrmechanismus benötigt, um gleichzeitige Abfragen zu vermeiden. Es akzeptiert eine Liste
    # von Zeichenfolgen (Tags) als einziges Argument und gibt eine Liste von Sets mit Einträgen zurück,
    # die der Eingabeliste entsprechen.
    f = partial(get_more_tags, max_tags = max_tags, lock = lock) 

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print i + "|" + joined
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level+1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)

7voto

Jonathan Punkte 2375

Ihr Problem ist, dass Verschlusselemente nicht pickelbar sind. In diesem Fall sehe ich zwei mögliche Lösungen für Sie.

  • Um dies zu vermeiden, können Sie Ihre Verschlusselement-Variable zu einer globalen Variable machen. Dann können Sie direkt in Ihrer Pool-Prozessfunktion darauf als globale Variable verweisen und müssen sie nicht als Argument an die Pool-Prozessfunktion übergeben. Dies funktioniert, weil Python den OS-Fork-Mechanismus verwendet, wenn die Pool-Prozesse erstellt werden, und daher den gesamten Inhalt des Prozesses, der die Pool-Prozesse erstellt, auf sie kopiert. Dies ist der einzige Weg, einen Verschluss an einen mit dem multiprocessing-Paket erstellten Python-Prozess zu übergeben. Übrigens ist es nicht notwendig, die Manager-Klasse nur für diesen Verschluss zu verwenden. Mit dieser Änderung würde Ihr Code so aussehen:

    import multiprocessing
    from functools import partial
    
    lock = None  # Globale Definition des Verschlusses
    pool = None  # Globale Definition des Pools
    
    def make_network(initial_tag, max_tags=2, max_iter=3):
        global lock
        global pool
        lock = multiprocessing.Lock()
        pool = multiprocessing.Pool(8)
    
    def get_more_tags():
        global lock
        pass
    
    # Dies ist eine sehr aufwändige Funktion, die ich gerne parallelisieren würde
    # über eine Liste von Tags. Es beinhaltet einen (relativ günstigen) Aufruf an eine externe
    # Datenbank, die einen Verschluss benötigt, um gleichzeitige Abfragen zu vermeiden. Es nimmt
    # eine Liste von Strings (Tags) als einziges Argument entgegen und gibt eine Liste von Sets zurück
    # mit Einträgen, die der Eingabliste entsprechen.
    f = partial(get_more_tags, max_tags=max_tags)
    
    def _recursively_find_more_tags(tags, level):
        global pool
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print(i + "|" + joined)
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level + 1)
        except StopIteration:
            return None
    
    _recursively_find_more_tags([initial_tag], 0)

In Ihrem eigentlichen Code ist es möglich, dass die Verschluss- und Pool-Variablen Klasseninstanzvariablen sein könnten.

  • Ein zweite Lösung, die ganz auf die Verwendung von Verschlüssen verzichtet, aber möglicherweise etwas höhere Overheads hat, wäre die Erstellung eines weiteren Prozesses mit multiprocessing.Process und die Verbindung über eine multiprocessing.Queue zu jedem Ihrer Pool-Prozesse. Dieser Prozess wäre für die Ausführung Ihrer Datenbankabfrage verantwortlich. Sie würden die Warteschlange verwenden, um es den Pool-Prozessen zu ermöglichen, Parameter an den Prozess zu senden, der die Datenbankabfrage verwaltet. Da alle Pool-Prozesse dieselbe Warteschlange verwenden würden, würde der Zugriff auf die Datenbank automatisch serialisiert. Die zusätzlichen Kosten würden durch das Aufnehmen/Auspacken der Datenbankabfrageargumente und der Abfrageantwort entstehen. Beachten Sie, dass Sie ein multiprocessing.Queue-Objekt einem Poolprozess als Argument übergeben können. Beachten Sie auch, dass die auf einem Windows-Betriebssystem basierende multiprocessing.Lock-Lösung nicht funktionieren würde, da Prozesse nicht mit fork-Semantik erstellt werden.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X