421 Stimmen

Threading Pool ähnlich dem Multiprocessing Pool?

Gibt es eine Pool-Klasse für Arbeiter Gewinde ähnlich wie die des Multiprozessor-Moduls Pool-Klasse ?

Mir gefällt zum Beispiel die einfache Möglichkeit, eine Map-Funktion zu parallelisieren

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

Ich möchte dies jedoch ohne den Aufwand für die Erstellung neuer Prozesse tun.

Ich weiß von der GIL. In meinem Anwendungsfall wird die Funktion jedoch eine IO-gebundene C-Funktion sein, für die der Python-Wrapper die GIL vor dem eigentlichen Funktionsaufruf freigeben wird.

Muss ich meinen eigenen Threading-Pool schreiben?

17voto

Kashif Punkte 1070

Ja, es gibt einen Threading-Pool, der dem Multiprocessing-Pool ähnlich ist, allerdings ist er etwas versteckt und nicht richtig dokumentiert. Sie können ihn wie folgt importieren:-

from multiprocessing.pool import ThreadPool

Ich zeige Ihnen nur ein einfaches Beispiel

def test_multithread_stringio_read_csv(self):
        # see gh-11786
        max_row_range = 10000
        num_files = 100

        bytes_to_df = [
            '\n'.join(
                ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
            ).encode() for j in range(num_files)]
        files = [BytesIO(b) for b in bytes_to_df]

        # read all files in many threads
        pool = ThreadPool(8)
        results = pool.map(self.read_csv, files)
        first_result = results[0]

        for result in results:
            tm.assert_frame_equal(first_result, result)

14voto

forumulator Punkte 768

Hier ist das Ergebnis, das ich schließlich verwendet habe. Es ist eine modifizierte Version der Klassen von dgorissen oben.

Datei: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread

class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()

def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

So nutzen Sie den Pool

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()

7voto

pelos Punkte 1526

Eine andere Möglichkeit ist das Hinzufügen des Prozesses zum Thread-Queue-Pool

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        a = executor.submit(arg1, arg2,....)

4voto

unbeli Punkte 28027

Der Aufwand für die Erstellung der neuen Prozesse ist minimal, vor allem, wenn es sich nur um 4 Prozesse handelt. Ich bezweifle, dass dies ein Leistungsproblem in Ihrer Anwendung ist. Halten Sie es einfach, optimieren Sie dort, wo Sie müssen und wo die Profilergebnisse darauf hindeuten.

3voto

Yann Ramin Punkte 32375

Es gibt keinen eingebauten threadbasierten Pool. Es kann jedoch sehr schnell sein, eine Producer/Consumer-Warteschlange mit der Queue Klasse.

Von: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X