Jakes Antwort ist gut, aber wenn Sie keinen Threadpool verwenden wollen (Sie wissen nicht, wie viele Threads Sie benötigen, aber erstellen Sie sie nach Bedarf), dann ist eine gute Möglichkeit, Informationen zwischen Threads zu übertragen, die eingebaute Warteschlange.Warteschlange Klasse, da sie Thread-Sicherheit bietet.
Ich habe den folgenden Dekorator erstellt, damit er sich ähnlich wie der Threadpool verhält:
def threaded(f, daemon=False):
import Queue
def wrapped_f(q, *args, **kwargs):
'''this function calls the decorated function and puts the
result in a queue'''
ret = f(*args, **kwargs)
q.put(ret)
def wrap(*args, **kwargs):
'''this is the function returned from the decorator. It fires off
wrapped_f in a new thread and returns the thread object with
the result queue attached'''
q = Queue.Queue()
t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
t.daemon = daemon
t.start()
t.result_queue = q
return t
return wrap
Dann verwenden Sie es einfach als:
@threaded
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Thread object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result_queue.get()
print result
Die dekorierte Funktion erstellt bei jedem Aufruf einen neuen Thread und gibt ein Thread-Objekt zurück, das die Warteschlange enthält, die das Ergebnis erhalten wird.
UPDATE
Es ist schon eine ganze Weile her, seit ich diese Antwort gepostet habe, aber sie wird immer noch aufgerufen, also dachte ich, ich würde sie aktualisieren, um die Art und Weise wiederzugeben, wie ich das in neueren Versionen von Python mache:
Python 3.2 hinzugefügt in der concurrent.futures
Modul, das eine High-Level-Schnittstelle für parallele Aufgaben bietet. Es bietet ThreadPoolExecutor
y ProcessPoolExecutor
Sie können also einen Thread oder einen Prozess-Pool mit der gleichen API verwenden.
Ein Vorteil dieser API ist, dass die Übermittlung einer Aufgabe an eine Executor
gibt eine Future
Objekt, das mit dem Rückgabewert der von Ihnen übermittelten Callable abgeschlossen wird.
Dies macht das Anbringen einer queue
Objekt überflüssig, was den Dekorator erheblich vereinfacht:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)
return wrap
Dabei wird ein Standard Modul Threadpool-Executor, wenn kein solcher übergeben wird.
Die Verwendung ist sehr ähnlich wie zuvor:
@threadpool
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Future object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result()
print result
Wenn Sie Python 3.4+ verwenden, ist eine wirklich nette Funktion dieser Methode (und von Future-Objekten im Allgemeinen), dass die zurückgegebene Zukunft in eine "Wrapping"-Methode umgewandelt werden kann. asyncio.Future
con asyncio.wrap_future
. Dies erleichtert die Arbeit mit Koroutinen:
result = await asyncio.wrap_future(long_task(10))
Wenn Sie keinen Zugriff auf die zugrunde liegenden Daten benötigen concurrent.Future
Objekts können Sie den Umbruch in den Dekorator aufnehmen:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))
return wrap
Wenn Sie dann einen rechenintensiven oder blockierenden Code aus dem Ereignisschleifen-Thread auslagern müssen, können Sie ihn in eine dekorierte Funktion einfügen:
@threadpool
def some_long_calculation():
...
# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()