Ich habe einen einzelnen Thread-Produzenten, der einige Aufgabenobjekte erstellt, die dann in eine ArrayBlockingQueue
(die eine feste Größe hat).
Ich starte auch einen Multithreading-Verbraucher. Dieser wird als fester Thread-Pool aufgebaut ( Executors.newFixedThreadPool(threadCount);
). Ich übergebe dann einige ConsumerWorker-Instanzen an diesen ThreadPool, wobei jeder ConsumerWorker eine Referenz auf die oben erwähnte ArrayBlockingQueue-Instanz hat.
Jeder dieser Arbeiter wird eine take()
in der Warteschlange und bearbeiten die Aufgabe.
Mein Problem ist, wie kann ich einem Arbeitnehmer am besten mitteilen, dass es keine Arbeit mehr zu erledigen gibt. Mit anderen Worten, wie teile ich den Workern mit, dass der Producer mit dem Hinzufügen zur Warteschlange fertig ist, und von diesem Zeitpunkt an sollte jeder Worker aufhören, wenn er sieht, dass die Warteschlange leer ist.
Was ich jetzt habe, ist ein Setup, wo mein Producer mit einem Callback initialisiert wird, die ausgelöst wird, wenn er seine Aufgabe beendet (der Hinzufügung von Sachen in die Warteschlange). Ich führe auch eine Liste aller ConsumerWorker, die ich erstellt und an den ThreadPool übergeben habe. Wenn der Producer Callback mir mitteilt, dass der Producer fertig ist, kann ich dies jedem der Worker mitteilen. Zu diesem Zeitpunkt sollten sie einfach prüfen, ob die Warteschlange nicht leer ist, und wenn sie leer ist, sollten sie aufhören, so dass ich den ExecutorService-Thread-Pool ordnungsgemäß schließen kann. Das sieht in etwa so aus
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
Das Problem hier ist, dass ich eine offensichtliche Race-Bedingung haben, wo manchmal der Producer beenden wird, signalisieren Sie es, und die ConsumerWorkers stoppen, BEVOR Sie alles in der Warteschlange verbrauchen.
Meine Frage ist, was ist der beste Weg, um dies zu synchronisieren, so dass es alles funktioniert ok? Sollte ich den gesamten Teil synchronisieren, in dem geprüft wird, ob der Producer läuft und ob die Warteschlange leer ist und etwas aus der Warteschlange in einem Block (auf dem Warteschlangenobjekt) genommen wird? Sollte ich nur die Aktualisierung der Warteschlange synchronisieren? isRunning
booleschen Wert für die ConsumerWorker-Instanz? Haben Sie einen anderen Vorschlag?
UPDATE, HIER IST DIE FUNKTIONIERENDE IMPLEMENTIERUNG, DIE ICH AM ENDE VERWENDET HABE:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Diese Antwort wurde stark von JohnVints Antwort unten inspiriert, mit nur einigen kleinen Änderungen.
\=== Aktualisierung aufgrund des Kommentars von @vendhan.
Ich danke Ihnen für Ihre Beobachtungen. Sie haben Recht, der erste Codeschnipsel in dieser Frage hat (neben anderen Problemen) die eine, wo die while(isRunning || !inputQueue.isEmpty())
macht nicht wirklich Sinn.
In meiner endgültigen Implementierung habe ich etwas getan, das Ihrem Vorschlag, "||" (oder) durch "&&" (und) zu ersetzen, näher kommt, und zwar in dem Sinne, dass jeder Arbeiter (Verbraucher) jetzt nur noch prüft, ob das Element, das er aus der Liste erhalten hat, eine Giftpille ist, und wenn ja, anhält (theoretisch können wir also sagen, dass der Arbeiter laufen muss UND die Warteschlange nicht leer sein darf).