484 Stimmen

Benutzerdefinierter Thread-Pool im Java 8-Parallels stream

Ist es möglich, einen benutzerdefinierten Thread-Pool für Java 8 parallelen Stream anzugeben? Ich kann es nirgendwo finden.

Stellen Sie sich vor, ich habe eine Serveranwendung und möchte parallele Streams verwenden. Aber die Anwendung ist groß und mehrfädig, daher möchte ich sie aufteilen. Ich möchte nicht, dass eine langsame laufende Aufgabe in einem Modul der Anwendung andere Aufgaben aus einem anderen Modul blockiert.

Wenn ich keine unterschiedlichen Thread-Pools für verschiedene Module verwenden kann, bedeutet das, dass ich parallele Streams in den meisten realen Situationen nicht sicher verwenden kann.

Probieren Sie das folgende Beispiel aus. Es gibt einige rechenintensive Aufgaben, die in separaten Threads ausgeführt werden. Die Aufgaben nutzen parallele Streams. Die erste Aufgabe ist fehlerhaft, daher dauert jeder Schritt 1 Sekunde (simuliert durch Thread-Sleep). Das Problem ist, dass andere Threads stecken bleiben und darauf warten, dass die fehlerhafte Aufgabe beendet wird. Dies ist ein konstruiertes Beispiel, aber stellen Sie sich eine Servlet-Anwendung vor und jemand gibt eine langlaufende Aufgabe im gemeinsamen Fork-Join-Pool ein.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //fehlerhafte Aufgabe
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));

        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

507voto

Lukas Punkte 12650

Es gibt tatsächlich einen Trick, wie man eine parallele Operation in einem spezifischen Fork-Join-Pool ausführen kann. Wenn Sie es als Aufgabe in einem Fork-Join-Pool ausführen, bleibt es dort und verwendet nicht den gemeinsamen Pool.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List primes = forkJoinPool.submit(() ->
        // Parallele Aufgabe hier, zum Beispiel
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

Der Trick basiert auf ForkJoinTask.fork, der besagt: "Ordnet die asynchrone Ausführung dieser Aufgabe im Pool an, in dem die aktuelle Aufgabe ausgeführt wird, falls zutreffend, oder unter Verwendung des ForkJoinPool.commonPool(), wenn nicht inForkJoinPool()"

247voto

assylias Punkte 308529

Die parallelen Streams verwenden standardmäßig den ForkJoinPool.commonPool, der standardmäßig eine Thread weniger hat als Prozessoren, wie von Runtime.getRuntime().availableProcessors() zurückgegeben wird (Das bedeutet, dass parallele Streams einen Prozessor für den aufrufenden Thread freilassen).

Für Anwendungen, die separate oder benutzerdefinierte Pools benötigen, kann ein ForkJoinPool mit einem festgelegten Ziel-Parallelitätsgrad erstellt werden; standardmäßig gleich der Anzahl der verfügbaren Prozessoren.

Dies bedeutet auch, dass wenn Sie verschachtelte parallele Streams oder mehrere parallele Streams gleichzeitig starten, sie alle den gleichen Pool nutzen. Vorteil: Sie werden nie mehr als die Standardanzahl (verfügbare Prozessoren) verwenden. Nachteil: Sie erhalten möglicherweise nicht "alle Prozessoren", die jedem parallelen Stream zugewiesen werden, den Sie starten (falls Sie mehr als einen haben). (Offenbar können Sie einen ManagedBlocker verwenden, um das zu umgehen.)

Um die Art und Weise zu ändern, wie parallele Streams ausgeführt werden, können Sie entweder

  • die Ausführung des parallelen Streams an Ihren eigenen ForkJoinPool übergeben: IhrFJP.submit(() -> stream.parallel().forEach(macheEtwas)).get(); oder
  • Sie können die Größe des Common Pools mithilfe von Systemeigenschaften ändern: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") für eine Ziel-Parallelität von 20 Threads.

Beispiel für Letzteres auf meinem Rechner mit 8 Prozessoren. Wenn ich das folgende Programm ausführe:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

Die Ausgabe ist:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

Sie können sehen, dass der parallele Stream jeweils 8 Elemente verarbeitet, d.h. er verwendet 8 Threads. Wenn ich jedoch die auskommentierte Zeile aktiviere, lautet die Ausgabe:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

Diesmal hat der parallele Stream 20 Threads verwendet und alle 20 Elemente im Stream wurden gleichzeitig verarbeitet.

53voto

Mario Fusco Punkte 12920

Alternativ zum Trick, die parallele Berechnung in Ihrem eigenen forkJoinPool auszulösen, können Sie diesen Pool auch an die CompletableFuture.supplyAsync-Methode übergeben, wie in:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

24voto

Tod Casasent Punkte 583

Die originale Lösung (Einstellung der ForkJoinPool common parallelism Eigenschaft) funktioniert nicht mehr. Wenn man sich die Links in der Originalantwort ansieht, wurde ein Update zurückportiert, das dies in Java 8 auch kaputt macht. Wie in den verlinkten Threads erwähnt, war diese Lösung nicht garantiert für immer zu funktionieren. Basierend darauf ist die Lösung die forkjoinpool.submit mit .get Lösung, die in der akzeptierten Antwort diskutiert wird. Ich denke, der Backport behebt auch die Unzuverlässigkeit dieser Lösung.

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();

18voto

KayV Punkte 11233

Wir können die Standardparallelität mit der folgenden Eigenschaft ändern:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

die bis zu mehr Parallelität einstellen kann.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X