Java 8 Concurrency - CompletableFuture in practice
With CompletableFuture<T>
(and its interface, CompletionStage<T>
), Java 8 finally brings the concept of promises to Java.
Promises are the asynchronous alter ego of (synchronous) functions, and like functions, they can:
- return values – or more accurately, become fulfilled by a value,
- throw exceptions – or more accurately, complete with an exception,
- be combined to form a chain, a tree or a graph of operations.
In this article, I will show, using various examples, how to best use CompletableFuture
to leverage the full potential of promises.
Table of Contents
- Example: basic chaining
CompletableFuture<T>
’s API- Error handling
- Advanced usage
- Conclusion
- Other related articles
Example: basic chaining
Let’s first consider the below computation, assuming each operation is expensive (and therefore takes a noticeable amount of time):
and compare classic, synchronous function calls with asynchronous promises:
As you could see:
- the synchronous version took 7 seconds, while
- the asynchronous version took 5 seconds.
Indeed, if you look again at the tree representing what we are computing here, you immediately realise that x+1
and y
can be calculated in parallel, before they are added together to form the final result. This is the power of asynchronous computations.
WARNING:
Code examples in this article include calls to CompletableFuture.get
which blocks the main thread and waits for the CompletableFuture<T>
to finish. I do it to illustrate the behaviour of CompletableFuture<T>
more conveniently, but this is something you should avoid as much as possible in your production code.
Ideally, you should chain futures as much as you can (see below), be asynchronous “all the way”, and never wait, especially not in your main thread.
If you really have to wait, consider setting a timeout with this overload of CompletableFuture.get
and wait in a background thread to avoid blocking your entire application. CompletableFuture.getNow
can also be used and allows you to provide a default value if the future hasn’t completed yet. This is useful when your application cannot afford to wait any longer (e.g. when you need to meet a SLA even though a connection to a downstream system has timed out).
Never, ever “busy poll” in a loop using isDone
or be ready to face the consequences: 100% CPU usage doing… nothing!
CompletableFuture<T>
’s API
The supplyAsync
, thenApply
and thenCombine
methods used in the above example can be daunting at first, and it may not be immediately clear how to use CompletableFuture<T>
with its total of 59 methods!
However, if you look at the various functional abstractions available in Java 8, you will see that there is a direct correspondance between these and CompletableFuture<T>
’s methods, terminology-wise:
Start asynchronous operations
Indeed, you can start an asynchronous operation either:
- using a
Runnable
withrunAsync
, or - using a
Supplier
withsupplyAsync
:
Functional abstraction | Operation | Equivalent synchronous function | Start asynchronous operation with | Resulting promise |
---|---|---|---|---|
Runnable | void run(); | void procedure(); | CompletableFuture.runAsync(() -> runnable.run()) | CompletableFuture<Void> |
Supplier<T> | T get(); | T getter(); | CompletableFuture.supplyAsync(() -> supplier.get()) | CompletableFuture<T> |
Chain asynchronous operations
Similarly, you can chain asynchronous operations either:
- using a
Runnable
withthenRun
, or - using a
Consumer
withthenAccept
, or - using a
Function
withthenApply
:
Functional abstraction | Operation | Equivalent synchronous function | Chain asynchronous operation with | Resulting promise |
---|---|---|---|---|
Runnable | void run(); | void procedure(); | .thenRun(() -> runnable.run()) | CompletableFuture<Void> |
Consumer<T> | void accept(T t); | void setter(T t); | .thenAccept(t -> consumer.accept(t)) | CompletableFuture<Void> |
Function<T,R> | R apply(T t); | R function(T t); | .thenApply(t -> function.apply(t)) | CompletableFuture<R> |
Chaining CompletableFuture
s effectively is equivalent to attaching callbacks to the event “my future completed”. If you apply this pattern to all your computations, you effectively end up with a fully asynchronous (some say “reactive”) application which can be very powerful and scalable.
Async
vs. non-Async
methods
As per the Javadoc, you can fully control where the various operations are run:
- Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current
CompletableFuture
, or by any other caller of a completion method.- All async methods without an explicit
Executor
argument are performed using theForkJoinPool.commonPool()
(unless it does not support a parallelism level of at least two, in which case, a newThread
is created to run each task).
And you can therefore also provide a specific Executor
to all the Async
methods.
Here is the full listing of Async
and non-Async
methods returning a CompletableFuture
:
Non-Async method | Async method |
---|---|
acceptEither | acceptEitherAsync |
allOf (waits for all futures to complete) | N/A |
anyOf (waits for any future to complete) | N/A |
applyToEither | applyToEitherAsync |
completedFuture (converts a value in a future already completed with this value) | N/A |
exceptionally (handles an exception) | N/A |
handle | handleAsync |
runAfterBoth | runAfterBothAsync |
runAfterEither | runAfterEitherAsync |
N/A | runAsync (initialises a concurrent operation) |
N/A | supplyAsync (initialises a concurrent operation) |
thenAccept | thenAcceptAsync |
thenAcceptBoth | thenAcceptBothAsync |
thenApply | thenApplyAsync |
thenCombine | thenCombineAsync |
thenCompose | thenComposeAsync |
thenRun | thenRunAsync |
whenComplete | whenCompleteAsync |
Error handling
If you do anything worth doing asynchronously – complex distributed computation, network I/O with distant, potentially slow machines, etc. – chances are you code can (and will!) throw exceptions, so it is logical and critical to discuss how one would handle these asynchronously.
In the following paragraphs, we will look at what happens when a future throws an exception, and various ways to handle these:
- handle and return a default or error value
- handle and return a transformed future
- handle and propagate the exception
Example: when a future throws an exception
Let’s first see what happens if you call get
on a future which threw an exception:
As you can see in the above code example,
- the future completes with an exception –
isCompletedExceptionally()
returnstrue
- calling
get
re-throws the original exception wrapped in anExecutionException
– the original exception being still accessible viagetCause
.
Example: handle exception and return a default or error value
Depending on what you are doing, you may want to return a default or error value, e.g.: -1
for a computation supposed to return only positive values. You can achieve this asynchronously using exceptionally
and passing a function which converts from a Throwable
to your default / error value.
Note that, given the exception has been handled, isCompletedExceptionally()
now returns false
.
Example: handle exception and return a transformed future
Alternatively, you may want to combine a transformation for both the normal case and the error case. For example, if you are writing a web service, you could return an object representing a HTTP response with either:
- status code
200
/OK
and the expected result, or - status code
500
/Internal Server Error
and details on the error.
You can achieve this asynchronously using handle
and passing a function which converts:
- from your original type and a
Throwable
, - to your new type.
Note that, given the exception has been handled here too, isCompletedExceptionally()
returns false
.
Example: handle exception and propagate the exception
Finally, you may want to run some arbitrary code for both the normal case and the error case, e.g. to release some resource, to update some state, to log details, etc. but still either return the computed result or propagate the exception thrown. This is possible with whenComplete
.
Advanced usage
Example: long compute vs. slow store
In this example, let’s consider an expensive computation to perform. Given it is expensive and its result can be re-used later, we decide to cache it in a remote store which, to make things harder, can also potentially be slow, e.g.: it may suffer from latency spikes.
The application needs to serve the result to the end-user as soon as possible, so we both:
- re-calculate the value, and
- load the value from the store,
in two different futures in parallel, and then:
- return the first value we get back,
- cancel the remaining future, in order to save time and resources.
Below is the code for such a scenario, which can be implemented using applyToEitherAsync
:
You can see that applyToEitherAsync
is indeed triggered whenever either one of the two futures is done.
Note that CompletableFuture#cancel(boolean mayInterruptIfRunning)
, currently does not interrupt the targeted future, as per the Javadoc:
mayInterruptIfRunning - this value has no effect in this implementation because interrupts are not used to control processing.
However, this is something you could definitely implement in your own CompletionStage<T>
.
Also note that exceptions are propagated immediately, and the callback function passed to applyToEitherAsync
is then never executed. If this is undesirable behaviour, you can always chain whenComplete
to either the “failing future” or the “joining future”.
Example: multi-stages computations and futures’ synchronisation
In the next example, we perform a more complex computation for which we need to gather the intermediate results of all futures, and either aggregate these or compare them:
- we asynchronously compute integers from 1 to 5 – each integer generation takes 2 seconds
- we sum these together
- we asynchronously multiply the sum by 1, 2 and 3 – each multiplication takes 2 seconds
- we take the maximum.
Graphically, this would be represented as:
Let’s see how we would implement this with CompletableFuture
:
As you may have guessed, given we perform steps 1. (computing integers from 1 to 5) and 3. (multiplying the sum by 1, 2 and 3) asynchronously, the entire computation only takes 4 seconds, instead of 16 seconds if we were to perform the exact same computation using regular functions in one single thread.
Moreover, we synchronised futures and combined their intermediate results using various techniques:
- we summed integers from 1 to 5 using a reducer, combining the neutral element
CompletableFuture.completedFuture(0)
with other futures as they completed usingthenCombine
- we waited for all multiplications to complete using
allOf
andthenApply
-edInteger
’s natural order comparator to find the maximum value.
The resulting code is fairly clean, elegant and concise.
Conclusion
We thoroughly explored CompletableFuture<T>
’s API using various concrete examples.
Hopefully, this:
- made it easier for you to understand the API and how to use it,
- has convinced you of the usefulness of
CompletableFuture<T>
, - has made you more comfortable with asynchrony in general.
If you would rather read a bit more about CompletableFuture<T>
, below is a selection of articles which may be useful.
Finally, feel free to reach out if you have any comment or question about this article, in the below “Comments” section.
Other related articles
- Tomasz Nurkiewicz’s Java 8: Definitive guide to CompletableFuture (also available on DZone).
- Dennis Sosnoski’s JVM concurrency: Java 8 concurrency basics on IBM developerWorks.
- Maurice Naftalin’s Functional-Style Callbacks Using Java 8’s CompletableFuture on InfoQ.
- Scala’s futures and promises