Asynchronous REST with JAX-RS and MicroProfile

JAX-RS and MicroProfile Rest Client make writing and consuming RESTful services both easy and powerful, especially when using their asynchronous capabilities. In this post, we’ll look at how to use asynchronous JAX-RS resource methods on the server side. There are limited use cases in which async is suitable on the server, so we’ll try to cover those cases where it is most useful. Async on the client can be very beneficial. We’ll cover the advantages, and we’ll also go over the different ways of implementing an asynchronous client including the JAX-RS 2.0 async APIs, the JAX-RS 2.1 reactive APIs, and the MicroProfile Rest Client 1.1 type-safe, async APIs.

We’ll follow a sample travel planning service at: https://github.com/OpenLiberty/sample-async-rest

The application makes RESTful calls to an airline service, hotel service, and car rental service in order to make reservations and determine the total cost of the trip.

Async on the server (JAX-RS 2.0 and 2.1)

A JAX-RS resource method can be made asynchronous by adding the @Suspended annotation to a parameter of type AsyncResponse. The resource method usually returns void, because the actual response is delivered through the AsyncResponse.resume(Object) method. Here is an example:

@Path("/airline")
public class AirlineService {

    private static ExecutorService executor = Executors.newFixedThreadPool(10);

    @Path("/cost")
    @GET
    public void getReservation(@QueryParam("from") String from,
                               @QueryParam("to") String to,
                               @QueryParam("start") LocalDate start,
                               @QueryParam("end") LocalDate end,
                               @Suspended AsyncResponse ar) {

        executor.execute( () -> {
            Reservation r = new Reservation();
            r.setDestination(to);
            r.setInitialLocation(from);
            r.setStartDate(start);
            r.setReturnDate(end);
            r.setCost(DBHelper.getCostOfAirlineReservation(from, to, start, end));

            ar.resume(r);
        });

    }
}

So what does this actually do? First, notice that there is a fixed thread pool, limited to 10 threads (this is for simplicity - a better practice would be to make the thread pool size configurable with MicroProfile Config so that you could change this setting without recompiling). We might choose to limit the size of the thread pool to prevent overloading the back-end resource, a database in this case.
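As a rough illustration of making the pool size configurable, here is a plain-Java sketch that reads the size from a system property. It is only a stand-in for MicroProfile Config, not the sample application's code; the class name and the property name airline.db.pool.size are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ConfigurablePoolSketch {

    // Hypothetical property name, chosen for this sketch only.
    static final String POOL_SIZE_PROPERTY = "airline.db.pool.size";
    static final int DEFAULT_POOL_SIZE = 10;

    /** Returns the configured pool size, or the default if unset or invalid. */
    static int resolvePoolSize() {
        // Integer.getInteger falls back to the default when the property
        // is missing or cannot be parsed as an integer.
        int size = Integer.getInteger(POOL_SIZE_PROPERTY, DEFAULT_POOL_SIZE);
        return size > 0 ? size : DEFAULT_POOL_SIZE;
    }

    /** Creates the executor with whatever size is configured at startup. */
    static ExecutorService newExecutor() {
        return Executors.newFixedThreadPool(resolvePoolSize());
    }

    public static void main(String[] args) {
        System.out.println("pool size: " + resolvePoolSize());
    }
}
```

Starting the JVM with -Dairline.db.pool.size=4 would then change the limit without recompiling.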

Next, notice that the getReservation resource method doesn’t actually return anything. Instead it creates a new Reservation in a Runnable (via lambda) that makes a database call and then returns the reservation to the client through the AsyncResponse object. This means that the thread invoking the getReservation method can return immediately to handle new requests. The client won’t see a response, however, until the Reservation has been constructed and passed to the resume method, which might take a little while depending on how long the database request takes (DBHelper…). This example is essentially throttling access to the database.

Let’s suppose that the database is running slowly for some reason. If we implement this method synchronously, we might run into a situation where the database starts rejecting work, leading to exceptions thrown back to the client. By implementing this asynchronously, we limit the connections to 10; additional requests would be queued until they could eventually be executed by the executor service. This is ideal so long as the database can eventually catch up with the work.
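To make the queuing behavior concrete, here is a small plain-Java sketch (no JAX-RS involved) that submits more simulated database calls than the pool has threads and records the highest number that ran at once. The class name, the 20 ms sleep, and the task counts are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottleSketch {

    /**
     * Submits taskCount simulated database calls to a fixed pool of poolSize
     * threads and returns the highest number of calls that overlapped.
     */
    static int maxConcurrentCalls(int poolSize, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);

        for (int i = 0; i < taskCount; i++) {
            // Tasks beyond poolSize wait in the executor's queue.
            executor.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // stand-in for a slow database query
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    done.countDown();
                }
            });
        }

        try {
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + maxConcurrentCalls(10, 50));
    }
}
```

However many tasks are submitted, the peak never exceeds the pool size. The rest simply wait their turn, which is the throttling effect described above.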

In practice, it is my opinion that there are relatively few use cases where asynchronous methods on the server are more effective than their synchronous counterparts. Throttling access to another resource (as above) is one use case. Others might include where the thread factory has created threads optimized for the response (i.e. they already contain data on a ThreadLocal). Perhaps there is a performance benefit on some application servers, but because Open Liberty uses a well-tuned, global thread pool, there aren’t a lot of performance benefits to using async methods on the server there.

But the client is a different story…​

Async on the client using callbacks (JAX-RS 2.0)

Let’s say that your service returns a result that is composed of the results of a few other remote services. You could invoke those remote services serially, but that really only makes sense if your thread pool is limited. Instead, you could invoke the services asynchronously, using more threads, but saving time for the entire call. Here is an example:

@Path("/jaxrs20/travel")
public class MyTravelPlanningService {

    private static WebTarget airlineWebTarget = ClientBuilder.newClient()
                                                             .target(App.AIRLINE_URL);

    private static WebTarget hotelWebTarget = ClientBuilder.newClient()
                                                           .target(App.HOTEL_URL);

    private static WebTarget carRentalWebTarget = ClientBuilder.newClient()
                                                               .target(App.RENTAL_URL);

    private class ReservationInvocationCallback implements InvocationCallback<Reservation> {
        private final CountDownLatch latch;
        private final DoubleAdder costAdder;
        Throwable throwable;

        ReservationInvocationCallback(CountDownLatch latch, DoubleAdder adder) {
            this.latch = latch;
            this.costAdder = adder;
        }

        @Override
        public void completed(Reservation r) {
            costAdder.add(r.getCost());
            latch.countDown();
        }

        @Override
        public void failed(Throwable t) {
            t.printStackTrace();
            throwable = t;
            latch.countDown();
        }
    }

    @Path("/cost")
    @GET
    public double getCostOfTravel(@QueryParam("from") String from,
                                  @QueryParam("to") String to,
                                  @QueryParam("startDate") LocalDate start,
                                  @QueryParam("returnDate") LocalDate end) {

        final CountDownLatch latch = new CountDownLatch(3);
        final DoubleAdder cost = new DoubleAdder();
        ReservationInvocationCallback callback = new ReservationInvocationCallback(latch, cost);
        airlineWebTarget.queryParam("from", from)
                        .queryParam("to", to)
                        .queryParam("start", start)
                        .queryParam("end", end)
                        .request()
                        .async()
                        .get(callback);
        hotelWebTarget.queryParam("location", to)
                      .queryParam("start", start)
                      .queryParam("end", end)
                      .request()
                      .async()
                      .get(callback);
        carRentalWebTarget.queryParam("location", to)
                          .queryParam("start", start)
                          .queryParam("end", end)
                          .request()
                          .async()
                          .get(callback);

        try {
            latch.await();
        } catch (InterruptedException ex) {
            throw new WebApplicationException(ex, 500);
        }
        if (callback.throwable != null) {
            callback.throwable.printStackTrace();
            throw new WebApplicationException("Failure in downstream service",
                callback.throwable, 500);
        }
        return cost.doubleValue();
    }
}

This slightly more complex example uses an InvocationCallback which will be notified when the async response has completed (either through the completed method if the response is successful, or the failed method if not). We tell the client to invoke the services asynchronously by invoking the async() method on the Invocation.Builder object that is returned from the request() method. That returns an instance of AsyncInvoker. From there, we use an instance of the callback to asynchronously invoke three different services. We’ll only end up waiting for as long as the longest of those three services. Very efficient!
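The "only as long as the longest service" effect can be seen without any JAX-RS code. The sketch below is a plain-Java approximation: fakeService is a hypothetical stand-in for a remote call, and the delays and costs are made-up numbers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelCallsSketch {

    // Hypothetical stand-in for a remote call: sleeps, then returns a cost.
    static Callable<Double> fakeService(long delayMillis, double cost) {
        return () -> {
            Thread.sleep(delayMillis);
            return cost;
        };
    }

    /** Invokes all services concurrently and returns the summed cost. */
    static double totalCost(List<Callable<Double>> services) {
        // One thread per service so that no call waits behind another.
        ExecutorService pool = Executors.newFixedThreadPool(services.size());
        try {
            double total = 0;
            // invokeAll blocks until every task has finished.
            for (Future<Double> f : pool.invokeAll(services)) {
                total += f.get();
            }
            return total;
        } catch (Exception e) {
            throw new IllegalStateException("Failure in downstream service", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<Double>> services = Arrays.asList(
            fakeService(300, 600.0),   // "airline"
            fakeService(200, 300.0),   // "hotel"
            fakeService(100, 100.0));  // "car rental"
        long start = System.nanoTime();
        double total = totalCost(services);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("total=" + total + " elapsedMs=" + elapsedMs);
    }
}
```

Run serially, the three calls would take about the sum of their delays. Run concurrently, the elapsed time should be close to the slowest call alone.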

In a failure case we log the exception and then propagate it back to the client. Also, we are caching and re-using the WebTarget for each remote service. This avoids a lot of object creation in the JAX-RS implementation code, improving overall performance.

This works well, but JAX-RS 2.1 gives us another option: a reactive client API.

Async on the client using Reactive APIs (JAX-RS 2.1)

JAX-RS 2.1 adds support for reactive APIs. Out of the box, JAX-RS 2.1 supports a CompletionStage return type. This allows users to string together a chain of stages that can be completed asynchronously. JAX-RS 2.1 also allows users to extend the reactive capabilities of the client by using other reactive providers such as RxJava, Guava, etc. For simplicity and brevity, we will only cover the CompletionStage approach here. My colleague, John Koehler, is writing a blog post that will provide more information on reactive extensions. Stay tuned!

Similar to the async() method in JAX-RS 2.0 (which is still available in 2.1), we get an instance of a CompletionStageRxInvoker by using the rx() method on the Invocation.Builder. The CompletionStageRxInvoker has methods similar to those of the AsyncInvoker, but they return a CompletionStage rather than a Future, and they do not take an InvocationCallback.

So, if we were to re-write the JAX-RS 2.0 client example using the reactive client in JAX-RS 2.1, it would look something like this:

@Path("/jaxrs21/travel")
public class MyTravelPlanningService {

    private static WebTarget airlineWebTarget = ClientBuilder.newClient()
                                                             .target(App.AIRLINE_URL);

    private static WebTarget hotelWebTarget = ClientBuilder.newClient()
                                                           .target(App.HOTEL_URL);

    private static WebTarget carRentalWebTarget = ClientBuilder.newClient()
                                                               .target(App.RENTAL_URL);


    @Path("/cost")
    @GET
    public double getCostOfTravel(@QueryParam("from") String from,
                                  @QueryParam("to") String to,
                                  @QueryParam("startDate") LocalDate start,
                                  @QueryParam("returnDate") LocalDate end) {

        final CountDownLatch latch = new CountDownLatch(3);
        final DoubleAdder cost = new DoubleAdder();
        final AtomicReference<Throwable> throwable = new AtomicReference<>();

        BiConsumer<Reservation, Throwable> consumer = (r, t) -> {
            if (t != null) {
                throwable.set(t);
            } else {
                cost.add(r.getCost());
            }
            latch.countDown();
        };

        airlineWebTarget.queryParam("from", from)
                        .queryParam("to", to)
                        .queryParam("start", start)
                        .queryParam("end", end)
                        .request()
                        .rx()
                        .get(Reservation.class)
                        .whenCompleteAsync(consumer);

        hotelWebTarget.queryParam("location", to)
                      .queryParam("start", start)
                      .queryParam("end", end)
                      .request()
                      .rx()
                      .get(Reservation.class)
                      .whenCompleteAsync(consumer);

        carRentalWebTarget.queryParam("location", to)
                          .queryParam("start", start)
                          .queryParam("end", end)
                          .request()
                          .rx()
                          .get(Reservation.class)
                          .whenCompleteAsync(consumer);
        try {
            latch.await();
        } catch (InterruptedException ex) {
            throw new WebApplicationException(ex, 500);
        }

        Throwable t = throwable.get();
        if (t != null) {
            throw new WebApplicationException("Failure in downstream service",
                                              t, 500);
        }
        return cost.doubleValue();
    }
}

Functionally, the JAX-RS 2.1 reactive client example here is not much different from the JAX-RS 2.0 async client, but I think the reactive example is cleaner and easier to understand. And less code to maintain is also nice!
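Because the reactive client hands back CompletionStage objects, the stages can also be composed directly instead of being coordinated with a CountDownLatch. The following is a plain-Java sketch of that alternative, not the approach used in the sample; the class and method names are hypothetical, and the stages carry bare costs rather than Reservation objects to keep it self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class CombineStagesSketch {

    /** Sums three asynchronously produced costs without a latch or shared adder. */
    static CompletionStage<Double> totalCost(CompletionStage<Double> airline,
                                             CompletionStage<Double> hotel,
                                             CompletionStage<Double> car) {
        // Each thenCombine runs once both of its input stages have completed.
        return airline.thenCombine(hotel, Double::sum)
                      .thenCombine(car, Double::sum);
    }

    /**
     * Blocks for the result. If any input stage failed, join() throws a
     * CompletionException whose cause is the original failure.
     */
    static double await(CompletionStage<Double> stage) {
        return stage.toCompletableFuture().join();
    }

    public static void main(String[] args) {
        // supplyAsync stands in for an asynchronous remote call.
        CompletionStage<Double> total = totalCost(
            CompletableFuture.supplyAsync(() -> 600.0),
            CompletableFuture.supplyAsync(() -> 300.0),
            CompletableFuture.supplyAsync(() -> 100.0));
        System.out.println("total: " + await(total));
    }
}
```

With the real client, each argument would be something like the stage returned by rx().get(Reservation.class) followed by thenApply(Reservation::getCost). A failure in any one stage then propagates to the combined stage, so no separate AtomicReference is needed to carry the exception.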

Speaking of less code…​ hopefully by now you’ve heard about the MicroProfile Rest Client. It is a proxy-based, type safe client API for RESTful services. Starting in MP Rest Client 1.1, you can make asynchronous calls using Java 8’s CompletionStage. Let’s check out that approach:

Async on the client using type-safe interfaces (MicroProfile Rest Client 1.1)

MicroProfile Rest Client takes a different approach to accessing remote RESTful services - a more type-safe approach where an interface, annotated much like a JAX-RS resource, represents a remote service. Invoking methods on the client interface is similar to invoking methods on a service hosted locally. With clever use of providers like ResponseExceptionMapper, ParamConverterProvider, MessageBodyReader, MessageBodyWriter, etc., we can design a service interface that behaves as if it were local.

In order for a Rest Client method to be executed asynchronously, it must return a CompletionStage. Note that in MP Rest Client 1.2, it is also possible to use MP Fault Tolerance’s @Asynchronous annotation. This functionality is outside the scope of this post, but the gist is that if your interface method returns a Future, then the MP Fault Tolerance implementation will invoke the method on a separate thread, immediately returning a Future. You can find more information about the Fault Tolerance APIs at the project site.

So in our travel planning example, you might want to create client interfaces like:

@Path("/airline")
@RegisterProvider(LocalDateParamConverter.class)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public interface AirlineServiceClient {

    @Path("/cost")
    @GET
    CompletionStage<Reservation> getReservation(@QueryParam("from") String from,
                                                @QueryParam("to") String to,
                                                @QueryParam("start") LocalDate startDate,
                                                @QueryParam("end") LocalDate endDate);
}

And you would invoke the client like this:

@Path("/mpRest/travel")
public class MyTravelPlanningService {
    private final static String BASE_URI = "http://localhost:" + App.PORT + App.CONTEXT_ROOT;

    private final static AirlineServiceClient AIRLINE_CLIENT = RestClientBuilder.newBuilder()
                                                                                .baseUri(URI.create(BASE_URI))
                                                                                .build(AirlineServiceClient.class);
    private final static HotelServiceClient HOTEL_CLIENT = RestClientBuilder.newBuilder()
                                                                            .baseUri(URI.create(BASE_URI))
                                                                            .build(HotelServiceClient.class);
    private final static CarRentalServiceClient CAR_RENTAL_CLIENT = RestClientBuilder.newBuilder()
                                                                                     .baseUri(URI.create(BASE_URI))
                                                                                     .build(CarRentalServiceClient.class);

    @Path("/cost")
    @GET
    public double getCostOfTravel(@QueryParam("from") String from,
                                  @QueryParam("to") String to,
                                  @QueryParam("startDate") LocalDate start,
                                  @QueryParam("returnDate") LocalDate end) {
        final CountDownLatch latch = new CountDownLatch(3);
        final DoubleAdder cost = new DoubleAdder();
        final AtomicReference<Throwable> throwable = new AtomicReference<>();

        BiConsumer<Reservation, Throwable> consumer = (r, t) -> {
            if (t != null) {
                throwable.set(t);
            } else {
                cost.add(r.getCost());
            }
            latch.countDown();
        };

        AIRLINE_CLIENT.getReservation(from, to, start, end)
                      .whenCompleteAsync(consumer);

        HOTEL_CLIENT.getReservation(to, start, end)
                    .whenCompleteAsync(consumer);

        CAR_RENTAL_CLIENT.getReservation(to, start, end)
                         .whenCompleteAsync(consumer);

        try {
            latch.await();
        } catch (InterruptedException ex) {
            throw new WebApplicationException(ex, 500);
        }

        Throwable t = throwable.get();
        if (t != null) {
            throw new WebApplicationException("Failure in downstream service",
                                              t, 500);
        }
        return cost.doubleValue();
    }
}

This is even cleaner than the JAX-RS 2.1 reactive client API!

The MP Rest Client approach also solves a problem of what to do with ThreadLocal objects that might be required for providers on the outbound request. Let’s say that we have a ThreadLocal object in our application that determines whether a given customer is a preferred loyalty club member. If so, we might add a ClientRequestFilter that would create a custom HTTP header to indicate that to the remote airline, hotel, or car rental service - something like Loyalty-ID: 1234 that might be used by the remote service to upgrade the reservation. The problem is that if the ClientRequestFilter checks the ThreadLocal object for the loyalty ID number on the asynchronous thread, it won’t be there - it was associated with the calling thread, not the async thread. This picture might help clarify the problem.

[Figure: asyncREST thread local problem]

Notice that the Loyalty ID is set on the calling thread, but that ID isn’t propagated to the async threads that are making the requests to the remote services. That’s where the AsyncInvocationInterceptor comes into play. This is a provider type introduced in MP Rest Client 1.1 that allows users to propagate ThreadLocal objects - or really anything that is associated with the calling thread. This interface contains two methods (a third method is added in MP Rest Client 1.2 for removing contexts): prepareContext() and applyContext(). The former is invoked on the calling thread prior to "swapping" threads. The latter is invoked on the async thread.

[Figure: asyncREST thread local solution]

The AsyncInvocationInterceptor must be created by an AsyncInvocationInterceptorFactory like so:

public class LoyaltyAsyncInvocationInterceptorFactory
    implements AsyncInvocationInterceptorFactory {

    @Override
    public AsyncInvocationInterceptor newInterceptor() {
        return new AsyncInvocationInterceptor() {

            String loyaltyId;

            @Override
            public void prepareContext() {
                loyaltyId = App.LOYALTY_ID_THREADLOCAL.get();
            }

            @Override
            public void applyContext() {
                App.LOYALTY_ID_THREADLOCAL.set(loyaltyId);
            }
        };
    }
}
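To see why the capture-then-apply split matters, here is a plain-Java simulation that does not use MP Rest Client at all. The class and method names are hypothetical; the comments only map each step to the interceptor method it loosely imitates, including the context-removal step mentioned for MP Rest Client 1.2.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationSketch {

    // Stand-in for the application's loyalty ID ThreadLocal.
    static final ThreadLocal<String> LOYALTY_ID = new ThreadLocal<>();

    /** Reads the ThreadLocal on a pool thread with no propagation. */
    static String readWithoutPropagation(ExecutorService pool) {
        return CompletableFuture.supplyAsync(LOYALTY_ID::get, pool).join();
    }

    /** Captures the value on the caller, then re-applies it on the pool thread. */
    static String readWithPropagation(ExecutorService pool) {
        // Runs on the calling thread, like prepareContext().
        String captured = LOYALTY_ID.get();
        return CompletableFuture.supplyAsync(() -> {
            // Runs on the async thread, like applyContext().
            LOYALTY_ID.set(captured);
            try {
                return LOYALTY_ID.get(); // what a request filter would see
            } finally {
                // Clean up so the pooled thread does not leak the value.
                LOYALTY_ID.remove();
            }
        }, pool).join();
    }

    /** Returns { value seen without propagation, value seen with propagation }. */
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        LOYALTY_ID.set("1234");
        try {
            return new String[] {
                readWithoutPropagation(pool),
                readWithPropagation(pool)
            };
        } finally {
            LOYALTY_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println("without propagation: " + seen[0]);
        System.out.println("with propagation:    " + seen[1]);
    }
}
```

Without propagation the pool thread sees no loyalty ID, because a ThreadLocal value belongs to the thread that set it. Copying the value into a field on the calling thread and setting it again on the async thread is the same two-step handoff the interceptor performs.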

Let’s assume that loyalty members get a 10% discount when booking hotels online. Now let’s try invoking our travel planning site with a loyalty ID:

[Figure: asyncREST curl output]

With MP Rest Client we can get our loyalty discount and save about $200!

Summary

It’s getting easier to do things asynchronously with REST and Java. JAX-RS and MicroProfile give you some powerful tools in this space. Hopefully now you are more prepared to write and consume RESTful services asynchronously.

If you’ve got any questions or run into any problems, please let us know. You can reach me on Twitter at @AndrewMcCright.

Thanks!
