JAX-RS 2.1 (part of Java EE 8) supports returning a CompletionStage from a resource method to mark a request as eligible for asynchronous processing. This is in addition to the AsyncResponse API, which has been available since JAX-RS 2.0 (Java EE 7).
The Client API has also added support for reactive-style programming through the CompletionStage API, but this blog will focus on the server-side support.
The advantage this approach has over the AsyncResponse-based API is that it is richer and allows you to create asynchronous pipelines: dependent steps can be chained with methods such as thenComposeAsync and thenApplyAsync instead of being wired together by hand. Let’s look at an example, available on GitHub. It is simple and slightly contrived, but it should help get the point across:
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    import javax.annotation.Resource;
    import javax.enterprise.concurrent.ManagedExecutorService;
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;

    @Path("cabs")
    public class CabBookingResource {

        @Resource
        ManagedExecutorService mes;

        @GET
        @Path("{id}")
        public CompletionStage<String> getCab(@PathParam("id") final String name) {
            System.out.println("HTTP request handled by thread " + Thread.currentThread().getName());

            // Completed manually once user validation has finished
            final CompletableFuture<Boolean> validateUserTask = new CompletableFuture<>();

            // The driver search starts only after validation completes
            CompletableFuture<String> searchDriverTask = validateUserTask.thenComposeAsync(valid -> {
                System.out.println("User validated ? " + valid);
                return CompletableFuture.supplyAsync(() -> searchDriver(), mes);
            }, mes);

            // The notification is built from the driver found in the previous step
            final CompletableFuture<String> notifyUserTask = searchDriverTask.thenApplyAsync(
                    driver -> notifyUser(driver), mes);

            // Kick off the pipeline on a container-managed thread
            mes.execute(() -> {
                try {
                    validateUserTask.complete(validateUser(name));
                } catch (Exception ex) {
                    Logger.getLogger(CabBookingResource.class.getName()).log(Level.SEVERE, null, ex);
                    // Propagate the failure so the returned stage does not stay pending forever
                    validateUserTask.completeExceptionally(ex);
                }
            });

            return notifyUserTask;
        }

        boolean validateUser(String id) {
            System.out.println("validateUserTask handled by thread " + Thread.currentThread().getName());
            System.out.println("validating user " + id);
            try {
                Thread.sleep(1500); // simulated delay
            } catch (InterruptedException ex) {
                Logger.getLogger(CabBookingResource.class.getName()).log(Level.SEVERE, null, ex);
                Thread.currentThread().interrupt();
            }
            return true;
        }

        String searchDriver() {
            System.out.println("searchDriverTask handled by thread " + Thread.currentThread().getName());
            try {
                Thread.sleep(2500); // simulated delay
            } catch (InterruptedException ex) {
                Logger.getLogger(CabBookingResource.class.getName()).log(Level.SEVERE, null, ex);
                Thread.currentThread().interrupt();
            }
            return "abhishek";
        }

        String notifyUser(String info) {
            System.out.println("notifyUserTask handled by thread " + Thread.currentThread().getName());
            return "Your driver is " + info + " and the OTP is " + (new Random().nextInt(999) + 1000);
        }
    }
- It starts with an HTTP GET to /booking/cabs/<user>, which invokes the getCab method:
  - The method returns a CompletionStage and returns immediately
  - The thread which served the request is now freed up
- And then it's about creating the asynchronous pipeline:
  - We orchestrate the tasks for user validation and driver search using thenComposeAsync; this gives a CompletableFuture, i.e. the searchDriverTask
  - We then supply a Function which takes the driver (returned by the above step) and invokes the notifyUser method. This is the CompletionStage which we actually return, i.e. notifyUserTask. It is executed later on; all we did here was compose the sequence
- Once the process is completed (delays are introduced using Thread.sleep()), the response is sent back to the user; internally, our CompletableFuture completes
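To try the composition outside a Java EE container, the same sequence can be sketched with nothing but the JDK. This is a minimal illustration, not the code from the GitHub project: the class name PipelineSketch, the book and describeFailure helpers, and the plain fixed thread pool standing in for the ManagedExecutorService are all assumptions made for this example, and the simulated delays are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PipelineSketch {

    // Hypothetical stand-ins for the resource's helper methods (no delays here).
    static boolean validateUser(String id) {
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("missing user id");
        }
        return true;
    }

    static String searchDriver() {
        return "abhishek";
    }

    static String notifyUser(String driver) {
        return "Your driver is " + driver;
    }

    // Composes validation -> driver search -> notification.
    // The caller gets the stage back immediately; the work runs on the pool.
    static CompletionStage<String> book(String user, ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> validateUser(user), pool)
                // thenComposeAsync flattens the nested future returned by the search step
                .thenComposeAsync(
                        valid -> CompletableFuture.supplyAsync(PipelineSketch::searchDriver, pool),
                        pool)
                .thenApplyAsync(PipelineSketch::notifyUser, pool);
    }

    // Failures in dependent stages usually arrive wrapped in a CompletionException,
    // so unwrap the cause when there is one.
    static String describeFailure(Throwable ex) {
        Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
        return "Booking failed: " + cause.getMessage();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Happy path: join() blocks only this demo's main thread.
            System.out.println(book("user-1", pool).toCompletableFuture().join());

            // Failure path: the exception from validateUser skips the later steps
            // and surfaces in exceptionally().
            System.out.println(book("", pool)
                    .exceptionally(PipelineSketch::describeFailure)
                    .toCompletableFuture()
                    .join());
        } finally {
            pool.shutdown();
        }
    }
}
```

When the first step fails, the search and notification steps are skipped and the failure travels down the chain to whoever consumes the returned stage. In a JAX-RS resource that consumer is the runtime, so a stage that is never completed (normally or exceptionally) would leave the request hanging.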
To Run Using Docker
Refer to the README.
Further Reading
- Java EE 8 blogs
- JAX-RS eBook

