94 Stimmen

Java: ExecutorService, der ab einer bestimmten Größe der Warteschlange bei der Übermittlung blockiert

Ich versuche, eine Lösung zu programmieren, bei der ein einzelner Thread E/A-intensive Aufgaben erzeugt, die parallel ausgeführt werden können. Jede Aufgabe hat erhebliche In-Memory-Daten. Ich möchte also die Anzahl der zu einem Zeitpunkt anstehenden Aufgaben begrenzen können.

Wenn ich den ThreadPoolExecutor wie folgt erstelle:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

Dann wird die executor.submit(callable) wirft RejectedExecutionException wenn sich die Warteschlange füllt und alle Threads bereits belegt sind.

Was kann ich tun, damit executor.submit(callable) blockieren, wenn die Warteschlange voll ist und alle Threads beschäftigt sind?

EDIT : Ich habe versucht este :

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Und es etwas erreicht die Wirkung, die ich erreicht, aber in einer uneleganten Art und Weise (im Grunde abgelehnt Threads in der aufrufenden Thread ausgeführt werden, so dass dies blockiert den aufrufenden Thread von mehr einreichen).

EDIT: (5 Jahre nachdem ich die Frage gestellt habe)

An alle, die diese Frage und ihre Antworten lesen: Bitte halten Sie die akzeptierte Antwort nicht für die einzig richtige Lösung. Bitte lesen Sie alle Antworten und Kommentare durch.

66voto

jtahlborn Punkte 51903

Ich habe das Gleiche getan. Der Trick besteht darin, eine BlockingQueue zu erstellen, bei der die offer()-Methode eigentlich eine put() ist. (Sie können jede beliebige Basis-BlockingQueue-Implikation verwenden, die Sie wünschen).

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

Beachten Sie, dass dies nur für Threadpools funktioniert, bei denen corePoolSize==maxPoolSize Seien Sie also vorsichtig (siehe Kommentare).

17voto

Krease Punkte 15196

Die derzeit akzeptierte Antwort hat ein potenziell erhebliches Problem - sie ändert das Verhalten von ThreadPoolExecutor.execute so, dass, wenn Sie eine corePoolSize < maxPoolSize Die ThreadPoolExecutor-Logik fügt niemals zusätzliche Arbeiter über den Kern hinaus hinzu.

Von ThreadPoolExecutor .execute(Runnable):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

Insbesondere der letzte "else"-Block wird nie getroffen.

Eine bessere Alternative ist es, etwas Ähnliches zu tun, wie es OP bereits tut - eine RejectedExecutionHandler dasselbe zu tun put Logik:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}

Wie in den Kommentaren hervorgehoben wurde, gibt es bei diesem Ansatz einige Dinge zu beachten (in Bezug auf diese Antwort ):

  1. Si corePoolSize==0 dann gibt es eine Wettlaufsituation, bei der alle Threads im Pool sterben können, bevor die Aufgabe sichtbar ist
  2. Verwendung einer Implementierung, die die Warteschlangenaufgaben umhüllt (gilt nicht für ThreadPoolExecutor ) führt zu Problemen, es sei denn, der Handler wickelt es auch auf die gleiche Weise ein.

Unter Berücksichtigung dieser Probleme wird diese Lösung für die meisten typischen ThreadPoolExecutors funktionieren und den Fall behandeln, dass corePoolSize < maxPoolSize .

16voto

cvacca Punkte 1788

So habe ich das Problem bei mir gelöst:

(Hinweis: Diese Lösung blockiert den Thread, der das Callable übergibt, und verhindert so, dass RejectedExecutionException ausgelöst wird)

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}

5voto

beginner_ Punkte 6670

Ich weiß, dies ist eine alte Frage, aber hatte ein ähnliches Problem, das Erstellen neuer Aufgaben war sehr schnell und wenn es zu viele ein OutOfMemoryError auftreten, weil vorhandene Aufgabe nicht schnell genug abgeschlossen wurden.

In meinem Fall Callables eingereicht werden und ich das Ergebnis benötige, muss ich alle Daten speichern. Futures zurückgegeben von executor.submit() . Meine Lösung bestand darin, die Futures in eine BlockingQueue mit einer maximalen Größe. Sobald diese Warteschlange voll ist, werden keine weiteren Aufgaben mehr generiert, bis einige abgeschlossen sind (Elemente aus der Warteschlange entfernt). In Pseudocode:

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {   
    Thread taskGenerator = new Thread() {
        @Override
        public void run() {
            while (reader.hasNext) {
                Callable task = generateTask(reader.next());
                Future future = executor.submit(task);
                try {
                    // if queue is full blocks until a task
                    // is completed and hence no future tasks are submitted.
                    futures.put(future);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();         
                }
            }
        executor.shutdown();
        }
    }
    taskGenerator.start();

    // read from queue as long as task are being generated
    // or while Queue has elements in it
    while (taskGenerator.isAlive()
                    || !futures.isEmpty()) {
        Future future = futures.take();
        // do something
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();     
} catch (ExecutionException ex) {
    throw new MyException(ex);
} finally {
    executor.shutdownNow();
}

5voto

progresivoJS Punkte 61

Wie wäre es mit der Verwendung des CallerBlocksPolicy Klasse, wenn Sie Spring-Integration verwenden?

Diese Klasse implementiert die RejectedExecutionHandler Schnittstelle, die ein Handler für Aufgaben ist, die nicht von einem ThreadPoolExecutor .

Sie können diese Richtlinie wie folgt verwenden.

executor.setRejectedExecutionHandler(new CallerBlocksPolicy());

Der Hauptunterschied zwischen CallerBlocksPolicy y CallerRunsPolicy ist, ob er die Aufgabe im Thread des Aufrufers blockiert oder ausführt.

Bitte beachten Sie dieser Code .

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X