Java Parallele Streams
Java Streams parallel verarbeiten — wann parallelStream hilft und wann es langsamer wird.
Ein paralleler Stream ist dieselbe Stream-Pipeline, die Sie bisher geschrieben haben, aber die JVM darf die Quelle in Teilmengen aufteilen und auf mehreren Threads verarbeiten. Die Änderung an der Aufrufstelle ist minimal:
long total = nums.parallelStream().mapToLong(n -> heavy(n)).sum();
// ^^^^^^^^^^^^^^^^^oder:
long total = nums.stream().parallel().mapToLong(n -> heavy(n)).sum();Die Pipeline-Struktur, die Operationen, das Ergebnis — alles bleibt unverändert. Was sich ändert, ist wer die Arbeit erledigt: Anstatt dass ein Thread die Quelle durchläuft, teilen sich mehrere Worker aus dem gemeinsamen ForkJoinPool (einer pro CPU-Kern, minus eins) die Arbeit, und ein Koordinator führt die Teilergebnisse zusammen. Wenn die Arbeit pro Element groß genug ist und die Quelle sauber aufgeteilt werden kann, wird die Pipeline in etwa Wanduhrzeit / Kerne fertig. Wenn nicht, ist parallel langsamer als sequenziell — und manchmal fehlerhaft. Dieses Kapitel erklärt, wie man den Unterschied erkennt.
Was "parallel" tatsächlich bewirkt
Ein sequenzieller Stream zieht ein Element nach dem anderen durch die Pipeline. Ein paralleler Stream:
- Teilt die Quelle mithilfe des
Spliteratorder Quelle in Teil-Streams auf. Arrays,ArrayList,IntStream.rangeund ähnliche Quellen lassen sich sauber in O(1) aufteilen.LinkedList,Files.lines,Stream.iterateundStream.generateteilen sich schlecht auf oder verweigern die Aufteilung ganz. - Führt die Zwischenkette jedes Teil-Streams auf einem Worker-Thread aus dem gemeinsamen Pool aus.
- Führt die Teilergebnisse zusammen — bei
reduceundcollectist dercombinerdafür zuständig.
forEach in einem parallelen Stream ruft Ihren Consumer von mehreren Threads gleichzeitig und in unbestimmter Reihenfolge auf. forEachOrdered bewahrt die Begegnungsreihenfolge auf Kosten der Synchronisation. findFirst ist in parallel teurer als findAny aus demselben Grund — es muss koordinieren, um den ersten Treffer zu ermitteln.
Der Vertrag — was Ihre Pipeline erfüllen muss
Parallel liefert nur dann ein korrektes Ergebnis, wenn die Pipeline drei Regeln befolgt. Sequenzieller Code, der sie zufällig ignoriert, funktioniert trotzdem; paralleler Code, der das tut, produziert stillschweigend Unsinn.
- Der Reduzierer muss assoziativ sein.
f(f(a, b), c) == f(a, f(b, c)).+,*,max,min, Mengenvereinigung, Listenkonkatenation qualifizieren sich alle. Subtraktion, Division, "erster Treffer" und "Liste-Anhängen mit Reihenfolge" nicht. Wenn Sie einen nicht-assoziativenBinaryOperatoranreduceoderCollectors.reducingübergeben, hängt das Ergebnis davon ab, wie die JVM zufällig aufteilt. - Die Pipeline muss zustandslos sein. Ihre Lambdas dürfen keinen gemeinsamen veränderlichen Zustand lesen oder schreiben. Ein Lambda, das eine äußere
ArrayListerfasst und mutiert, einen äußerenint[]inkrementiert oder einen nicht-atomaren Zähler verwendet, wird parallel eine Race Condition erzeugen. - Die Pipeline muss frei von Nebeneffekten sein. Logging ist in Ordnung; Persistieren über eine thread-sichere Senke ist in Ordnung; alles andere ist ein Fehler, der darauf wartet, dass ein Worker es anders verschachtelt.
Die in Collectors eingebauten Collectors erfüllen die Punkte 1–3 per Konstruktion (wenn sie wie dokumentiert verwendet werden). Ihre eigenen Lambdas in map, filter, reduce und peek sind die, auf die man achten muss.
Wann parallel hilft (und wann nicht)
Ein paralleler Stream gewinnt nur, wenn die Arbeit pro Element groß genug ist, um die Koordinationskosten — Aufteilen, Planen, Zusammenführen und den Overhead des Frameworks — zu überwiegen. Ein grobes mentales Modell:
- Große Quelle + CPU-gebundene Arbeit pro Element + günstige Zusammenführung + aufteilbare Quelle = parallel gewinnt oft. Bildverarbeitung pro Pixel, Parsen pro Datensatz, Hashing pro Datei — klassische Fälle.
- Winzige Quelle = sequenziell gewinnt. Das Aufwecken des Pools ist teurer als die gesamte Berechnung.
- Günstige Arbeit pro Element = sequenziell gewinnt.
nums.stream().mapToInt(Integer::intValue).sum()ist schneller als seinparallelStream()-Pendant, bisnumsin die Millionen geht; bei kleinen Größen dominiert der Framework-Overhead. - Blockierendes I/O pro Element = parallele Streams sind das falsche Werkzeug. Der gemeinsame
ForkJoinPoolist für CPU-Arbeit ausgelegt; ein blockierender I/O-Aufruf bindet einen Worker und verhungert jeden anderen parallelen Stream in der JVM (einschließlich derer aus Bibliotheken). Verwenden SieCompletableFuturemit einem begrenzten Executor für I/O-Fan-out. - Nicht-aufteilbare Quelle = parallel fällt entweder auf sequenziell zurück oder teilt schlecht auf.
Files.lines,Stream.iterate,Stream.generateundLinkedList.stream()sind die kanonisch schlechten Aufteiler; Arrays,ArrayListundIntStream.rangesind die kanonisch guten.
Der ehrliche Rat: Standardmäßig sequenziell; wechseln Sie zu parallel nur, wenn Sie einen gemessenen Grund dafür haben, mit jmh- oder Wanduhr-Zahlen in der Hand.
Operationen, die sich in parallel merkwürdig verhalten
Einige Operationen, deren Bedeutung sich ändert, wenn die Pipeline parallel wird:
forEach— läuft von mehreren Threads in unbestimmter Reihenfolge. Wenn Reihenfolge wichtig ist, verwenden SieforEachOrdered(was Synchronisation kostet).findFirst— muss Worker koordinieren, um den ersten Treffer in Begegnungsreihenfolge zu identifizieren. Verwenden SiefindAny, wenn es egal ist, welcher Treffer gewinnt.limit/skip— wohldefiniert auf geordneten Streams, aber in parallel teurer, weil die JVM die Reihenfolge einhalten muss. Auf einem parallelen Stream, bei dem die Reihenfolge keine Rolle spielt, iststream.parallel().unordered().limit(n)günstiger.distinct/sorted— muss Worker koordinieren; der Puffer, den sie verwenden, ist gemeinsam genutzt.reducemit der 3-Arg-Überladung verwendet dencombiner, um Worker-Ausgaben zusammenzuführen. Bei der 2-Arg-Überladung verwendet die JVM die Identität zweimal plus den Akkumulator — gleicher Vertrag, gleiche Assoziativitätsregel.collect—Collectorssind für den parallelen Einsatz konzipiert; der Haken ist, dass der Ergebnis-Container eine reguläreHashMapoderArrayListsein könnte, und die parallele Collection koordiniert intern, um das sicherzustellen. Ihre eigenen Downstream-Collectors müssen den Vertrag einhalten.
Die Shared-State-Falle in konkreter Form
Der häufigste Fehler in anfänglichem parallelen Code:
// WRONG -- looks fine, races in parallel
List<String> shouts = new ArrayList<>();
words.parallelStream().forEach(w -> shouts.add(w.toUpperCase()));ArrayList.add ist nicht thread-sicher; konkurrierende Worker verlieren entweder Elemente, fügen doppelt ein, werfen ArrayIndexOutOfBoundsException oder beschädigen die Liste stillschweigend. Die richtige Version drückt das Ergebnis als Ausgabe der Pipeline aus, nicht als Nebeneffekt:
List<String> shouts = words.parallelStream().map(String::toUpperCase).toList();toList() ist wie jeder andere Collector und jedes Terminal, das einen Wert produziert, für den parallelen Einsatz konzipiert. In dem Moment, in dem Sie nach einem forEach greifen, das eine äußere Variable mutiert, haben Sie den sicheren Weg verlassen.
Wenn Sie wirklich eine thread-sichere Senke für forEach benötigen, verwenden Sie eine ConcurrentLinkedQueue, AtomicLong, LongAdder oder Collections.synchronizedList(...). Aber fast immer ist die richtige Antwort: "Verwenden Sie forEach nicht zur Akkumulation; lassen Sie die Pipeline das Ergebnis erstellen."
ForkJoinPool und warum er wichtig ist
Standardmäßig teilen sich alle parallelen Streams in Ihrer JVM den gemeinsamen Pool, der auf Runtime.getRuntime().availableProcessors() - 1 Worker-Threads ausgelegt ist. Das hat zwei Konsequenzen:
- Ein lang laufender paralleler Stream monopolisiert den Pool. Jeder andere parallele Stream — einschließlich derer in Bibliotheken — stellt sich dahinter an.
- Ein paralleler Stream, der blockiert (I/O, Locks,
Thread.sleep), bindet einen Worker-Thread ohne nützliche Arbeit zu leisten und halbiert die effektive Größe des Pools, während er wartet.
Sie können für eine einmalige Pipeline einen privaten Pool widmen:
try (var pool = new java.util.concurrent.ForkJoinPool(4)) {
long total = pool.submit(() ->
nums.parallelStream().mapToLong(n -> heavy(n)).sum()
).get();
}Dies ist der richtige Schritt für lang laufende Berechnungen, die Sie nicht mit dem Rest der JVM teilen möchten. Es ist immer noch der falsche Schritt für blockierendes I/O — wechseln Sie stattdessen zu virtuellen Threads oder einer expliziten CompletableFuture-Kette auf einem begrenzten I/O-Executor.
Ein durchgearbeitetes Beispiel: parallele Beschleunigung, die Shared-State-Falle und ein Assoziativitätsfehler
Das folgende Programm misst die Zeit für sequenziell vs. parallel bei einer CPU-gebundenen IntStream-Summe, demonstriert die Shared-State-Race mit forEach, zeigt die korrekte Collector-basierte Version und kontrastiert assoziative (Integer::sum) mit nicht-assoziativen ((a, b) -> a - b) Reduzierern unter parallel.
Was man aus dem Durchlauf mitnimmt:
- Die parallele Summe lieferte dasselbe Ergebnis wie die sequenzielle und war (auf jedem Mehrkernrechner) in einem Bruchteil der Wanduhrzeit fertig. Der
heavy-Aufruf pro Element ist CPU-gebunden und die Quelle (einint[]) lässt sich sauber aufteilen — die zwei Zutaten, die parallel braucht. - Das
forEach, dasbadSinkmutierte, hat entweder Elemente verloren oder ist abgestürzt. Es gibt keine Lösung, die hier einsynchronizedhinzufügt, ohne die parallele Version langsamer als die sequenzielle zu machen. Die Lösung ist, keinforEachzur Akkumulation zu schreiben — verwenden Sie einen Collector oder ein Terminal, das das Ergebnis produziert. Integer::sumist assoziativ; die parallele Reduktion lieferte dieselbe Antwort wie die sequenzielle. Das nicht-assoziative(a, b) -> a - blieferte unterschiedliche Antworten in sequenziell vs. parallel, weil die JVM frei ist, in jeder assoziativ-äquivalenten Reihenfolge aufzuteilen und zusammenzuführen. Gleicher Code, zwei Antworten — das Symptom, das jeder Parallel-Streams-Fehler letztendlich erzeugt.parallel().forEach(...)druckte0..15in einer nicht-monotonen Reihenfolge;parallel().forEachOrdered(...)druckte sie geordnet auf Kosten der Cross-Worker-Synchronisation. Wenn IhrforEachdie Reihenfolge berücksichtigt, bezahlen Sie dafür.- Der private
ForkJoinPool(2)führte die Pipeline gegen einen dedizierten Pool aus. Verwenden Sie das, wenn Sie einen lang laufenden Rechenauftrag haben und nicht möchten, dass er den gemeinsamen Pool mit dem Rest der JVM teilt. Verwenden Sie es nicht als Pflaster für blockierendes I/O — das ist ein anderes Problem mit einem anderen Werkzeug.
Was kommt als Nächstes
Sie können jetzt über jede Stream-Pipeline nachdenken: wann Sie eine schreiben sollten, wie man sie aufbaut, was lazy ist, was kurzschließt, was sicher parallel läuft und was nicht. Eine zentrale Abstraktion steht noch auf dem Programm — diejenige, die einer Pipeline erlaubt, "dieser Wert könnte fehlen" ohne ein einziges null auszudrücken. Das nächste Kapitel, Java Optional, behandelt Optional<T> — was es ist, wo die Stream-API ihre losen Enden hinterlässt, und wie man map, flatMap, orElse und ifPresent verwendet, um Code zu schreiben, der von Grund auf null-sicher ist.