421 Stimmen

Threading Pool ähnlich dem Multiprocessing Pool?

Gibt es eine Pool-Klasse für Arbeiter Gewinde ähnlich wie die des Multiprozessor-Moduls Pool-Klasse ?

Mir gefällt zum Beispiel die einfache Möglichkeit, eine Map-Funktion zu parallelisieren

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

Ich möchte dies jedoch ohne den Aufwand für die Erstellung neuer Prozesse tun.

Ich weiß von der GIL. In meinem Anwendungsfall wird die Funktion jedoch eine IO-gebundene C-Funktion sein, für die der Python-Wrapper die GIL vor dem eigentlichen Funktionsaufruf freigeben wird.

Muss ich meinen eigenen Threading-Pool schreiben?

0voto

Wenn es Ihnen nichts ausmacht, den Code anderer auszuführen, hier ist meiner:

Note : Es gibt eine Menge zusätzlichen Code, den Sie vielleicht entfernen möchten [zur besseren Verdeutlichung und zur Demonstration der Funktionsweise hinzugefügt].

Note : Für Methoden- und Variablennamen wurden die Python-Namenskonventionen anstelle von camelCase verwendet.

Arbeitsverfahren:

  1. Die MultiThread-Klasse wird mit einer Anzahl von Instanzen von Threads initiiert, indem sie Sperre, Arbeitswarteschlange, Exit-Flag und Ergebnisse gemeinsam nutzen.
  2. SingleThread wird von MultiThread gestartet, sobald dieser alle Instanzen erstellt hat.
  3. Wir können Arbeiten mit MultiThread hinzufügen (er kümmert sich um das Sperren).
  4. SingleThreads verarbeitet die Arbeitswarteschlange mit einer Sperre in der Mitte.
  5. Nach getaner Arbeit können Sie alle Threads mit gemeinsamen booleschen Werten löschen.
  6. Hier kann die Arbeit alles sein. Es kann automatisch importieren (unkommentiert importieren Zeile) und verarbeiten das Modul mit den angegebenen Argumenten.
  7. Die Ergebnisse werden den Ergebnissen hinzugefügt und können mit get_results abgerufen werden.

Code:

import threading
import queue

class SingleThread(threading.Thread):
    def __init__(self, name, work_queue, lock, exit_flag, results):
        threading.Thread.__init__(self)
        self.name = name
        self.work_queue = work_queue
        self.lock = lock
        self.exit_flag = exit_flag
        self.results = results

    def run(self):
        # print("Coming %s with parameters %s", self.name, self.exit_flag)
        while not self.exit_flag:
            # print(self.exit_flag)
            self.lock.acquire()
            if not self.work_queue.empty():
                work = self.work_queue.get()
                module, operation, args, kwargs = work.module, work.operation, work.args, work.kwargs
                self.lock.release()
                print("Processing : " + operation + " with parameters " + str(args) + " and " + str(kwargs) + " by " + self.name + "\n")
                # module = __import__(module_name)
                result = str(getattr(module, operation)(*args, **kwargs))
                print("Result : " + result + " for operation " + operation + " and input " + str(args) + " " + str(kwargs))
                self.results.append(result)
            else:
                self.lock.release()
        # process_work_queue(self.work_queue)

class MultiThread:
    def __init__(self, no_of_threads):
        self.exit_flag = bool_instance()
        self.queue_lock = threading.Lock()
        self.threads = []
        self.work_queue = queue.Queue()
        self.results = []
        for index in range(0, no_of_threads):
            thread = SingleThread("Thread" + str(index+1), self.work_queue, self.queue_lock, self.exit_flag, self.results)
            thread.start()
            self.threads.append(thread)

    def add_work(self, work):
        self.queue_lock.acquire()
        self.work_queue._put(work)
        self.queue_lock.release()

    def destroy(self):
        self.exit_flag.value = True
        for thread in self.threads:
            thread.join()

    def get_results(self):
        return self.results

class Work:
    def __init__(self, module, operation, args, kwargs={}):
        self.module = module
        self.operation = operation
        self.args = args
        self.kwargs = kwargs

class SimpleOperations:
    def sum(self, *args):
        return sum([int(arg) for arg in args])

    @staticmethod
    def mul(a, b, c=0):
        return int(a) * int(b) + int(c)

class bool_instance:
    def __init__(self, value=False):
        self.value = value

    def __setattr__(self, key, value):
        if key != "value":
            raise AttributeError("Only value can be set!")
        if not isinstance(value, bool):
            raise AttributeError("Only True/False can be set!")
        self.__dict__[key] = value
        # super.__setattr__(key, bool(value))

    def __bool__(self):
        return self.value

if __name__ == "__main__":
    multi_thread = MultiThread(5)
    multi_thread.add_work(Work(SimpleOperations(), "mul", [2, 3], {"c":4}))
    while True:
        data_input = input()
        if data_input == "":
            pass
        elif data_input == "break":
            break
        else:
            work = data_input.split()
            multi_thread.add_work(Work(SimpleOperations(), work[0], work[1:], {}))
    multi_thread.destroy()
    print(multi_thread.get_results())

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X