Java CompletableFuture
Asynchrone Berechnungen mit CompletableFuture kombinieren — thenApply, thenCompose, allOf, exceptionally und typische Fallstricke.
Future ist ein einmaliger Ergebnisträger: Sie übergeben, Sie warten, Sie lesen. Verkettung ist nicht möglich. Wenn Sie „führe A aus, nutze dann das Ergebnis von A für B, kombiniere B mit C und übergebe es an D" ausdrücken möchten, ohne eine Zustandsmaschine von Hand zu schreiben, benötigen Sie CompletableFuture — Javas Neugestaltung der asynchronen Ergebnis-Idee aus Java 8, die auf Komposition ausgerichtet ist.
CompletableFuture<V> implementiert Future<V>, sodass die alte API vollständig erhalten bleibt. Das Neue ist die Kombinator-API: gut dreißig Methoden, mit denen Sie Datenflussgraphen aus asynchroner Arbeit aufbauen können — Funktionen anwenden, Nebeneffekte ausführen, mehrere Futures kombinieren, Ausnahmen behandeln, Timeouts setzen — ohne jemals einen Thread zu blockieren, um auf ein Zwischenergebnis zu warten.
Die Startmethoden
Normalerweise erstellen Sie ein CompletableFuture nicht direkt. Sie starten eine Pipeline mit einer dieser Methoden:
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> log("hello"));
CompletableFuture<String> c = CompletableFuture.completedFuture("ready");
CompletableFuture<String> d = CompletableFuture.failedFuture(new IOException("nope"));| Startmethode | Verhalten |
|---|---|
supplyAsync(Supplier) | Führt einen Supplier im Common Pool aus und gibt dessen Wert zurück |
runAsync(Runnable) | Führt ein Runnable im Common Pool aus, kein Rückgabewert |
completedFuture(v) | Ein bereits aufgelöstes Future mit dem angegebenen Wert |
failedFuture(t) | Ein bereits fehlgeschlagenes Future mit dem angegebenen Throwable |
supplyAsync und runAsync haben Überladungen, die einen expliziten Executor akzeptieren. Sie sollten fast immer einen übergeben. Der Standardwert ist ForkJoinPool.commonPool() — ein gemeinsamer Pool, der auf Ihre CPU-Anzahl ausgelegt ist, gut für kurze CPU-Arbeit, aber verheerend für I/O (ein langsamer Aufruf blockiert für alle einen Kern). Übergeben Sie für I/O oder Arbeit mit unbekannten Kosten immer einen expliziten Executor.
Verkettung: thenApply, thenAccept, thenRun
Die einfachsten Kombinatoren wandeln ein Future in ein anderes um:
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<String> b = a.thenApply(n -> "value is " + n); // transform
CompletableFuture<Void> c = a.thenAccept(n -> System.out.println(n)); // consume, no result
CompletableFuture<Void> d = a.thenRun(() -> System.out.println("done")); // side-effect, ignore value| Methode | Lambda-Typ | Rückgabe |
|---|---|---|
thenApply | Function<T,U> | CompletableFuture<U> |
thenAccept | Consumer<T> | CompletableFuture<Void> |
thenRun | Runnable | CompletableFuture<Void> |
Jede Methode hat drei Varianten:
thenApply(fn)— läuft auf dem Thread, der die vorherige Stufe abgeschlossen hatthenApplyAsync(fn)— läuft im Common PoolthenApplyAsync(fn, executor)— läuft auf einem bestimmten Executor
Die nicht-Async-Variante ist am schnellsten (kein Thread-Wechsel), bedeutet aber, dass Ihre fn auf dem Thread läuft, der die vorherige Stufe abgeschlossen hat — möglicherweise der I/O-Thread, den Sie nicht mit CPU-Arbeit belegen möchten. Die *Async-Varianten sind in heterogenen Pipelines die sicherere Standardwahl.
thenCompose — ein Future eines Futures flach machen
thenApply ist in Ordnung, wenn die Funktion einen einfachen Wert zurückgibt. Wenn sie ein weiteres CompletableFuture zurückgibt, möchten Sie kein CompletableFuture<CompletableFuture<V>> — Sie brauchen thenCompose:
CompletableFuture<User> user = lookupUser(id);
CompletableFuture<Profile> profile = user.thenCompose(u -> loadProfile(u.profileId()));
// ^ Function<User, CompletableFuture<Profile>>thenCompose ist flatMap für Futures. Verwenden Sie es, wenn der nächste Schritt selbst asynchron ist; verwenden Sie thenApply, wenn er es nicht ist.
Zwei Futures kombinieren: thenCombine
Wenn Sie zwei unabhängige asynchrone Werte haben und diese kombinieren möchten:
CompletableFuture<Integer> price = fetchPrice(symbol);
CompletableFuture<Integer> shares = fetchShares(account);
CompletableFuture<Integer> total = price.thenCombine(shares, (p, s) -> p * s);thenCombine wartet auf beide Eingaben und wendet dann eine BiFunction auf ihre Ergebnisse an. Die beiden Futures laufen parallel — price und shares sind bereits in Bearbeitung, wenn thenCombine registriert wird. Der Kombinator läuft auf dem Thread, der als zweites abschließt.
Die „Any"-Variante, applyToEither, nimmt das erste Ergebnis und ignoriert das zweite.
Viele Futures: allOf und anyOf
Wenn die Parallelität über eine Sammlung von Futures erfolgt:
List<CompletableFuture<String>> all = ids.stream()
.map(this::fetchAsync)
.toList();
CompletableFuture<Void> doneAll = CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
CompletableFuture<Object> firstOne = CompletableFuture.anyOf(all.toArray(new CompletableFuture[0]));allOf schließt ab, wenn jede Eingabe fertig ist. Es gibt CompletableFuture<Void> zurück — um tatsächlich die Ergebnisliste zu erhalten, müssen Sie thenApply verwenden und sie herausziehen:
CompletableFuture<List<String>> results = doneAll.thenApply(v ->
all.stream().map(CompletableFuture::join).toList()); // .join() never blocks here — they're all completeanyOf gibt den Wert des Inputs zurück, der als erster abschließt (als Object — es gibt keine Möglichkeit, „eines dieser typisierten Futures" mit einem Rückgabetyp auszudrücken).
Fehlerbehandlung: exceptionally und handle
Ein CompletableFuture kann fehlschlagen (jede Stufe, die eine Ausnahme wirft, erzeugt ein fehlgeschlagenes Future flussabwärts). Die Kombinatoren, die es wiederherstellen oder transformieren:
CompletableFuture<String> safe = riskyAsync()
.exceptionally(ex -> "fallback for: " + ex.getMessage());
CompletableFuture<String> either = riskyAsync()
.handle((value, ex) -> ex == null ? value : "fallback");| Methode | Wann sie läuft | Was sie zurückgibt |
|---|---|---|
exceptionally(fn) | Nur bei Fehler; erhält die Ursache | Wiederhergestellter Wert |
handle(bi) | Immer; erhält (value, ex) (eines ist null) | Transformierter Wert |
whenComplete(bi) | Immer; erhält (value, ex) | Dasselbe Future, nur Nebeneffekt |
exceptionally ist der einfache „Catch und Ersetzen"-Pfad. handle ist der allgemeinere „immer ausführen, basierend auf dem Ergebnis entscheiden"-Ansatz — nützlich, wenn Sie jeden Abschluss unabhängig vom Erfolg protokollieren möchten.
orTimeout und completeOnTimeout
Java 9 hat Timeouts direkt zur Futures-API hinzugefügt:
CompletableFuture<String> withDeadline = riskyAsync()
.orTimeout(2, TimeUnit.SECONDS); // completes exceptionally if not done in 2s
CompletableFuture<String> withDefault = riskyAsync()
.completeOnTimeout("fallback", 2, TimeUnit.SECONDS);Damit können Sie Fristen ausdrücken, ohne einen eigenen Watchdog zu schreiben. Sie verwenden interne geplante Threads und sind daher günstig anzuhängen.
Nicht in asynchronen Stufen blockieren
Der häufigste Fehler mit CompletableFuture: .get() oder .join() innerhalb einer Async-Stufe aufrufen. Das ist ein Thread des Executor-Pools, der untätig wartet auf einen anderen Thread desselben Pools — unter Last können Sie den gesamten Pool in einem Deadlock blockieren.
// WRONG — joining inside an async stage on the common pool
CompletableFuture.supplyAsync(() -> {
Integer x = anotherFuture().join(); // blocks a pool thread
return x * 2;
});
// RIGHT — compose instead of join
anotherFuture().thenApply(x -> x * 2);Wenn Sie innerhalb einer Async-Stufe nach .get() greifen, wollten Sie eigentlich thenCompose/thenApply verwenden.
Einen eigenen Executor verwenden
Der Common-Pool-Standard ist für kurze CPU-Arbeit geeignet. Für I/O oder alles, was blockieren könnte, verwenden Sie Ihren eigenen:
ExecutorService io = Executors.newFixedThreadPool(50, namedFactory("io"));
ExecutorService cpu = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), namedFactory("cpu"));
CompletableFuture.supplyAsync(this::loadFromDb, io)
.thenApplyAsync(this::transform, cpu)
.thenAcceptAsync(this::sendToClient, io);Jeder Schritt läuft auf dem richtigen Pool. Der Common Pool bleibt frei für parallelStream und andere Framework-Nutzung. Diese Art der Mischung ist der Kern eines gut verhaltenden asynchronen Java.
Ein praktisches Beispiel: eine kleine asynchrone Pipeline
Das folgende Programm ruft einen „Benutzer" und ein „Profil" parallel ab, kombiniert sie, setzt eine Frist und erholt sich von einem Fehlerpfad.
Was aus dem Lauf zu entnehmen ist:
- Abschnitt 1 verwendete
thenCombineauf zwei unabhängigen Abrufen. Sie liefen parallel —name(50 ms) undage(80 ms) waren bereits in Bearbeitung, bevor der Kombinator angehängt wurde. Das kombinierte Future schloss kurz nach dem Abschluss des langsameren ab. Das ist die Parallelität: Eine asynchrone Pipeline wartet nicht auf jeden Schritt, sie kombiniert die Schritte als Graphen. - Abschnitt 2 verwendete
thenCompose, um Schritte zu verketten, bei denen jeder Schritt selbst asynchron ist.thenApplyhätte einCompletableFuture<CompletableFuture<String>>ergeben — nutzlos.thenComposemacht es flach, so wieflatMapes für Streams undOptionaltut. - Abschnitt 3 verwendete
allOfüber eine Liste und dannthenApply, um die Werte herauszuziehen. DasallOfselbst gibtVoidzurück; das Ernte der Ergebnisse erfolgt in einem separaten Stream über die (jetzt abgeschlossenen) Futures mitjoin(). Diejoin()-Aufrufe blockieren hier nicht, weil dasallOfbereits abgeschlossen ist. - Abschnitt 4 zeigte
exceptionallybeim Erholen von einer geworfenen Aufgabe. Das vorgelagerte Future schlug fehl; das nachgelagerte Future gab den Fallback-String zurück. Ohneexceptionally(oderhandle) würde der Fehler alsCompletionExceptionzu.join()weitergegeben. - Abschnitt 5 verwendete
orTimeout, um eine 100-ms-Frist auf eine 500-ms-Aufgabe anzuwenden. Das Future schloss mitTimeoutExceptionab; dasjoinwarf es erneut innerhalb vonCompletionException. Dies ist die richtige Form für „Ich möchte dieses Ergebnis, aber nur, wenn es schnell genug ankommt." - Abschnitt 6 verwendete
handle, um in einem einzigen Schritt auf Erfolg/Fehler zu verzweigen.handleläuft immer und erhält sowohl(value, ex)— eines ist null. Nützlich, wenn Sie ein einheitliches Ende der Pipeline möchten, unabhängig davon, ob die Arbeit erfolgreich war.
Nächste Schritte
Das nächste Kapitel, Java Fork/Join, behandelt den ForkJoinPool — den Work-Stealing-Pool, der parallele Streams und den CompletableFuture-Common-Pool unterstützt, und das richtige Werkzeug für Teile-und-Herrsche-CPU-Arbeit.