Java Fork/Join-Framework
Arbeit rekursiv aufteilen und parallelisieren mit dem Fork/Join-Framework — ForkJoinPool, RecursiveTask, Work-Stealing.
Ein normaler Thread-Pool ist hervorragend für „viele unabhängige Aufgaben". Nicht so gut für „eine große Aufgabe, die rekursiv in kleinere Versionen ihrer selbst aufgeteilt werden kann". Für diese zweite Form — Divide-and-Conquer-Arbeit — bietet Java einen spezialisierten Executor: den ForkJoinPool. Er ist der Pool hinter parallelStream, CompletableFuture.supplyAsync (wenn kein Executor angegeben wird) sowie jedem Code, den Sie mit RecursiveTask/RecursiveAction schreiben.
Der Trick, der ForkJoinPool besonders macht, ist Work-Stealing: Jeder Worker hat seine eigene Deque, und wenn diese leer ist, stiehlt er eine Aufgabe vom unteren Ende der Deque eines anderen Workers. Das Ergebnis ist automatischer Lastausgleich — schnelle Worker helfen langsamen Workers ohne jede Koordination.
Wann man es verwenden sollte
Fork/Join ist das richtige Werkzeug für:
- Rekursives Divide-and-Conquer. Quicksort, Mergesort, Baumdurchläufe, rekursive numerische Algorithmen, Matrixmultiplikation durch Halbierung.
- CPU-gebundene Arbeit, die in annähernd gleich große Teile parallelisierbar ist.
- Arbeit, deren Granularität sich anpasst: aufteilen, wenn der Abschnitt groß ist; direkt ausführen, wenn er klein ist.
Es ist das falsche Werkzeug für:
- I/O-gebundene Arbeit. Ein blockierter Worker stiehlt nicht — und die Standardgröße des Pools entspricht Ihrer CPU-Anzahl. Blockieren Sie einen Worker, verlieren Sie einen Kern.
- Unabhängige, unzusammenhängende Aufgaben. Ein normaler
ThreadPoolExecutorist für diese Form einfacher und genauso schnell. - Aufgaben, die von einem festen externen Zeitplan abhängen. Verwenden Sie
ScheduledExecutorService.
Ein nützliches mentales Modell: Wenn Sie dazu neigen würden, parallelStream zu verwenden, ist Fork/Join dieselbe Form, direkt ausgedrückt. (Fork/Join kam in Java 7; parallelStream in Java 8 wurde darauf aufgebaut.)
Die drei Klassen
ForkJoinPool pool; // the executor
RecursiveTask<V>; // an abstract task returning V
RecursiveAction; // an abstract task returning nothingSie erweitern RecursiveTask oder RecursiveAction, überschreiben compute(), entscheiden innerhalb von compute(), ob aufgeteilt oder die Arbeit direkt erledigt werden soll, und rufen fork()/join() für die Teilaufgaben auf.
class Sum extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000;
private final long[] data;
private final int lo, hi;
Sum(long[] data, int lo, int hi) {
this.data = data; this.lo = lo; this.hi = hi;
}
@Override
protected Long compute() {
int len = hi - lo;
if (len <= THRESHOLD) {
long s = 0;
for (int i = lo; i < hi; i++) s += data[i];
return s;
}
int mid = lo + len / 2;
Sum left = new Sum(data, lo, mid);
Sum right = new Sum(data, mid, hi);
left.fork(); // schedule left to run on another worker
long rightResult = right.compute(); // run right on this worker (avoid extra task)
long leftResult = left.join(); // wait for left
return leftResult + rightResult;
}
}
ForkJoinPool pool = new ForkJoinPool();
long total = pool.invoke(new Sum(data, 0, data.length));Die Form — Schwellenwert prüfen → aufteilen → eine Hälfte forken → die andere berechnen → joinen — ist das kanonische Fork/Join-Idiom. Der Trick, „eine Hälfte hier zu berechnen statt beide zu forken", vermeidet die Erstellung einer unnötigen Aufgabe und ist ein kleiner, aber realer Gewinn.
Der Schwellenwert ist entscheidend
Die wichtigste Entscheidung: wann das Aufteilen beendet werden soll. Ein zu kleiner Schwellenwert führt zu Tausenden von Aufgaben für triviale Teile — der Overhead dominiert die eigentliche Arbeit. Ein zu großer Schwellenwert verhindert die vollständige Nutzung der Kerne — viele Workers stehen untätig, während einer einen großen Abschnitt verarbeitet.
Faustregeln:
- Der Aufgabenkörper sollte mindestens 10 Mikrosekunden dauern. Darunter ist der Aufgabenverwaltungs-Overhead vergleichbar mit der eigentlichen Arbeit.
- Machen Sie den Schwellenwert zu einer Konstante, die Sie anpassen können.
100,1000,10000sind typisch für primitive Arrays; die richtige Zahl hängt von den Kosten pro Element ab. - Für sehr kleine Eingaben fällt man auf eine rein serielle Implementierung zurück. Der Split-and-Fork-Overhead ist bei Eingaben, die in den Cache passen, verschwendet.
fork(), join(), invoke()
Die drei Operationen auf einem RecursiveTask:
| Methode | Verhalten |
|---|---|
fork() | Aufgabe im aktuellen Pool einplanen; sofort zurückkehren |
join() | Auf die Aufgabe warten und ihr Ergebnis zurückgeben (oder die geworfene Ausnahme weiterwerfen) |
invoke() | Kombination aus fork + join für diesen Thread — synchron |
compute() | Den Körper direkt im aufrufenden Thread ausführen (kein Fork) |
Im obigen Muster macht left.fork(); right.compute(); left.join(); das Richtige — eine Hälfte zu einem anderen Worker forken, die andere Hälfte hier ausführen, dann auf den Fork warten.
Sie sollten nicht left.fork(); right.fork(); left.join(); right.join(); schreiben. Die rechte Seite wird geforkt und der aktuelle Worker wartet — es gibt keinen Ausführungs-Thread, der right tatsächlich ausführen kann, bis der Worker join erreicht. Diese Kombination verschwendet den Zeitslot des aktuellen Workers.
Der gemeinsame Pool
ForkJoinPool.commonPool() ist ein JVM-weiter gemeinsamer Pool, standardmäßig auf Runtime.getRuntime().availableProcessors() - 1 dimensioniert. Er treibt an:
Stream.parallelStream()CompletableFuture.supplyAsync(supplier)(die Überladung ohne Executor)Arrays.parallelSort()
Sie können die Größe des gemeinsamen Pools über die Systemeigenschaft java.util.concurrent.ForkJoinPool.common.parallelism beim JVM-Start konfigurieren. Sie sollten den gemeinsamen Pool nicht für I/O verwenden — ein einzelner blockierender Aufruf belegt einen Worker, den die gesamte JVM teilt.
Work-Stealing bildlich dargestellt
worker-1 deque: [t1 t2 t3 t4] (it forked these; t4 just got pushed)
worker-2 deque: [] (empty — workers steal)
worker-3 deque: [t10 t11] (still has its own)
worker-2 finds its deque empty; steals t1 from the BOTTOM of worker-1's deque
worker-1 keeps pulling its own tasks from the TOPDie doppelseitige Queue ist das Herzstück des Designs: Workers schieben und holen von einem Ende (LIFO — Lokalität der Referenz für Cache-Treffer), Diebe nehmen vom anderen Ende (FIFO — minimaler Konflikt mit dem Besitzer). Deshalb erzielt Fork/Join so gute Ergebnisse: Workers konkurrieren selten um die Datenstrukturen des anderen, selbst unter hoher Last.
Ein ausgearbeitetes Beispiel: parallele Summe vs. seriell
Das folgende Programm summiert ein Array mit 10 Millionen Elementen auf zwei Arten — serieller Loop und Fork/Join-Rekursion — und gibt die Wanduhrzeit für jede aus.
Was man aus dem Durchlauf mitnehmen sollte:
- Die Fork/Join-Version war mehrfach schneller als der serielle Loop. Auf einer
N-Kern-Maschine ist die Obergrenze grobN×— die tatsächliche Zahl war niedriger, weil JVM, GC und andere JVM-Threads ebenfalls CPU beanspruchten und die Schwellenwertarbeit nicht perfekt ausbalanciert ist. Trotzdem ist das eine erhebliche Beschleunigung für wenige Zeilen rekursiven Codes. - Beide Summen waren gleich. Das ist die Partition-and-Merge-Korrektheitsprüfung: Jedes Blatt summierte seinen nicht überlappenden Abschnitt; der Kombinationsschritt (
l + r) addierte sie; kein Doppelzählen oder Datenrennen, weil jedes Blatt in seine eigene lokale Variable schrieb. - Die
SumTiny-Variante mit Schwellenwert10war langsamer als der serielle Loop. Bei 10M Elementen, die auf Abschnitte von 10 aufgeteilt werden, entstehen etwa 2M Aufgaben — und der Aufgabenverwaltungs-Overhead überwiegt die eigentliche Additionsarbeit deutlich. Der Schwellenwert ist ein echter Einstellknopf; messen Sie ihn mit repräsentativen Eingaben. - Das Muster
left.fork(); long r = right.compute(); long l = left.join();verwendete eine Aufgabe weniger alsfork(); fork(); join(); join();. Der aktuelle Worker hat freie Zeit während descompute()— diese für eine der Hälften zu nutzen spart eine gesamte Aufgabenzuteilung. Das ist der kleine, aber kumulative Gewinn bei vielen realen Workloads. ForkJoinPool.commonPool()war der in dieser Demo verwendete Executor. Für einen einmaligen Durchlauf ist der gemeinsame Pool in Ordnung. Für ein langläufiges Programm, das Fork/Join-Arbeit mit Stream-parallel-Aufrufen und asynchronen Futures mischt, geben Sie dem schweren Fork/Join-Workload seinen eigenen Pool — der gemeinsame Pool ist für kleine Bursts gedacht, nicht für dauerhaft schwere Berechnungen.
Was als Nächstes kommt
Das nächste Kapitel, Java Concurrent Collections, behandelt die Datenstrukturen, die darauf ausgelegt sind, von mehreren Threads gleichzeitig genutzt zu werden — ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue und den Rest von java.util.concurrent.