94 Stimmen

Java: ExecutorService, der ab einer bestimmten Größe der Warteschlange bei der Übermittlung blockiert

Ich versuche, eine Lösung zu programmieren, bei der ein einzelner Thread E/A-intensive Aufgaben erzeugt, die parallel ausgeführt werden können. Jede Aufgabe hat erhebliche In-Memory-Daten. Ich möchte also die Anzahl der zu einem Zeitpunkt anstehenden Aufgaben begrenzen können.

Wenn ich den ThreadPoolExecutor wie folgt erstelle:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

Dann wird die executor.submit(callable) wirft RejectedExecutionException wenn sich die Warteschlange füllt und alle Threads bereits belegt sind.

Was kann ich tun, damit executor.submit(callable) blockieren, wenn die Warteschlange voll ist und alle Threads beschäftigt sind?

EDIT : Ich habe versucht este :

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Und es etwas erreicht die Wirkung, die ich erreicht, aber in einer uneleganten Art und Weise (im Grunde abgelehnt Threads in der aufrufenden Thread ausgeführt werden, so dass dies blockiert den aufrufenden Thread von mehr einreichen).

EDIT: (5 Jahre nachdem ich die Frage gestellt habe)

An alle, die diese Frage und ihre Antworten lesen: Bitte halten Sie die akzeptierte Antwort nicht für die einzig richtige Lösung. Bitte lesen Sie alle Antworten und Kommentare durch.

3voto

Petro Semeniuk Punkte 6692

Ich hatte ein ähnliches Problem und habe es mit Hilfe von beforeExecute/afterExecute Haken von ThreadPoolExecutor :

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocks current task execution if there is not enough resources for it.
 * Maximum task count usage controlled by maxTaskCount property.
 */
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

    private final ReentrantLock taskLock = new ReentrantLock();
    private final Condition unpaused = taskLock.newCondition();
    private final int maxTaskCount;

    private volatile int currentTaskCount;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, int maxTaskCount) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxTaskCount = maxTaskCount;
    }

    /**
     * Executes task if there is enough system resources for it. Otherwise
     * waits.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskLock.lock();
        try {
            // Spin while we will not have enough capacity for this job
            while (maxTaskCount < currentTaskCount) {
                try {
                    unpaused.await();
                } catch (InterruptedException e) {
                    t.interrupt();
                }
            }
            currentTaskCount++;
        } finally {
            taskLock.unlock();
        }
    }

    /**
     * Signalling that one more task is welcome
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskLock.lock();
        try {
            currentTaskCount--;
            unpaused.signalAll();
        } finally {
            taskLock.unlock();
        }
    }
}

Das sollte für Sie ausreichend sein. Btw, ursprüngliche Implementierung war Aufgabe Größe basiert, weil eine Aufgabe größer 100 Zeit als eine andere sein könnte und zwei riesige Aufgaben einreichen war die Box zu töten, aber laufen eine große und viele kleine war Okay. Wenn Ihre E/A-intensive Aufgaben sind etwa die gleiche Größe, die Sie diese Klasse verwenden könnte, andernfalls lassen Sie mich wissen, und ich werde Größe basierte Implementierung posten.

P.S. Sie sollten Folgendes überprüfen ThreadPoolExecutor javadoc. Es ist wirklich schön, Benutzerhandbuch von Doug Lea, wie es leicht angepasst werden könnte.

3voto

Ich habe eine Lösung implementiert, die dem Decorator-Muster folgt und eine Semaphore verwendet, um die Anzahl der ausgeführten Tasks zu kontrollieren. Sie können es mit jeder Executor und:

  • Geben Sie das Maximum der laufenden Aufgaben an
  • Geben Sie die maximale Zeitspanne an, in der auf eine Genehmigung zur Ausführung einer Aufgabe gewartet werden soll (wenn die Zeitspanne verstreicht und keine Genehmigung eingeholt wird, wird ein RejectedExecutionException geworfen wird)

    import static java.util.concurrent.TimeUnit.MILLISECONDS;

    import java.time.Duration; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore;

    import javax.annotation.Nonnull;

    public class BlockingOnFullQueueExecutorDecorator implements Executor {

    private static final class PermitReleasingDecorator implements Runnable {
    
        @Nonnull
        private final Runnable delegate;
    
        @Nonnull
        private final Semaphore semaphore;
    
        private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
            this.delegate = task;
            this.semaphore = semaphoreToRelease;
        }
    
        @Override
        public void run() {
            try {
                this.delegate.run();
            }
            finally {
                // however execution goes, release permit for next task
                this.semaphore.release();
            }
        }
    
        @Override
        public final String toString() {
            return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
        }
    }
    
    @Nonnull
    private final Semaphore taskLimit;
    
    @Nonnull
    private final Duration timeout;
    
    @Nonnull
    private final Executor delegate;
    
    public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) {
        this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
        if (maximumTaskNumber < 1) {
            throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
        }
        this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
        if (this.timeout.isNegative()) {
            throw new IllegalArgumentException("'maximumTimeout' must not be negative");
        }
        this.taskLimit = new Semaphore(maximumTaskNumber);
    }
    
    @Override
    public final void execute(final Runnable command) {
        Objects.requireNonNull(command, "'command' must not be null");
        try {
            // attempt to acquire permit for task execution
            if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
                throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
            }
        }
        catch (final InterruptedException e) {
            // restore interrupt status
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    
        this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit));
    }
    
    @Override
    public final String toString() {
        return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
                this.timeout, this.delegate);
    }

    }

1voto

Gareth Davis Punkte 27204

Ich denke, es ist so einfach wie die Verwendung einer ArrayBlockingQueue anstelle von a a LinkedBlockingQueue .

Ignorieren Sie mich... das ist völlig falsch. ThreadPoolExecutor ruft auf. Queue#offer no put was die von Ihnen gewünschte Wirkung haben würde.

Sie könnten die ThreadPoolExecutor und bieten eine Implementierung von execute(Runnable) das ruft put anstelle von offer .

Ich fürchte, das ist keine völlig zufriedenstellende Antwort.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X