378 Stimmen

Python-Multiprocessing PicklingError: Kann nicht beizen <Typ 'Funktion'>

Es tut mir leid, dass ich den Fehler nicht mit einem einfacheren Beispiel reproduzieren kann, und mein Code ist zu kompliziert, um ihn zu posten. Wenn ich das Programm in der IPython-Shell anstelle des regulären Python ausführe, funktioniert es gut.

Ich habe einige frühere Notizen zu diesem Problem nachgeschlagen. Sie wurden alle durch die Verwendung von pool verursacht, um Funktionen aufzurufen, die innerhalb einer Klassenfunktion definiert sind. Aber dies ist nicht der Fall für mich.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Ich wäre für jede Hilfe dankbar.

Update : Die Funktion I pickle ist auf der obersten Ebene des Moduls definiert. Allerdings ruft sie eine Funktion auf, die eine verschachtelte Funktion enthält, d.h, f() ruft auf. g() ruft auf. h() die eine verschachtelte Funktion hat i() und ich rufe an pool.apply_async(f) . f() , g() , h() sind alle auf der obersten Ebene definiert. Ich habe ein einfacheres Beispiel mit diesem Muster ausprobiert, und es funktioniert trotzdem.

431voto

unutbu Punkte 769083

Hier ist ein Liste dessen, was eingelegt werden kann . Insbesondere sind Funktionen nur dann picklbar, wenn sie auf der obersten Ebene eines Moduls definiert sind.

Dieser Teil des Codes:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

ergibt einen Fehler, der fast identisch mit dem ist, den Sie gepostet haben:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Das Problem ist, dass die pool Methoden verwenden alle eine mp.SimpleQueue um Aufgaben an die Arbeitsprozesse zu übergeben. Alles, was durch den mp.SimpleQueue muss wählbar sein, und foo.work ist nicht auswählbar, da es nicht auf der obersten Ebene des Moduls definiert ist.

Dies kann durch die Definition einer Funktion auf der obersten Ebene behoben werden, die Folgendes aufruft foo.work() :

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

Beachten Sie, dass foo ist wählbar, da Foo ist auf der obersten Ebene definiert und foo.__dict__ ist beizbar.

141voto

Mike McKerns Punkte 30236

Ich würde die pathos.multiprocesssing anstelle von multiprocessing . pathos.multiprocessing ist eine Abzweigung von multiprocessing die die dill . dill kann fast alles in Python serialisieren, so dass Sie viel mehr parallel senden können. Die pathos fork hat auch die Möglichkeit, direkt mit Funktionen mit mehreren Argumenten zu arbeiten, wie Sie es für Klassenmethoden benötigen.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Siehe pathos (und wenn Sie möchten, dill ) https://github.com/uqfoundation

65voto

Robert Hafner Punkte 2369

Wenn dieses Problem auftritt mit multiprocessing Eine einfache Lösung ist der Wechsel von Pool a ThreadPool . Dies kann ohne eine Änderung des Codes außer dem Import erfolgen.

from multiprocessing.pool import ThreadPool as Pool

Dies funktioniert, weil ThreadPool den Speicher mit dem Hauptthread teilt, anstatt einen neuen Prozess zu erstellen - das bedeutet, dass kein Picking erforderlich ist.

Der Nachteil dieser Methode ist, dass Python nicht die beste Sprache für den Umgang mit Threads ist - es verwendet etwas, das Global Interpreter Lock genannt wird, um thread-sicher zu bleiben, was einige Anwendungsfälle hier verlangsamen kann. Wenn Sie jedoch in erster Linie mit anderen Systemen interagieren (HTTP-Befehle ausführen, mit einer Datenbank kommunizieren, in Dateisysteme schreiben), dann ist Ihr Code wahrscheinlich nicht an die CPU gebunden und wird nicht viel einstecken müssen. Tatsächlich habe ich beim Schreiben von HTTP/HTTPS-Benchmarks festgestellt, dass das hier verwendete Thread-Modell weniger Overhead und Verzögerungen aufweist, da der Overhead durch die Erstellung neuer Prozesse viel höher ist als der Overhead für die Erstellung neuer Threads und das Programm ansonsten nur auf HTTP-Antworten gewartet hat.

Wenn Sie also eine Menge Zeug im Python-Benutzerraum verarbeiten, ist dies vielleicht nicht die beste Methode.

42voto

rocksportrocker Punkte 6993

Wie andere gesagt haben multiprocessing kann nur Python-Objekte an Arbeitsprozesse übertragen, die gepickt werden können. Wenn Sie Ihren Code nicht wie von unutbu beschrieben reorganisieren können, können Sie dill s erweiterte Beiz-/Entbeizfunktionen für die Übertragung von Daten (insbesondere von Codedaten), wie ich unten zeige.

Diese Lösung erfordert lediglich die Installation von dill und keine anderen Bibliotheken als pathos :

import os
from multiprocessing import Pool

import dill

def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)

def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))

if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()

31voto

Ezekiel Kruglick Punkte 4206

Ich habe festgestellt, dass ich genau diese Fehlerausgabe auch bei einem einwandfrei funktionierenden Teil des Codes erzeugen kann, wenn ich versuche, den Profiler dafür zu verwenden.

Beachten Sie, dass dies unter Windows geschah (wo das Forking etwas weniger elegant ist).

Ich war auf der Flucht:

python -m profile -o output.pstats <script> 

Dabei stellte ich fest, dass das Entfernen der Profilerstellung den Fehler beseitigte und das Einfügen der Profilerstellung ihn wiederherstellte. Es hat mich auch verrückt gemacht, weil ich wusste, dass der Code früher funktionierte. Ich überprüfte, ob etwas pool.py aktualisiert hatte... dann hatte ich ein ungutes Gefühl und entfernte die Profilerstellung und das war's.

Ich poste hier für die Archive, falls noch jemand darauf stößt.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X