484 Stimmen

Benutzerdefinierter Thread-Pool im Java 8-Parallels stream

Ist es möglich, einen benutzerdefinierten Thread-Pool für Java 8 parallelen Stream anzugeben? Ich kann es nirgendwo finden.

Stellen Sie sich vor, ich habe eine Serveranwendung und möchte parallele Streams verwenden. Aber die Anwendung ist groß und mehrfädig, daher möchte ich sie aufteilen. Ich möchte nicht, dass eine langsame laufende Aufgabe in einem Modul der Anwendung andere Aufgaben aus einem anderen Modul blockiert.

Wenn ich keine unterschiedlichen Thread-Pools für verschiedene Module verwenden kann, bedeutet das, dass ich parallele Streams in den meisten realen Situationen nicht sicher verwenden kann.

Probieren Sie das folgende Beispiel aus. Es gibt einige rechenintensive Aufgaben, die in separaten Threads ausgeführt werden. Die Aufgaben nutzen parallele Streams. Die erste Aufgabe ist fehlerhaft, daher dauert jeder Schritt 1 Sekunde (simuliert durch Thread-Sleep). Das Problem ist, dass andere Threads stecken bleiben und darauf warten, dass die fehlerhafte Aufgabe beendet wird. Dies ist ein konstruiertes Beispiel, aber stellen Sie sich eine Servlet-Anwendung vor und jemand gibt eine langlaufende Aufgabe im gemeinsamen Fork-Join-Pool ein.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //fehlerhafte Aufgabe
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));

        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

10voto

charlie Punkte 1440

Um die tatsächliche Anzahl der verwendeten Threads zu messen, können Sie Thread.activeCount() überprüfen:

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

Dies kann auf einem 4-Kern-CPU eine Ausgabe wie folgt erzeugen:

5 // common pool
23 // custom pool

Ohne .parallel() gibt es:

3 // common pool
4 // custom pool

9voto

Stefan Ferstl Punkte 4915

Bis jetzt habe ich die Lösungen, die in den Antworten auf diese Frage beschrieben sind, verwendet. Jetzt habe ich eine kleine Bibliothek namens Parallel Stream Support dafür erstellt:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

Aber wie @PabloMatiasGomez in den Kommentaren angemerkt hat, gibt es Nachteile hinsichtlich des Aufteilungsmechanismus von parallelen Streams, der stark von der Größe des gemeinsamen Pools abhängt. Siehe Parallel stream from a HashSet doesn't run in parallel.

Ich verwende diese Lösung nur, um separate Pools für verschiedene Arten von Arbeit zu haben, aber ich kann die Größe des gemeinsamen Pools nicht auf 1 setzen, auch wenn ich ihn nicht verwende.

6voto

Scott Langley Punkte 61

Hinweis: Es scheint eine Korrektur in JDK 10 implementiert zu sein, die sicherstellt, dass der benutzerdefinierte Thread-Pool die erwartete Anzahl von Threads verwendet.

Die Ausführung von Parallelstreams innerhalb eines benutzerdefinierten ForkJoinPools sollte den Parallelismus beachten https://bugs.openjdk.java.net/browse/JDK-8190974

5voto

Grzegorz Piwowarek Punkte 12514

Wenn Sie nicht auf Implementierungstricks angewiesen sein möchten, gibt es immer einen Weg, das Gleiche zu erreichen, indem Sie benutzerdefinierte Sammler implementieren, die die Semantik von map und collect kombinieren... und Sie wären nicht auf ForkJoinPool beschränkt:

list.stream()
  .collect(parallel(i -> process(i), executor, 4))
  .join()

Glücklicherweise ist dies bereits hier erledigt und auf Maven Central verfügbar: http://github.com/pivovarit/parallel-collectors

Haftungsausschluss: Ich habe es geschrieben und übernehme die Verantwortung dafür.

2voto

Dirk Hillbrecht Punkte 554

Die (derzeit) akzeptierte Antwort ist teilweise falsch. Es reicht nicht aus, einfach den Parallelstrom an den dedizierten Fork-Join-Pool zu submit(). In diesem Fall verwendet der Stream die Threads dieses Pools und zusätzlich den gemeinsamen Fork-Join-Pool und sogar den aufrufenden Thread, um die Arbeitslast des Streams zu bearbeiten, scheinbar bis zur Größe des gemeinsamen Fork-Join-Pools. Das Verhalten ist ein wenig seltsam, aber definitiv nicht das, was erforderlich ist.

Um die Arbeit tatsächlich vollständig auf den dedizierten Pool zu beschränken, müssen Sie sie in ein CompletableFuture einbetten:

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
  forkJoinPool = new ForkJoinPool(parallelism);
  final List primes = CompletableFuture.supplyAsync(() -> 
        // Parallele Aufgabe hier, zum Beispiel
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList()),
    forkJoinPool)  // <- übergibt dedizierten Fork-Join-Pool als Executor
    .join();  // <- Warte auf das Ergebnis vom forkJoinPool
    System.out.println(primes);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

Dieser Code bleibt bei allen Operationen in forkJoinPool sowohl in Java 8u352 als auch in Java 17.0.1.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X