484 Stimmen

Benutzerdefinierter Thread-Pool im Java 8-Parallels stream

Ist es möglich, einen benutzerdefinierten Thread-Pool für Java 8 parallelen Stream anzugeben? Ich kann es nirgendwo finden.

Stellen Sie sich vor, ich habe eine Serveranwendung und möchte parallele Streams verwenden. Aber die Anwendung ist groß und mehrfädig, daher möchte ich sie aufteilen. Ich möchte nicht, dass eine langsame laufende Aufgabe in einem Modul der Anwendung andere Aufgaben aus einem anderen Modul blockiert.

Wenn ich keine unterschiedlichen Thread-Pools für verschiedene Module verwenden kann, bedeutet das, dass ich parallele Streams in den meisten realen Situationen nicht sicher verwenden kann.

Probieren Sie das folgende Beispiel aus. Es gibt einige rechenintensive Aufgaben, die in separaten Threads ausgeführt werden. Die Aufgaben nutzen parallele Streams. Die erste Aufgabe ist fehlerhaft, daher dauert jeder Schritt 1 Sekunde (simuliert durch Thread-Sleep). Das Problem ist, dass andere Threads stecken bleiben und darauf warten, dass die fehlerhafte Aufgabe beendet wird. Dies ist ein konstruiertes Beispiel, aber stellen Sie sich eine Servlet-Anwendung vor und jemand gibt eine langlaufende Aufgabe im gemeinsamen Fork-Join-Pool ein.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //fehlerhafte Aufgabe
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));

        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

0voto

michal.jakubeczy Punkte 6138

Ich habe eine Hilfsmethode erstellt, um eine Aufgabe parallel mit einem Argument auszuführen, das die maximale Anzahl von Threads definiert.

public static void runParallel(final int maxThreads, Runnable task) throws RuntimeException {
    ForkJoinPool forkJoinPool = null;
    try {
        forkJoinPool = new ForkJoinPool(maxThreads);
        forkJoinPool.submit(task).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown();
        }
    }
}

Es erstellt ForkJoinPool mit der maximalen Anzahl von erlaubten Threads und beendet es, nachdem die Aufgabe abgeschlossen ist (oder fehlschlägt).

Die Verwendung ist wie folgt:

final int maxThreads = 4;
runParallel(maxThreads, () -> 
    IntStream.range(1, 1_000_000).parallel()
            .filter(PrimesPrint::isPrime)
            .boxed().collect(Collectors.toList()));

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X