JAX-RS 2.1 (part of Java EE 8) now supports returning a CompletionStage
to mark a request as eligible for asynchronous processing. This is in addition to the AsyncResponse
API, which has been available since JAX-RS 2.0 (Java EE 7)
Even the
Client
API has added support for reactive-style programming by providing support forCompletionStage
API, but this blog will focus on the server-side support
The advantage this approach has over the AsyncResponse
-based API is that it is richer and allows you to create asynchronous pipelines. Let’s look at an example — available on GitHub. It is simple and slightly contrived, but hopefully, it should help get the point across:
@Path("cabs")
public class CabBookingResource {
@Resource
ManagedExecutorService mes;
@GET
@Path("{id}")
public CompletionStage<String> getCab(@PathParam("id") final String name) {
System.out.println("HTTP request handled by thread " + Thread.currentThread().getName());
final CompletableFuture<Boolean> validateUserTask = new CompletableFuture<>();
CompletableFuture<String> searchDriverTask = validateUserTask.thenComposeAsync(
new Function<Boolean, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(Boolean t) {
System.out.println("User validated ? " + t);
return CompletableFuture.supplyAsync(() -> searchDriver(), mes);
}
}, mes);
final CompletableFuture<String> notifyUserTask = searchDriverTask.thenApplyAsync(
(driver) -> notifyUser(driver), mes);
mes.execute(new Runnable() {
@Override
public void run() {
try {
validateUserTask.complete(validateUser(name));
} catch (Exception ex) {
Logger.getLogger(CabBookingResource.class.getName()).log(Level.SEVERE, null, ex);
}
}
});
return notifyUserTask;
}
boolean validateUser(String id) {
System.out.println("searchDriverTask handled by thread " + Thread.currentThread().getName());
System.out.println("validating user " + id);
try {
Thread.sleep(1500);
} catch (InterruptedException ex) {
Logger.getLogger(CabBookingResource.class.getName()).log(Level.SEVERE, null, ex);
}
return true;
}
String searchDriver() {
System.out.println("searchDriverTask handled by thread " + Thread.currentThread().getName());
try {
Thread.sleep(2500);
} catch (InterruptedException ex) {
Logger.getLogger(CabBookingResource.class.getName()).log(Level.SEVERE, null, ex);
}
return "abhishek";
}
String notifyUser(String info) {
System.out.println("searchDriverTask handled by thread " + Thread.currentThread().getName());
return "Your driver is " + info + " and the OTP is " + (new Random().nextInt(999) + 1000);
}
}
- It starts with an HTTP
GET
to/booking/cabs/<user>
, which invokes thegetCab
method:- The method returns a
CompletionStage
and returns immediately - The thread which served the request is now freed up
- The method returns a
- And then it's about creating the asynchronous pipeline:
- We orchestrate the tasks for user validation and driver search using
thenComposeAsync
– this gives aCompletableFuture
i.e. thesearchDriverTask
- We then supply a
Function
which takes the driver (returned by the above step) and invokes thenotifyUser
method – this is theCompletionStage
which we actually return i.e.notifyUserTask
– this is obviously executed later on, all we did was compose the sequence
- We orchestrate the tasks for user validation and driver search using
- Once the process is completed (delays are introduced using
Thread.sleep()
), the response is sent back to the user – internally, ourCompletableFuture
complete
s
To Run Using Docker
Refer to the README.
Further Reading
- Java EE 8 blogs
- JAX-RS eBook