73 Stimmen

Blockierende Warteschlange und Multithreading-Verbraucher, wie man weiß, wann man aufhören muss

Ich habe einen einzelnen Thread-Produzenten, der einige Aufgabenobjekte erstellt, die dann in eine ArrayBlockingQueue (die eine feste Größe hat).

Ich starte auch einen Multithreading-Verbraucher. Dieser wird als fester Thread-Pool aufgebaut ( Executors.newFixedThreadPool(threadCount); ). Ich übergebe dann einige ConsumerWorker-Instanzen an diesen ThreadPool, wobei jeder ConsumerWorker eine Referenz auf die oben erwähnte ArrayBlockingQueue-Instanz hat.

Jeder dieser Arbeiter wird eine take() in der Warteschlange und bearbeiten die Aufgabe.

Mein Problem ist, wie kann ich einem Arbeitnehmer am besten mitteilen, dass es keine Arbeit mehr zu erledigen gibt. Mit anderen Worten, wie teile ich den Workern mit, dass der Producer mit dem Hinzufügen zur Warteschlange fertig ist, und von diesem Zeitpunkt an sollte jeder Worker aufhören, wenn er sieht, dass die Warteschlange leer ist.

Was ich jetzt habe, ist ein Setup, wo mein Producer mit einem Callback initialisiert wird, die ausgelöst wird, wenn er seine Aufgabe beendet (der Hinzufügung von Sachen in die Warteschlange). Ich führe auch eine Liste aller ConsumerWorker, die ich erstellt und an den ThreadPool übergeben habe. Wenn der Producer Callback mir mitteilt, dass der Producer fertig ist, kann ich dies jedem der Worker mitteilen. Zu diesem Zeitpunkt sollten sie einfach prüfen, ob die Warteschlange nicht leer ist, und wenn sie leer ist, sollten sie aufhören, so dass ich den ExecutorService-Thread-Pool ordnungsgemäß schließen kann. Das sieht in etwa so aus

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

Das Problem hier ist, dass ich eine offensichtliche Race-Bedingung haben, wo manchmal der Producer beenden wird, signalisieren Sie es, und die ConsumerWorkers stoppen, BEVOR Sie alles in der Warteschlange verbrauchen.

Meine Frage ist, was ist der beste Weg, um dies zu synchronisieren, so dass es alles funktioniert ok? Sollte ich den gesamten Teil synchronisieren, in dem geprüft wird, ob der Producer läuft und ob die Warteschlange leer ist und etwas aus der Warteschlange in einem Block (auf dem Warteschlangenobjekt) genommen wird? Sollte ich nur die Aktualisierung der Warteschlange synchronisieren? isRunning booleschen Wert für die ConsumerWorker-Instanz? Haben Sie einen anderen Vorschlag?

UPDATE, HIER IST DIE FUNKTIONIERENDE IMPLEMENTIERUNG, DIE ICH AM ENDE VERWENDET HABE:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

Diese Antwort wurde stark von JohnVints Antwort unten inspiriert, mit nur einigen kleinen Änderungen.

\=== Aktualisierung aufgrund des Kommentars von @vendhan.

Ich danke Ihnen für Ihre Beobachtungen. Sie haben Recht, der erste Codeschnipsel in dieser Frage hat (neben anderen Problemen) die eine, wo die while(isRunning || !inputQueue.isEmpty()) macht nicht wirklich Sinn.

In meiner endgültigen Implementierung habe ich etwas getan, das Ihrem Vorschlag, "||" (oder) durch "&&" (und) zu ersetzen, näher kommt, und zwar in dem Sinne, dass jeder Arbeiter (Verbraucher) jetzt nur noch prüft, ob das Element, das er aus der Liste erhalten hat, eine Giftpille ist, und wenn ja, anhält (theoretisch können wir also sagen, dass der Arbeiter laufen muss UND die Warteschlange nicht leer sein darf).

88voto

John Vint Punkte 38604

Sie sollten weiterhin take() aus der Warteschlange. Sie können eine Giftpille verwenden, um dem Arbeiter mitzuteilen, dass er aufhören soll. Zum Beispiel:

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}

14voto

NPE Punkte 462670

Ich würde den Arbeitnehmern ein spezielles Arbeitspaket schicken, um ihnen zu signalisieren, dass sie die Arbeit niederlegen sollen:

public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

Um die Arbeiter zu stoppen, fügen Sie einfach ConsumerWorker.DONE in die Warteschlange.

1voto

Bhaskar Punkte 7323

In Ihrem Code-Block, in dem Sie versuchen, ein Element aus der Warteschlange zurückzuholen, verwenden Sie poll(time,unit) anstelle der take() .

try { 
    Object queueElement = inputQueue.poll(timeout,unit);
     //process queueElement        
 } catch (InterruptedException e) {
        if(!isRunning && queue.isEmpty())
         return ; 
 } 

Durch die Angabe geeigneter Werte für die Zeitüberschreitung stellen Sie sicher, dass Threads nicht weiter blockieren, wenn es eine unglückliche Folge von

  1. isRunning ist wahr
  2. Die Warteschlange wird leer, so dass die Threads in das blockierte Warten übergehen (bei Verwendung von take()
  3. isRunning ist auf false gesetzt

1voto

Arpan Das Punkte 955

Können wir das nicht mit einer CountDownLatch , wobei die Größe die Anzahl der Datensätze im Produzenten ist. Und jeder Verbraucher wird countDown nach der Verarbeitung eines Datensatzes. Und seine kreuzt die awaits() Methode, wenn alle Aufgaben abgeschlossen sind. Dann stoppen Sie alle Ihre Verbraucher. Wenn alle Datensätze verarbeitet sind.

0voto

Kaelin Colclasure Punkte 3885

Es gibt eine Reihe von Strategien, die Sie anwenden können, aber eine einfache besteht darin, eine Unterklasse der Aufgabe zu haben, die das Ende des Auftrags signalisiert. Der Producer sendet dieses Signal nicht direkt. Stattdessen stellt er eine Instanz dieser Aufgabenunterklasse in die Warteschlange. Wenn einer Ihrer Verbraucher diese Aufgabe abruft und ausführt, wird das Signal gesendet.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X