484 Stimmen

Benutzerdefinierter Thread-Pool im Java 8-Parallels stream

Ist es möglich, einen benutzerdefinierten Thread-Pool für Java 8 parallelen Stream anzugeben? Ich kann es nirgendwo finden.

Stellen Sie sich vor, ich habe eine Serveranwendung und möchte parallele Streams verwenden. Aber die Anwendung ist groß und mehrfädig, daher möchte ich sie aufteilen. Ich möchte nicht, dass eine langsame laufende Aufgabe in einem Modul der Anwendung andere Aufgaben aus einem anderen Modul blockiert.

Wenn ich keine unterschiedlichen Thread-Pools für verschiedene Module verwenden kann, bedeutet das, dass ich parallele Streams in den meisten realen Situationen nicht sicher verwenden kann.

Probieren Sie das folgende Beispiel aus. Es gibt einige rechenintensive Aufgaben, die in separaten Threads ausgeführt werden. Die Aufgaben nutzen parallele Streams. Die erste Aufgabe ist fehlerhaft, daher dauert jeder Schritt 1 Sekunde (simuliert durch Thread-Sleep). Das Problem ist, dass andere Threads stecken bleiben und darauf warten, dass die fehlerhafte Aufgabe beendet wird. Dies ist ein konstruiertes Beispiel, aber stellen Sie sich eine Servlet-Anwendung vor und jemand gibt eine langlaufende Aufgabe im gemeinsamen Fork-Join-Pool ein.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //fehlerhafte Aufgabe
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));

        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

1voto

user_3380739 Punkte 1495

Gehen Sie zu abacus-common. Die Thread-Nummer kann für den parallelen Stream angegeben werden. Hier ist der Beispielcode:

LongStream.range(4, 1_000_000).parallel(threadNum)...

Hinweis: Ich bin der Entwickler von abacus-common.

1voto

Martin Vseticka Punkte 26325

Wenn Sie keinen benutzerdefinierten ThreadPool benötigen, sondern die Anzahl der gleichzeitigen Aufgaben begrenzen möchten, können Sie Folgendes verwenden:

List paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
List> partitions = Lists.partition(paths, 4); // Guava-Methode

partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> {
       // führen Sie Ihre Verarbeitung durch   
}));

(Die doppelte Frage dazu ist gesperrt, also halten Sie bitte durch)

1voto

Borislav Stoilov Punkte 2995

Hier ist, wie ich das oben genannte Maximalgewindenzählerflag programmgesteuert einstelle, und ein Codeausschnitt, um zu überprüfen, ob der Parameter berücksichtigt wird

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
Set threadNames = Stream.iterate(0, n -> n + 1)
  .parallel()
  .limit(100000)
  .map(i -> Thread.currentThread().getName())
  .collect(Collectors.toSet());
System.out.println(threadNames);

// Ausgabe -> [ForkJoinPool.commonPool-worker-1, Test worker, ForkJoinPool.commonPool-worker-3]

0voto

John McClean Punkte 5017

Wenn Sie nichts dagegen haben, eine Drittanbieter-Bibliothek zu verwenden, können Sie mit cyclops-react sequenzielle und parallele Streams innerhalb desselben Pipelines mischen und benutzerdefinierte ForkJoinPools bereitstellen. Zum Beispiel

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

Oder wenn wir die Verarbeitung innerhalb eines sequenziellen Streams fortsetzen möchten

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[Offenlegung Ich bin der leitende Entwickler von cyclops-react]

0voto

Hearen Punkte 6799

Ich habe versucht, den benutzerdefinierten ForkJoinPool wie folgt anzupassen, um die Poolgröße anzupassen:

private static Set ThreadNameSet = new HashSet<>();
private static Callable getSum() {
    List aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

Hier ist die Ausgabe, die besagt, dass der Pool mehr Threads als die Standardanzahl von 4 verwendet.

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

Aber tatsächlich gibt es einen Sonderling, als ich versuchte, das gleiche Ergebnis mit ThreadPoolExecutor wie folgt zu erreichen:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

aber ich habe versagt.

Es wird nur den parallelStream in einem neuen Thread starten und dann ist alles andere genauso, was erneut beweist, dass der parallelStream den ForkJoinPool verwenden wird, um seine Threads zu starten.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X