W3docs

Java CompletableFuture

Asynchrone Berechnungen mit CompletableFuture kombinieren — thenApply, thenCompose, allOf, exceptionally und typische Fallstricke.

Future ist ein einmaliger Ergebnisträger: Sie übergeben, Sie warten, Sie lesen. Verkettung ist nicht möglich. Wenn Sie „führe A aus, nutze dann das Ergebnis von A für B, kombiniere B mit C und übergebe es an D" ausdrücken möchten, ohne eine Zustandsmaschine von Hand zu schreiben, benötigen Sie CompletableFuture — Javas Neugestaltung der asynchronen Ergebnis-Idee aus Java 8, die auf Komposition ausgerichtet ist.

CompletableFuture<V> implementiert Future<V>, sodass die alte API vollständig erhalten bleibt. Das Neue ist die Kombinator-API: gut dreißig Methoden, mit denen Sie Datenflussgraphen aus asynchroner Arbeit aufbauen können — Funktionen anwenden, Nebeneffekte ausführen, mehrere Futures kombinieren, Ausnahmen behandeln, Timeouts setzen — ohne jemals einen Thread zu blockieren, um auf ein Zwischenergebnis zu warten.

Die Startmethoden

Normalerweise erstellen Sie ein CompletableFuture nicht direkt. Sie starten eine Pipeline mit einer dieser Methoden:

CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<Void>    b = CompletableFuture.runAsync(() -> log("hello"));
CompletableFuture<String>  c = CompletableFuture.completedFuture("ready");
CompletableFuture<String>  d = CompletableFuture.failedFuture(new IOException("nope"));
StartmethodeVerhalten
supplyAsync(Supplier)Führt einen Supplier im Common Pool aus und gibt dessen Wert zurück
runAsync(Runnable)Führt ein Runnable im Common Pool aus, kein Rückgabewert
completedFuture(v)Ein bereits aufgelöstes Future mit dem angegebenen Wert
failedFuture(t)Ein bereits fehlgeschlagenes Future mit dem angegebenen Throwable

supplyAsync und runAsync haben Überladungen, die einen expliziten Executor akzeptieren. Sie sollten fast immer einen übergeben. Der Standardwert ist ForkJoinPool.commonPool() — ein gemeinsamer Pool, der auf Ihre CPU-Anzahl ausgelegt ist, gut für kurze CPU-Arbeit, aber verheerend für I/O (ein langsamer Aufruf blockiert für alle einen Kern). Übergeben Sie für I/O oder Arbeit mit unbekannten Kosten immer einen expliziten Executor.

Verkettung: thenApply, thenAccept, thenRun

Die einfachsten Kombinatoren wandeln ein Future in ein anderes um:

CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);

CompletableFuture<String>  b = a.thenApply(n -> "value is " + n);          // transform
CompletableFuture<Void>    c = a.thenAccept(n -> System.out.println(n));    // consume, no result
CompletableFuture<Void>    d = a.thenRun(() -> System.out.println("done")); // side-effect, ignore value
MethodeLambda-TypRückgabe
thenApplyFunction<T,U>CompletableFuture<U>
thenAcceptConsumer<T>CompletableFuture<Void>
thenRunRunnableCompletableFuture<Void>

Jede Methode hat drei Varianten:

  • thenApply(fn) — läuft auf dem Thread, der die vorherige Stufe abgeschlossen hat
  • thenApplyAsync(fn) — läuft im Common Pool
  • thenApplyAsync(fn, executor) — läuft auf einem bestimmten Executor

Die nicht-Async-Variante ist am schnellsten (kein Thread-Wechsel), bedeutet aber, dass Ihre fn auf dem Thread läuft, der die vorherige Stufe abgeschlossen hat — möglicherweise der I/O-Thread, den Sie nicht mit CPU-Arbeit belegen möchten. Die *Async-Varianten sind in heterogenen Pipelines die sicherere Standardwahl.

thenCompose — ein Future eines Futures flach machen

thenApply ist in Ordnung, wenn die Funktion einen einfachen Wert zurückgibt. Wenn sie ein weiteres CompletableFuture zurückgibt, möchten Sie kein CompletableFuture<CompletableFuture<V>> — Sie brauchen thenCompose:

CompletableFuture<User> user = lookupUser(id);
CompletableFuture<Profile> profile = user.thenCompose(u -> loadProfile(u.profileId()));
//                                          ^ Function<User, CompletableFuture<Profile>>

thenCompose ist flatMap für Futures. Verwenden Sie es, wenn der nächste Schritt selbst asynchron ist; verwenden Sie thenApply, wenn er es nicht ist.

Zwei Futures kombinieren: thenCombine

Wenn Sie zwei unabhängige asynchrone Werte haben und diese kombinieren möchten:

CompletableFuture<Integer> price   = fetchPrice(symbol);
CompletableFuture<Integer> shares  = fetchShares(account);
CompletableFuture<Integer> total   = price.thenCombine(shares, (p, s) -> p * s);

thenCombine wartet auf beide Eingaben und wendet dann eine BiFunction auf ihre Ergebnisse an. Die beiden Futures laufen parallel — price und shares sind bereits in Bearbeitung, wenn thenCombine registriert wird. Der Kombinator läuft auf dem Thread, der als zweites abschließt.

Die „Any"-Variante, applyToEither, nimmt das erste Ergebnis und ignoriert das zweite.

Viele Futures: allOf und anyOf

Wenn die Parallelität über eine Sammlung von Futures erfolgt:

List<CompletableFuture<String>> all = ids.stream()
    .map(this::fetchAsync)
    .toList();

CompletableFuture<Void> doneAll  = CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
CompletableFuture<Object> firstOne = CompletableFuture.anyOf(all.toArray(new CompletableFuture[0]));

allOf schließt ab, wenn jede Eingabe fertig ist. Es gibt CompletableFuture<Void> zurück — um tatsächlich die Ergebnisliste zu erhalten, müssen Sie thenApply verwenden und sie herausziehen:

CompletableFuture<List<String>> results = doneAll.thenApply(v ->
    all.stream().map(CompletableFuture::join).toList());        // .join() never blocks here — they're all complete

anyOf gibt den Wert des Inputs zurück, der als erster abschließt (als Object — es gibt keine Möglichkeit, „eines dieser typisierten Futures" mit einem Rückgabetyp auszudrücken).

Fehlerbehandlung: exceptionally und handle

Ein CompletableFuture kann fehlschlagen (jede Stufe, die eine Ausnahme wirft, erzeugt ein fehlgeschlagenes Future flussabwärts). Die Kombinatoren, die es wiederherstellen oder transformieren:

CompletableFuture<String> safe = riskyAsync()
    .exceptionally(ex -> "fallback for: " + ex.getMessage());

CompletableFuture<String> either = riskyAsync()
    .handle((value, ex) -> ex == null ? value : "fallback");
MethodeWann sie läuftWas sie zurückgibt
exceptionally(fn)Nur bei Fehler; erhält die UrsacheWiederhergestellter Wert
handle(bi)Immer; erhält (value, ex) (eines ist null)Transformierter Wert
whenComplete(bi)Immer; erhält (value, ex)Dasselbe Future, nur Nebeneffekt

exceptionally ist der einfache „Catch und Ersetzen"-Pfad. handle ist der allgemeinere „immer ausführen, basierend auf dem Ergebnis entscheiden"-Ansatz — nützlich, wenn Sie jeden Abschluss unabhängig vom Erfolg protokollieren möchten.

orTimeout und completeOnTimeout

Java 9 hat Timeouts direkt zur Futures-API hinzugefügt:

CompletableFuture<String> withDeadline = riskyAsync()
    .orTimeout(2, TimeUnit.SECONDS);                  // completes exceptionally if not done in 2s

CompletableFuture<String> withDefault = riskyAsync()
    .completeOnTimeout("fallback", 2, TimeUnit.SECONDS);

Damit können Sie Fristen ausdrücken, ohne einen eigenen Watchdog zu schreiben. Sie verwenden interne geplante Threads und sind daher günstig anzuhängen.

Nicht in asynchronen Stufen blockieren

Der häufigste Fehler mit CompletableFuture: .get() oder .join() innerhalb einer Async-Stufe aufrufen. Das ist ein Thread des Executor-Pools, der untätig wartet auf einen anderen Thread desselben Pools — unter Last können Sie den gesamten Pool in einem Deadlock blockieren.

// WRONG — joining inside an async stage on the common pool
CompletableFuture.supplyAsync(() -> {
  Integer x = anotherFuture().join();                 // blocks a pool thread
  return x * 2;
});

// RIGHT — compose instead of join
anotherFuture().thenApply(x -> x * 2);

Wenn Sie innerhalb einer Async-Stufe nach .get() greifen, wollten Sie eigentlich thenCompose/thenApply verwenden.

Einen eigenen Executor verwenden

Der Common-Pool-Standard ist für kurze CPU-Arbeit geeignet. Für I/O oder alles, was blockieren könnte, verwenden Sie Ihren eigenen:

ExecutorService io = Executors.newFixedThreadPool(50, namedFactory("io"));
ExecutorService cpu = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), namedFactory("cpu"));

CompletableFuture.supplyAsync(this::loadFromDb, io)
    .thenApplyAsync(this::transform, cpu)
    .thenAcceptAsync(this::sendToClient, io);

Jeder Schritt läuft auf dem richtigen Pool. Der Common Pool bleibt frei für parallelStream und andere Framework-Nutzung. Diese Art der Mischung ist der Kern eines gut verhaltenden asynchronen Java.

Ein praktisches Beispiel: eine kleine asynchrone Pipeline

Das folgende Programm ruft einen „Benutzer" und ein „Profil" parallel ab, kombiniert sie, setzt eine Frist und erholt sich von einem Fehlerpfad.

java— editable, runs on the server

Was aus dem Lauf zu entnehmen ist:

  • Abschnitt 1 verwendete thenCombine auf zwei unabhängigen Abrufen. Sie liefen parallelname (50 ms) und age (80 ms) waren bereits in Bearbeitung, bevor der Kombinator angehängt wurde. Das kombinierte Future schloss kurz nach dem Abschluss des langsameren ab. Das ist die Parallelität: Eine asynchrone Pipeline wartet nicht auf jeden Schritt, sie kombiniert die Schritte als Graphen.
  • Abschnitt 2 verwendete thenCompose, um Schritte zu verketten, bei denen jeder Schritt selbst asynchron ist. thenApply hätte ein CompletableFuture<CompletableFuture<String>> ergeben — nutzlos. thenCompose macht es flach, so wie flatMap es für Streams und Optional tut.
  • Abschnitt 3 verwendete allOf über eine Liste und dann thenApply, um die Werte herauszuziehen. Das allOf selbst gibt Void zurück; das Ernte der Ergebnisse erfolgt in einem separaten Stream über die (jetzt abgeschlossenen) Futures mit join(). Die join()-Aufrufe blockieren hier nicht, weil das allOf bereits abgeschlossen ist.
  • Abschnitt 4 zeigte exceptionally beim Erholen von einer geworfenen Aufgabe. Das vorgelagerte Future schlug fehl; das nachgelagerte Future gab den Fallback-String zurück. Ohne exceptionally (oder handle) würde der Fehler als CompletionException zu .join() weitergegeben.
  • Abschnitt 5 verwendete orTimeout, um eine 100-ms-Frist auf eine 500-ms-Aufgabe anzuwenden. Das Future schloss mit TimeoutException ab; das join warf es erneut innerhalb von CompletionException. Dies ist die richtige Form für „Ich möchte dieses Ergebnis, aber nur, wenn es schnell genug ankommt."
  • Abschnitt 6 verwendete handle, um in einem einzigen Schritt auf Erfolg/Fehler zu verzweigen. handle läuft immer und erhält sowohl (value, ex) — eines ist null. Nützlich, wenn Sie ein einheitliches Ende der Pipeline möchten, unabhängig davon, ob die Arbeit erfolgreich war.

Nächste Schritte

Das nächste Kapitel, Java Fork/Join, behandelt den ForkJoinPool — den Work-Stealing-Pool, der parallele Streams und den CompletableFuture-Common-Pool unterstützt, und das richtige Werkzeug für Teile-und-Herrsche-CPU-Arbeit.

Übungen

Übung
Sie schreiben `CompletableFuture.supplyAsync(() -> { Integer x = otherFuture().get(); return x * 2; })`. Im Lambda rufen Sie `.get()` auf einem anderen Future auf, das demselben Standard-Pool übergeben wurde. Welches Risiko besteht?
Sie schreiben `CompletableFuture.supplyAsync(() -> { Integer x = otherFuture().get(); return x * 2; })`. Im Lambda rufen Sie `.get()` auf einem anderen Future auf, das demselben Standard-Pool übergeben wurde. Welches Risiko besteht?
Was this page helpful?