6 Stimmen

Ist dieser Python Producer-Consumer Lockless-Ansatz thread-sicher?

Ich habe kürzlich ein Programm geschrieben, das ein einfaches Erzeuger/Verbraucher-Muster verwendet. Ursprünglich hatte es einen Fehler im Zusammenhang mit der unsachgemäßen Verwendung von threading.Lock, den ich schließlich behoben habe. Aber es hat mich zum Nachdenken gebracht, ob es möglich ist, das Producer/Consumer-Muster ohne Sperren zu implementieren.

Die Anforderungen in meinem Fall waren einfach:

  • Ein Hersteller-Thread.
  • Ein Verbraucher-Thread.
  • Die Warteschlange hat nur Platz für einen Artikel.
  • Der Produzent kann den nächsten Gegenstand herstellen, bevor der aktuelle Gegenstand verbraucht ist. Der aktuelle Gegenstand ist also verloren, aber das ist für mich in Ordnung.
  • Der Verbraucher kann den aktuellen Artikel verbrauchen, bevor der nächste produziert wird. Der aktuelle Gegenstand wird also zweimal (oder öfter) verbraucht, aber das ist für mich in Ordnung.

Also habe ich dies geschrieben:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

Meine Frage ist: Ist dieser Code thread-sicher?

Unmittelbarer Kommentar: dieser Code ist nicht wirklich schlosslos - ich benutze CPython und es hat GIL.

Ich habe den Code ein wenig getestet und er scheint zu funktionieren. Er übersetzt einige LOAD- und STORE-Operationen, die aufgrund von GIL atomar sind. Aber ich weiß auch, dass del x Operation ist nicht atomar, wenn x implementiert __del__ Methode. Wenn mein Artikel also eine __del__ Methode und eine unangenehme Terminplanung passiert, kann es zu Problemen kommen. Oder nicht?

Eine andere Frage ist: Welche Art von Einschränkungen (z. B. für den Typ der produzierten Artikel) muss ich vornehmen, damit der obige Code gut funktioniert?

Meine Fragen beziehen sich nur auf die theoretische Möglichkeit, die Eigenheiten von CPython und GIL auszunutzen, um eine sperrfreie (d.h. keine Sperren wie threading.Lock explizit im Code) Lösung zu finden.

6voto

Dustin Punkte 85400

Trickserei wird dich beißen. Verwenden Sie einfach Queue für die Kommunikation zwischen Threads.

2voto

Unknown Punkte 44574

Ja, das wird so funktionieren, wie Sie es beschrieben haben:

  1. Dass der Hersteller ein überspringbares Element produzieren kann.
  2. Dass der Verbraucher das gleiche Element verbrauchen kann.

Aber ich weiß auch, dass del x Operation nicht atomar ist, wenn x implementiert del Methode. Wenn mein Artikel also eine del Methode und eine unangenehme Terminplanung passiert, kann es zu Problemen kommen.

Ich sehe hier kein "del". Wenn ein "del" in "consume_item" vorkommt, dann wird die del im Produzenten-Thread auftreten können. Ich glaube nicht, dass dies ein "Problem" wäre.

Machen Sie sich aber nicht die Mühe, dies zu benutzen. Sie werden am Ende mit CPU auf sinnlose Polling-Zyklen, und es ist nicht schneller als mit einer Warteschlange mit Sperren, da Python bereits eine globale Sperre hat.

1voto

David Locke Punkte 17314

Dies ist nicht vraiment thread-safe, weil der Hersteller überschreiben könnte QUEUE_ITEM bevor der Verbraucher es konsumiert hat und der Verbraucher es konsumieren könnte QUEUE_ITEM zweimal. Wie Sie schon sagten, ist das für Sie in Ordnung, aber für die meisten Menschen nicht.

Jemand mit mehr Wissen über cpython Interna wird Ihnen mehr theoretische Fragen beantworten müssen.

0voto

Bastien Léonard Punkte 57728

Ich denke, dass es möglich ist, dass ein Thread während des Produzierens/Verbrauchens unterbrochen wird, insbesondere wenn es sich um große Objekte handelt. Edit: Das ist nur eine wilde Vermutung. Ich bin kein Experte.

Außerdem können die Threads eine beliebige Anzahl von Gegenständen produzieren/verbrauchen, bevor der andere in Gang kommt.

0voto

null Punkte 828

Sie können eine Liste als Warteschlange verwenden, solange Sie sich an append/pop halten, da beide atomar sind.

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

In einem Klassenbereich wie unten können Sie sogar die Warteschlange löschen.

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility producer is still working on it's current item.
    def clear_queue(self):
        self.queue = []

Sie müssen herausfinden, welche Listenoperationen atomar sind, indem Sie sich den erzeugten Bytecode ansehen.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X