7 Stimmen

Aufrechterhaltung der Ordnung in einer Multi-Threaded-Pipeline

Ich erwäge eine Multithreading-Architektur für eine Verarbeitungspipeline. Mein Hauptverarbeitungsmodul hat eine Eingabewarteschlange, aus der es Datenpakete empfängt. Es führt dann Transformationen an diesen Paketen durch (Entschlüsselung usw.) und stellt sie in eine Ausgabewarteschlange.

Das Threading setzt dort an, wo viele Eingabepakete unabhängig voneinander umgewandelt werden können.

Die Pointe ist jedoch, dass die Ausgabewarteschlange dieselbe Reihenfolge haben muss wie die Eingabewarteschlange (d. h., das erste, das aus der Eingabewarteschlange gezogen wird, muss auch das erste sein, das in die Ausgabewarteschlange geschoben wird, unabhängig davon, ob seine Umwandlungen zuerst abgeschlossen wurden).

Natürlich wird es irgendeine Art von Synchronisation in der Ausgabewarteschlange geben. Meine Frage lautet also: Wie kann ich am besten sicherstellen, dass diese Reihenfolge beibehalten wird?

16voto

Anthony Williams Punkte 64334

Ein einzelner Thread soll die Eingabewarteschlange lesen, einen Platzhalter in die Ausgabewarteschlange stellen und dann das Element an einen Worker-Thread zur Verarbeitung weitergeben. Wenn die Daten fertig sind, aktualisiert der Worker-Thread den Platzhalter. Wenn der Thread, der den Wert aus der Ausgabewarteschlange benötigt, den Platzhalter liest, kann er blockieren, bis die zugehörigen Daten bereit sind.

Da nur ein einziger Thread die Eingabewarteschlange liest und dieser Thread den Platzhalter sofort in die Ausgabewarteschlange stellt, ist die Reihenfolge in der Ausgabewarteschlange die gleiche wie in der Eingabewarteschlange. Die Worker-Threads können zahlreich sein und die Transformationen in beliebiger Reihenfolge durchführen.

Auf Plattformen, die Futures unterstützen, sind sie als Platzhalter ideal. Auf anderen Systemen können Sie eine Ereignis-, Monitor- oder Bedingungsvariable verwenden.

5voto

ULysses Punkte 990

Unter den folgenden Annahmen

  • es sollte eine Eingabewarteschlange, eine Ausgabewarteschlange und eine Arbeitswarteschlange geben
  • es sollte nur eine Eingabewarteschlange geben Hörer
  • Die Ausgabemeldung sollte eine Wartezeit enthalten Handle und einen Zeiger auf Worker/Output-Daten enthalten
  • kann es eine beliebige Anzahl von Arbeits-Threads

Ich würde den folgenden Ablauf in Betracht ziehen:

Der Hörer der Eingabewarteschlange führt diese Schritte aus:

  1. extrahiert die Eingabenachricht;
  2. erzeugt eine Ausgabemeldung:
    1. initialisiert die Arbeitsdatenstruktur
    2. setzt den Wartegriff zurück
  3. reiht den Zeiger auf die Ausgabemeldung in die arbeiten Warteschlange
  4. reiht den Zeiger auf die Ausgabemeldung in die Ausgabe Warteschlange

Der Worker-Thread macht Folgendes:

  1. wartet auf eine Arbeitsvorrat auf einen Zeiger auf eine Ausgabemeldung zu extrahieren Nachricht zu extrahieren
  2. verarbeitet die Nachricht auf der Grundlage der angegebenen Daten und setzt das Ereignis, wenn es fertig ist

Der Verbraucher tut Folgendes:

  1. wartet auf n Ausgabewarteschlange zu einen Zeiger auf eine Ausgabe zu extrahieren Nachricht zu extrahieren
  2. wartet auf einen Handle, bis die Ausgabedaten bereit sind
  3. macht etwas mit den Daten

1voto

Steven Sudit Punkte 19005

Das wird implementierungsspezifisch sein. Eine allgemeine Lösung besteht darin, die Eingabeelemente zu nummerieren und die Nummerierung beizubehalten, damit Sie die Ausgabeelemente später sortieren können. Dies könnte geschehen, sobald die Ausgabewarteschlange gefüllt ist, oder es könnte als Teil des Füllens der Warteschlange geschehen. Mit anderen Worten, man könnte sie an der richtigen Stelle einfügen und das Lesen der Warteschlange erst zulassen, wenn das nächste verfügbare Element fortlaufend ist.

editar

Ich werde ein grundlegendes Schema skizzieren und versuchen, es einfach zu halten, indem ich die entsprechenden Primitive verwende:

  1. Anstatt eine Warteschlange für eine Packet in die Eingabewarteschlange ein, erstellen wir einen Zukunftswert um ihn herum und reihen ihn sowohl in die Eingabe- als auch in die Ausgabewarteschlange ein. In C# könnte man es so schreiben:

    var future = new Lazy<Packet>(delegate() { return Process(packet); }, LazyThreadSafetyMode.ExecutionAndPublication);
  2. Ein Thread aus dem Pool der Arbeiter dequeued eine future aus der Eingabewarteschlange und führt aus future.Value die den Delegaten veranlasst, JIT auszuführen, und zurückkehrt, sobald der Delegat das Paket verarbeitet hat.

  3. Ein oder mehrere Verbraucher dequeues eine future aus der Ausgabewarteschlange. Wann immer sie den Wert des Pakets benötigen, rufen sie future.Value der sofort zurückkehrt, wenn ein Arbeitsthread den Delegaten bereits aufgerufen hat.

Einfach, aber funktioniert.

0voto

Wenn Sie einen windowed-approach (bekannte Anzahl von Elementen) verwenden, benutzen Sie ein Array für die Ausgabewarteschlange. Zum Beispiel, wenn es sich um Medien-Streaming handelt und Sie Pakete verwerfen, die nicht schnell genug verarbeitet wurden.

Andernfalls verwenden Sie eine Prioritäts-Warteschlange (eine besondere Art von Heap, oft auf der Grundlage eines Arrays fester Größe implementiert) für die Ausgabeelemente.

Sie müssen jedem Datenpaket eine laufende Nummer oder ein beliebiges Datum hinzufügen, nach dem Sie die Elemente sortieren können. Eine Prioritätswarteschlange ist eine baumartige Struktur, die die Reihenfolge der Elemente beim Einfügen/Pop gewährleistet.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X