Consuming RESTful services asynchronously with template interfaces

duration 15 minutes

Prerequisites:

Learn how to use MicroProfile Rest Client to invoke RESTful microservices asynchronously over HTTP.

What you’ll learn

You will learn how to build a MicroProfile Rest Client to access remote RESTful services using asynchronous method calls. You’ll update the template interface for a MicroProfile Rest Client to use the CompletionStage return type. The template interface maps to the remote service that you want to call. A CompletionStage interface allows you to work with the result of your remote service call asynchronously.

What is asynchronous programming?

Imagine asynchronous programming as a restaurant. After you’re seated, a waiter takes your order. Then, you must wait a few minutes for your food to be prepared. While your food is being prepared, your waiter may take more orders or serve other tables. After your food is ready, your waiter brings out the food to your table. However, in a synchronous model, the waiter must wait for your food to be prepared before serving any other customers. This method blocks other customers from placing orders or receiving their food.

You can perform lengthy operations, such as input/output (I/O), without blocking with asynchronous methods. The I/O operation can occur in the background and a callback notifies the caller to continue its computation when the original request is complete. As a result, the original thread frees up so it can handle other work rather than wait for the I/O to complete. Revisiting the restaurant analogy, food is prepared asynchronously in the kitchen and your waiter is freed up to attend to other tables.

In the context of REST clients, HTTP request calls can be time consuming. The network might be slow, or maybe the upstream service is overwhelmed and can’t respond quickly. These lengthy operations can block the execution of your thread when it’s in use and prevent other work from being completed.

The application in this guide consists of three microservices, system, inventory, and query. Every 15 seconds the system microservice calculates and publishes an event that contains its average system load. The inventory microservice subscribes to that information so that it can keep an updated list of all the systems and their current system loads.

Reactive Inventory System

The microservice that you will modify is the query service. It communicates with the inventory service to determine which system has the highest system load and which system has the lowest system load.

The system and inventory microservices use MicroProfile Reactive Messaging to send and receive the system load events. If you want to learn more about reactive messaging, see the Creating Reactive Java Microservices guide.

Additional prerequisites

Before you begin, Docker needs to be installed. For installation instructions, refer to the official Docker documentation. You will build and run the microservices in Docker containers. An installation of Apache Kafka is provided in another Docker container.

Getting started

The fastest way to work through this guide is to clone the Git repository and use the projects that are provided inside:

git clone https://github.com/openliberty/guide-microprofile-rest-client-async.git
cd guide-microprofile-rest-client-async

The start directory contains the starting project that you will build upon.

The finish directory contains the finished project that you will build.

Before you begin, make sure you have all the necessary prerequisites.

Updating the template interface of a REST client to use asynchronous methods

Navigate to the start directory to begin.

The query service uses a MicroProfile Rest Client to access the inventory service. You will update the methods in the template interface for this client to be asynchronous.

Replace the InventoryClient interface.
query/src/main/java/io/openliberty/guides/query/client/InventoryClient.java

InventoryClient.java

 1// tag::copyright[]
 2/*******************************************************************************
 3 * Copyright (c) 2020 IBM Corporation and others.
 4 * All rights reserved. This program and the accompanying materials
 5 * are made available under the terms of the Eclipse Public License v1.0
 6 * which accompanies this distribution, and is available at
 7 * http://www.eclipse.org/legal/epl-v10.html
 8 *
 9 * Contributors:
10 *     IBM Corporation - Initial implementation
11 *******************************************************************************/
12// end::copyright[]
13package io.openliberty.guides.query.client;
14
15import java.util.List;
16import java.util.Properties;
17import java.util.concurrent.CompletionStage;
18
19import javax.ws.rs.GET;
20import javax.ws.rs.Path;
21import javax.ws.rs.PathParam;
22import javax.ws.rs.Produces;
23import javax.ws.rs.core.MediaType;
24
25import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
26
27@Path("/inventory")
28@RegisterRestClient(configKey = "InventoryClient", baseUri = "http://localhost:9085")
29public interface InventoryClient extends AutoCloseable {
30
31    @GET
32    @Path("/systems")
33    @Produces(MediaType.APPLICATION_JSON)
34    public List<String> getSystems();
35
36    // tag::getSystem[]
37    @GET
38    @Path("/systems/{hostname}")
39    @Produces(MediaType.APPLICATION_JSON)
40    public CompletionStage<Properties> getSystem(@PathParam("hostname") String hostname);
41    // end::getSystem[]
42
43}

The changes involve the getSystem method. Change the return type to CompletionStage<Properties> to make the method asynchronous. The method now has the return type of CompletionStage<Properties> so you aren’t able to directly manipulate the Properties inner type. As you will see in the next section, you’re able to indirectly use the Properties by chaining callbacks.

Updating a REST resource to asynchronously handle HTTP requests

To reduce the processing time, you will update the /query/systemLoad endpoint to asynchronously send the requests. Multiple client requests will be sent synchronously in a loop. The asynchronous calls do not block the program so the endpoint needs to ensure that all calls are completed and all returned data is processed before proceeding.

Replace the QueryResource class.
query/src/main/java/io/openliberty/guides/query/QueryResource.java

QueryResource.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2020 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License v1.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-v10.html
  8 *
  9 * Contributors:
 10 *     IBM Corporation - Initial implementation
 11 *******************************************************************************/
 12// end::copyright[]
 13package io.openliberty.guides.query;
 14
 15import java.math.BigDecimal;
 16import java.util.List;
 17import java.util.Map;
 18import java.util.Properties;
 19import java.util.concurrent.ConcurrentHashMap;
 20import java.util.concurrent.CountDownLatch;
 21import java.util.concurrent.TimeUnit;
 22
 23import javax.enterprise.context.ApplicationScoped;
 24import javax.inject.Inject;
 25import javax.ws.rs.GET;
 26import javax.ws.rs.Path;
 27import javax.ws.rs.Produces;
 28import javax.ws.rs.core.MediaType;
 29
 30import org.eclipse.microprofile.rest.client.inject.RestClient;
 31
 32import io.openliberty.guides.query.client.InventoryClient;
 33
 34@ApplicationScoped
 35@Path("/query")
 36public class QueryResource {
 37    
 38    @Inject
 39    @RestClient
 40    private InventoryClient inventoryClient;
 41
 42    // tag::systemLoad[]
 43    @GET
 44    @Path("/systemLoad")
 45    @Produces(MediaType.APPLICATION_JSON)
 46    public Map<String, Properties> systemLoad() {
 47        // tag::getSystems[]
 48        List<String> systems = inventoryClient.getSystems();
 49        // end::getSystems[]
 50        // tag::countdown1[]
 51        CountDownLatch remainingSystems = new CountDownLatch(systems.size());
 52        // end::countdown1[]
 53        final Holder systemLoads = new Holder();
 54
 55        for (String system : systems) {
 56            // tag::getSystem[]
 57            inventoryClient.getSystem(system)
 58            // end::getSystem[]
 59                           // tag::thenAcceptAsync[]
 60                           .thenAcceptAsync(p -> {
 61                                if (p != null) {
 62                                    systemLoads.updateValues(p);
 63                                }
 64                                // tag::countdown2[]
 65                                remainingSystems.countDown();
 66                                // end::countdown2[]
 67                           })
 68                           // end::thenAcceptAsync[]
 69                           // tag::exceptionally[]
 70                           .exceptionally(ex -> {
 71                                ex.printStackTrace();
 72                                // tag::countdown3[]
 73                                remainingSystems.countDown();
 74                                // end::countdown3[]
 75                                return null;
 76                           });
 77                           // end::exceptionally[]
 78        }
 79
 80        // Wait for all remaining systems to be checked
 81        try {
 82            // tag::await[]
 83            remainingSystems.await(30, TimeUnit.SECONDS);
 84            // end::await[]
 85        } catch (InterruptedException e) {
 86            e.printStackTrace();
 87        }
 88
 89        return systemLoads.getValues();
 90    }
 91    // end::systemLoad[]
 92
 93    // tag::holder[]
 94    private class Holder {
 95        // tag::volatile[]
 96        private volatile Map<String, Properties> values;
 97        // end::volatile[]
 98
 99        public Holder() {
100            // tag::concurrentHashMap[]
101            this.values = new ConcurrentHashMap<String, Properties>();
102            // end::concurrentHashMap[]
103            init();
104        }
105
106        public Map<String, Properties> getValues() {
107            return this.values;
108        }
109
110        public void updateValues(Properties p) {
111            final BigDecimal load = (BigDecimal) p.get("systemLoad");
112
113            this.values.computeIfPresent("lowest", (key, curr_val) -> {
114                BigDecimal lowest = (BigDecimal) curr_val.get("systemLoad");
115                return load.compareTo(lowest) < 0 ? p : curr_val;
116            });
117            this.values.computeIfPresent("highest", (key, curr_val) -> {
118                BigDecimal highest = (BigDecimal) curr_val.get("systemLoad");
119                return load.compareTo(highest) > 0 ? p : curr_val;
120            });
121        }
122
123        private void init() {
124            // Initialize highest and lowest values
125            this.values.put("highest", new Properties());
126            this.values.put("lowest", new Properties());
127            this.values.get("highest").put("hostname", "temp_max");
128            this.values.get("lowest").put("hostname", "temp_min");
129            this.values.get("highest").put("systemLoad", new BigDecimal(Double.MIN_VALUE));
130            this.values.get("lowest").put("systemLoad", new BigDecimal(Double.MAX_VALUE));
131        }
132    }
133    // end::holder[]
134}

First, the systemLoad endpoint first gets all the hostnames by calling getSystems(). In the getSystem() method, multiple requests are sent asynchronously to the inventory service for each hostname. When the requests return, the thenAcceptAsync() method processes the returned data with the CompletionStage<Properties> interface.

The CompletionStage<Properties> interface represents a unit of computation. After a computation is complete, it can either be finished or it can be chained with more CompletionStage<Properties> interfaces using the thenAcceptAsync() method. Exceptions are handled in a callback that is provided to the exceptionally() method, which behaves like a catch block. When you return a CompletionStage<Properties> type in the resource, it doesn’t necessarily mean that the computation completed and the response was built. JAX-RS responds to the caller after the computation completes.

In the systemLoad() method a CountDownLatch object is used to track asynchronous requests. The countDown() method is called whenever a request is complete. When the CountDownLatch is at zero, it indicates that all asynchronous requests are complete. By using the await() method of the CountDownLatch, the program waits for all the asynchronous requests to be complete. When all asynchronous requests are complete, the program resumes execution with all required data processed.

A Holder class is used to wrap a variable called values that has the volatile keyword. The values variable is instantiated as a ConcurrentHashMap object. Together, the volatile keyword and ConcurrentHashMap type allow the Holder class to store system information and safely access it asynchronously from multiple threads.

Building and running the application

You will build and run the system, inventory, and query microservices in Docker containers. You can learn more about containerizing microservices with Docker in the Containerizing microservices guide.

Start your Docker environment. Dockerfiles are provided for you to use.

To build the application, run the Maven install and package goals from the command-line session in the start directory:

mvn -pl models install
mvn package

Run the following commands to containerize the microservices:

docker build -t system:1.0-SNAPSHOT system/.
docker build -t inventory:1.0-SNAPSHOT inventory/.
docker build -t query:1.0-SNAPSHOT query/.

Next, use the provided startContainers script to start the application in Docker containers. The script creates containers for Kafka, Zookeeper, and all of the microservices in the project, in addition to a network for the containers to communicate with each other. The script also creates three instances of the system microservice.

.\scripts\startContainers.bat
./scripts/startContainers.sh

The services take some time to become available. You can access the application by making requests to the query/systemLoad endpoint by going to the http://localhost:9080/query/systemLoad URL.

When the service is ready, you see an output similar to the following example which was formatted for readability.

{
    "highest": {
        "hostname" : "8841bd7d6fcd",
        "systemLoad" : 6.96
    },
    "lowest": {
        "hostname" : "37140ec44c9b",
        "systemLoad" : 6.4
    }
}

Switching to an asynchronous programming model freed up the thread that handles requests to the inventory service. While requests process, the thread can handle other work or requests. In the /query/systemLoad endpoint, multiple systems are read and compared at once.

When you are done checking out the application, run the following script to stop the application:

.\scripts\stopContainers.bat
./scripts/stopContainers.sh

Testing the query microservice

You will create an endpoint test to test the basic functionality of the query microservice. If a test failure occurs, then you might have introduced a bug into the code.

Create the QueryServiceIT class.
query/src/test/java/it/io/openliberty/guides/query/QueryServiceIT.java

The testLoads() test case verifies that the query service can calculate the highest and lowest system loads.

QueryServiceIT.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2020 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License v1.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-v10.html
  8 *
  9 * Contributors:
 10 *     IBM Corporation - Initial implementation
 11 *******************************************************************************/
 12// end::copyright[]
 13package it.io.openliberty.guides.query;
 14
 15import static org.junit.jupiter.api.Assertions.assertEquals;
 16
 17import java.util.Map;
 18import java.util.Properties;
 19import org.junit.jupiter.api.BeforeAll;
 20import org.junit.jupiter.api.Test;
 21import org.microshed.testing.jaxrs.RESTClient;
 22import org.microshed.testing.jupiter.MicroShedTest;
 23import org.microshed.testing.SharedContainerConfig;
 24import org.mockserver.model.HttpRequest;
 25import org.mockserver.model.HttpResponse;
 26
 27import io.openliberty.guides.query.QueryResource;
 28
 29@MicroShedTest
 30@SharedContainerConfig(AppContainerConfig.class)
 31public class QueryServiceIT {
 32
 33    @RESTClient
 34    public static QueryResource queryResource;
 35
 36    private static String testHost1 = 
 37        "{" + 
 38            "\"hostname\" : \"testHost1\"," +
 39            "\"systemLoad\" : 1.23" +
 40        "}";
 41    private static String testHost2 = 
 42        "{" + 
 43            "\"hostname\" : \"testHost2\"," +
 44            "\"systemLoad\" : 3.21" +
 45        "}";
 46    private static String testHost3 =
 47        "{" + 
 48            "\"hostname\" : \"testHost3\"," +
 49            "\"systemLoad\" : 2.13" +
 50        "}";
 51
 52    @BeforeAll
 53    public static void setup() throws InterruptedException {
 54        AppContainerConfig.mockClient.when(HttpRequest.request()
 55                                         .withMethod("GET")
 56                                         .withPath("/inventory/systems"))
 57                                     .respond(HttpResponse.response()
 58                                         .withStatusCode(200)
 59                                         .withBody("[\"testHost1\"," + 
 60                                                    "\"testHost2\"," +
 61                                                    "\"testHost3\"]")
 62                                         .withHeader("Content-Type", "application/json"));
 63
 64        AppContainerConfig.mockClient.when(HttpRequest.request()
 65                                         .withMethod("GET")
 66                                         .withPath("/inventory/systems/testHost1"))
 67                                     .respond(HttpResponse.response()
 68                                         .withStatusCode(200)
 69                                         .withBody(testHost1)
 70                                         .withHeader("Content-Type", "application/json"));
 71
 72        AppContainerConfig.mockClient.when(HttpRequest.request()
 73                                         .withMethod("GET")
 74                                         .withPath("/inventory/systems/testHost2"))
 75                                     .respond(HttpResponse.response()
 76                                         .withStatusCode(200)
 77                                         .withBody(testHost2)
 78                                         .withHeader("Content-Type", "application/json"));
 79
 80        AppContainerConfig.mockClient.when(HttpRequest.request()
 81                                         .withMethod("GET")
 82                                         .withPath("/inventory/systems/testHost3"))
 83                                     .respond(HttpResponse.response()
 84                                         .withStatusCode(200)
 85                                         .withBody(testHost3)
 86                                         .withHeader("Content-Type", "application/json"));
 87    }
 88
 89    // tag::testLoads[]
 90    @Test
 91    public void testLoads() {
 92        Map<String, Properties> response = queryResource.systemLoad();
 93
 94        assertEquals(
 95            "testHost2",
 96            response.get("highest").get("hostname"),
 97            "Returned highest system load incorrect"
 98        );
 99        assertEquals(
100            "testHost1",
101            response.get("lowest").get("hostname"),
102            "Returned lowest system load incorrect"
103        );
104    }
105    // end::testLoads[]
106
107}

Running the tests

Navigate to the query directory, then verify that the tests pass by using the Maven verify goal:

mvn verify

When the tests succeed, you see output similar to the following example:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running it.io.openliberty.guides.query.QueryServiceIT
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 32.123 s - in it.io.openliberty.guides.query.QueryServiceIT

Results:

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

Great work! You’re done!

You have just modified an application to make asynchronous HTTP requests using Open Liberty and MicroProfile Rest Client.

Guide Attribution

Consuming RESTful services asynchronously with template interfaces by Open Liberty is licensed under CC BY-ND 4.0

Copy file contents
Copied to clipboard

Prerequisites:

Nice work! Where to next?

What did you think of this guide?

Extreme Dislike Dislike Like Extreme Like

What could make this guide better?

Raise an issue to share feedback

Create a pull request to contribute to this guide

Need help?

Ask a question on Stack Overflow

Like Open Liberty? Star our repo on GitHub.

Star