391 Stimmen

Python-Multiprocessing PicklingError: Kann nicht beizen <Typ 'Funktion'>

Es tut mir leid, dass ich den Fehler nicht mit einem einfacheren Beispiel reproduzieren kann, und mein Code ist zu kompliziert, um ihn zu posten. Wenn ich das Programm in der IPython-Shell anstelle des regulären Python ausführe, funktioniert es gut.

Ich habe einige frühere Notizen zu diesem Problem nachgeschlagen. Sie wurden alle durch die Verwendung von pool verursacht, um Funktionen aufzurufen, die innerhalb einer Klassenfunktion definiert sind. Aber dies ist nicht der Fall für mich.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Ich wäre für jede Hilfe dankbar.

Update : Die Funktion I pickle ist auf der obersten Ebene des Moduls definiert. Allerdings ruft sie eine Funktion auf, die eine verschachtelte Funktion enthält, d.h, f() ruft auf. g() ruft auf. h() die eine verschachtelte Funktion hat i() und ich rufe an pool.apply_async(f) . f() , g() , h() sind alle auf der obersten Ebene definiert. Ich habe ein einfacheres Beispiel mit diesem Muster ausprobiert, und es funktioniert trotzdem.

6voto

Penkey Suresh Punkte 5536
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Dieser Fehler tritt auch auf, wenn Sie eine eingebaute Funktion innerhalb des Modellobjekts haben, die an den asynchronen Auftrag übergeben wurde.

Prüfen Sie also unbedingt die Modellobjekte die übergeben werden, haben keine eingebauten Funktionen. (In unserem Fall haben wir FieldTracker() Funktion von django-model-utils innerhalb des Modells, um ein bestimmtes Feld zu verfolgen). Hier ist die Link zum entsprechenden GitHub-Thema.

4voto

Ilia w495 Nikitin Punkte 251

Diese Lösung erfordert nur die Installation von Dill und keine anderen Bibliotheken wie Pathos

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res

def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

Es funktioniert auch für Numpy-Arrays.

4voto

Eine schnelle Lösung besteht darin, die Funktion global zu machen

from multiprocessing import Pool

class Test:
    def __init__(self, x):
        self.x = x

    @staticmethod
    def test(x):
        return x**2

    def test_apply(self, list_):
        global r
        def r(x):
            return Test.test(x + self.x)

        with Pool() as p:
            l = p.map(r, list_)

        return l

if __name__ == '__main__':
    o = Test(2)
    print(o.test_apply(range(10)))

0voto

shouldsee Punkte 416

Aufbauend auf @rocksportrocker Lösung, Es würde Sinn machen, beim Senden und Empfangen der Ergebnisse zu dillen.

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)

-2voto

Gru Punkte 516

Wie @penky Suresh in diesem Artikel vorgeschlagen hat Antwort verwenden Sie keine eingebauten Schlüsselwörter.

Offensichtlich args ist ein eingebautes Schlüsselwort, wenn es um Multiprocessing geht

class TTS:
    def __init__(self):
        pass

    def process_and_render_items(self):
        multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]

        with ProcessPoolExecutor(max_workers=10) as executor:
          # Using args here is fine. 
            future_processes = {
              executor.submit(TTS.process_and_render_item, args)
                for args in multiprocessing_args
            }

            for future in as_completed(future_processes):
                try:
                    data = future.result()
                except Exception as exc:
                    print(f"Generated an exception: {exc}")
                else:
                   print(f"Generated data for comment process: {future}")

    # Dont use 'args' here. It seems to be a built-in keyword.
    # Changing 'args' to 'arg' worked for me.
    def process_and_render_item(arg):
        print(arg)
      # This will print {"a": "b", "c": "d"} for the first process
      # and {"e": "f", "g": "h"} for the second process.

PS: Die Tabulatoren/Leerzeichen sind vielleicht ein bisschen daneben.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X