Consuming RESTful services using the reactive JAX-RS client

duration 25 minutes
New

Prerequisites:

Learn how to use a reactive JAX-RS client to asynchronously invoke RESTful microservices over HTTP.

What you’ll learn

You’ll first learn how to create a reactive JAX-RS client application using the default JAX-RS reactive provider APIs. You will then learn how to improve the application to take advantage of the RxJava reactive extensions with a pluggable reactive provider that’s published by Eclipse Jersey. The JAX-RS client is an API used to communicate with RESTful web services. The API makes it easy to consume a web service that is exposed by using the HTTP protocol, which means that you can efficiently implement client-side applications. The reactive client extension to JAX-RS is an API that enables you to use the reactive programming model when using the JAX-RS client.

Reactive programming is an extension of asynchronous programming and focuses on the flow of data through data streams. Reactive applications process data when it becomes available and respond to requests as soon as processing is complete. The request to the application and response from the application are decoupled so that the application is not blocked from responding to other requests in the meantime. Because reactive applications can run faster than synchronous applications, they provide a much smoother user experience.

The application in this guide demonstrates how the JAX-RS client accesses remote RESTful services by using asynchronous method calls. You’ll first look at the supplied client application that uses the JAX-RS default CompletionStage-based provider. Then, you’ll modify the client application to use Jersey’s RxJava provider, which is an alternative JAX-RS reactive provider. Both Jersey and Apache CXF provide third-party reactive libraries for RxJava and were tested for use in Open Liberty.

The application that you will be working with consists of three microservices, system, inventory, and query. Every 15 seconds, the system microservice calculates and publishes an event that contains its current average system load. The inventory microservice subscribes to that information so that it can keep an updated list of all the systems and their current system loads.

Reactive Query Service

The microservice that you will modify is the query service. It communicates with the inventory service to determine which system has the highest system load and which system has the lowest system load.

The system and inventory microservices use MicroProfile Reactive Messaging to send and receive the system load events. If you want to learn more about reactive messaging, see the Creating reactive Java microservices guide.

Additional prerequisites

You need to have Docker installed. For installation instructions, refer to the official Docker documentation. You will build and run the microservices in Docker containers. An installation of Apache Kafka is provided in another Docker container.

Getting started

The fastest way to work through this guide is to clone the Git repository and use the projects that are provided inside:

git clone https://github.com/openliberty/guide-reactive-rest-client.git
cd guide-reactive-rest-client

The start directory contains the starting project that you will build upon.

The finish directory contains the finished project that you will build.

Creating a web client using the default JAX-RS API

InventoryClient.java

 1// tag::copyright[]
 2/*******************************************************************************
 3 * Copyright (c) 2020 IBM Corporation and others.
 4 * All rights reserved. This program and the accompanying materials
 5 * are made available under the terms of the Eclipse Public License v1.0
 6 * which accompanies this distribution, and is available at
 7 * http://www.eclipse.org/legal/epl-v10.html
 8 *
 9 * Contributors:
10 *     IBM Corporation - Initial implementation
11 *******************************************************************************/
12// end::copyright[]
13package io.openliberty.guides.query.client;
14
15import java.util.List;
16import java.util.Properties;
17import java.util.concurrent.CompletionStage;
18
19import javax.enterprise.context.RequestScoped;
20import javax.inject.Inject;
21import javax.ws.rs.core.MediaType;
22import javax.ws.rs.client.ClientBuilder;
23import javax.ws.rs.client.Invocation;
24import javax.ws.rs.client.WebTarget;
25import javax.ws.rs.core.GenericType;
26import javax.ws.rs.core.HttpHeaders;
27import org.eclipse.microprofile.config.inject.ConfigProperty;
28
29@RequestScoped
30public class InventoryClient {
31
32    @Inject
33    @ConfigProperty(name = "INVENTORY_BASE_URI", defaultValue = "http://localhost:9085")
34    private String baseUri;
35
36    public List<String> getSystems() {
37        return ClientBuilder.newClient()
38                            .target(baseUri)
39                            .path("/inventory/systems")
40                            .request()
41                            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
42                            .get(new GenericType<List<String>>(){});
43    }
44
45    // tag::getSystem[]
46    public CompletionStage<Properties> getSystem(String hostname) {
47        return ClientBuilder.newClient()
48                            .target(baseUri)
49                            .path("/inventory/systems")
50                            .path(hostname)
51                            .request()
52                            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
53                            // tag::rx[]
54                            .rx()
55                            // end::rx[]
56                            .get(Properties.class);
57    }
58    // end::getSystem[]
59}

Navigate to the start directory to begin.

JAX-RS provides a default reactive provider that you can use to create a reactive REST client using the CompletionStage interface.

Create an InventoryClient class, which is used to retrieve inventory data, and a QueryResource class, which queries data from the inventory service.

Create the InventoryClient interface.
query/src/main/java/io/openliberty/guides/query/client/InventoryClient.java

The getSystem() method returns the CompletionStage interface. This interface represents a unit or stage of a computation. When the associated computation completes, the value can be retrieved. The rx() method calls the CompletionStage interface. It retrieves the CompletionStageRxInvoker class and allows these methods to function correctly with the CompletionStage interface return type.

Create the QueryResource class.
query/src/main/java/io/openliberty/guides/query/QueryResource.java

QueryResource.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2020 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License v1.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-v10.html
  8 *
  9 * Contributors:
 10 *     IBM Corporation - Initial implementation
 11 *******************************************************************************/
 12// end::copyright[]
 13package io.openliberty.guides.query;
 14
 15import java.math.BigDecimal;
 16import java.util.List;
 17import java.util.Map;
 18import java.util.Properties;
 19import java.util.concurrent.ConcurrentHashMap;
 20import java.util.concurrent.CountDownLatch;
 21import java.util.concurrent.TimeUnit;
 22
 23import javax.enterprise.context.ApplicationScoped;
 24import javax.inject.Inject;
 25import javax.ws.rs.GET;
 26import javax.ws.rs.Path;
 27import javax.ws.rs.Produces;
 28import javax.ws.rs.core.MediaType;
 29
 30import io.openliberty.guides.query.client.InventoryClient;
 31
 32@ApplicationScoped
 33@Path("/query")
 34public class QueryResource {
 35
 36    @Inject
 37    private InventoryClient inventoryClient;
 38
 39    // tag::systemLoad[]
 40    @GET
 41    @Path("/systemLoad")
 42    @Produces(MediaType.APPLICATION_JSON)
 43    // tag::systemLoadMethod[]
 44    public Map<String, Properties> systemLoad() {
 45    // end::systemLoadMethod[]
 46        List<String> systems = inventoryClient.getSystems();
 47        // tag::countdownlatch[]
 48        CountDownLatch remainingSystems = new CountDownLatch(systems.size());
 49        // end::countdownlatch[]
 50        final Holder systemLoads = new Holder();
 51
 52        for (String system : systems) {
 53            // tag::getsystem[]
 54            inventoryClient.getSystem(system)
 55            // end::getsystem[]
 56                           // tag::thenAcceptAsync[]
 57                           .thenAcceptAsync(p -> {
 58                                if (p != null) {
 59                                    systemLoads.updateValues(p);
 60                                }
 61                                // tag::countdown1[]
 62                                remainingSystems.countDown();
 63                                // end::countdown1[]
 64                           })
 65                           // end::thenAcceptAsync[]
 66                           // tag::exceptionally[]
 67                           .exceptionally(ex -> {
 68                                // tag::countdown2[]
 69                                remainingSystems.countDown();
 70                                // end::countdown2[]
 71                                ex.printStackTrace();
 72                                return null;
 73                           });
 74                           // end::exceptionally[]
 75        }
 76
 77        // Wait for all remaining systems to be checked
 78        try {
 79            // tag::await[]
 80            remainingSystems.await(30, TimeUnit.SECONDS);
 81            // end::await[]
 82        } catch (InterruptedException e) {
 83            e.printStackTrace();
 84        }
 85
 86        return systemLoads.getValues();
 87    }
 88    // end::systemLoad[]
 89
 90    private class Holder {
 91        // tag::volatile[]
 92        private volatile Map<String, Properties> values;
 93        // end::volatile[]
 94
 95        public Holder() {
 96            // tag::concurrentHashMap[]
 97            this.values = new ConcurrentHashMap<String, Properties>();
 98            // end::concurrentHashMap[]
 99            init();
100        }
101
102        public Map<String, Properties> getValues() {
103            return this.values;
104        }
105
106        public void updateValues(Properties p) {
107            final BigDecimal load = (BigDecimal) p.get("systemLoad");
108
109            this.values.computeIfPresent("lowest", (key, curr_val) -> {
110                BigDecimal lowest = (BigDecimal) curr_val.get("systemLoad");
111                return load.compareTo(lowest) < 0 ? p : curr_val;
112            });
113            this.values.computeIfPresent("highest", (key, curr_val) -> {
114                BigDecimal highest = (BigDecimal) curr_val.get("systemLoad");
115                return load.compareTo(highest) > 0 ? p : curr_val;
116            });
117        }
118
119        private void init() {
120            // Initialize highest and lowest values
121            this.values.put("highest", new Properties());
122            this.values.put("lowest", new Properties());
123            this.values.get("highest").put("hostname", "temp_max");
124            this.values.get("lowest").put("hostname", "temp_min");
125            this.values.get("highest").put("systemLoad", new BigDecimal(Double.MIN_VALUE));
126            this.values.get("lowest").put("systemLoad", new BigDecimal(Double.MAX_VALUE));
127        }
128    }
129}

The systemLoad endpoint asynchronously processes the data that is retrieved by the InventoryClient interface and serves that data after all of the services respond. The thenAcceptAsync() and exceptionally() methods together behave like an asynchronous try-catch block. The data is processed in the thenAcceptAsync() method only after the CompletionStage interface finishes retrieving it. When you return a CompletionStage type in the resource, it doesn’t necessarily mean that the computation completed and the response was built.

A CountDownLatch object is used to track how many asynchronous requests are being waited on. After each thread is completed, the countdown() method counts the CountDownLatch object down towards 0. This means that the value returns only after the thread that’s retrieving the value is complete. The await() method stops and waits until all of the requests are complete. While the countdown completes, the main thread is free to perform other tasks. In this case, no such task is present.

Building and running the application

The system, inventory, and query microservices will be built in Docker containers. If you want to learn more about Docker containers, check out the Containerizing microservices guide.

Start your Docker environment.

To build the application, run the Maven install and package goals from the command-line session in the start directory:

mvn -pl models install
mvn package

Run the following command to download or update to the latest openliberty/open-liberty:kernel-java8-openj9-ubi Docker image:

docker pull openliberty/open-liberty:kernel-java8-openj9-ubi

Run the following commands to containerize the microservices:

docker build -t system:1.0-SNAPSHOT system/.
docker build -t inventory:1.0-SNAPSHOT inventory/.
docker build -t query:1.0-SNAPSHOT query/.

Next, use the provided script to start the application in Docker containers. The script creates a network for the containers to communicate with each other. It creates containers for Kafka, Zookeeper, and all of the microservices in the project.

./scripts/startContainers.sh
.\scripts\startContainers.bat

The services will take some time to become available. You can access the application by making requests to the query/systemLoad endpoint at the http://localhost:9080/query/systemLoad URL.

When the service is ready, you see an output similar to the following example. This example was formatted for readability:

{
    "highest": {
        "hostname":"30bec2b63a96",
        ”systemLoad": 6.1
    },
    "lowest": {
        "hostname":"55ec2b63a96",
        ”systemLoad": 0.1
    }
}

The JSON output contains a highest attribute that represents the system with the highest load. Similarly, the lowest attribute represents the system with the lowest load. The JSON output for each of these attributes contains the hostname and systemLoad of the system.

When you are done checking out the application, run the following command to stop the query microservice. Leave the system and inventory services running because they will be used when the application is rebuilt later in the guide:

docker stop query

Updating the web client to use an alternative reactive provider

pom.xml

  1<?xml version='1.0' encoding='utf-8'?>
  2<project xmlns="http://maven.apache.org/POM/4.0.0"
  3    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5    <modelVersion>4.0.0</modelVersion>
  6
  7    <groupId>io.openliberty.guides</groupId>
  8    <artifactId>query</artifactId>
  9    <version>1.0-SNAPSHOT</version>
 10    <packaging>war</packaging>
 11
 12    <properties>
 13        <maven.compiler.source>1.8</maven.compiler.source>
 14        <maven.compiler.target>1.8</maven.compiler.target>
 15        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 16        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 17        <!-- Liberty configuration -->
 18        <liberty.var.default.http.port>9080</liberty.var.default.http.port>
 19        <liberty.var.default.https.port>9443</liberty.var.default.https.port>
 20    </properties>
 21
 22    <dependencies>
 23        <!-- Provided dependencies -->
 24        <dependency>
 25            <groupId>jakarta.platform</groupId>
 26            <artifactId>jakarta.jakartaee-api</artifactId>
 27            <version>8.0.0</version>
 28            <scope>provided</scope>
 29        </dependency>
 30        <dependency>
 31            <groupId>javax.enterprise.concurrent</groupId>
 32            <artifactId>javax.enterprise.concurrent-api</artifactId>
 33            <version>1.1</version>
 34            <scope>provided</scope>
 35        </dependency>
 36        <dependency>
 37            <groupId>javax.validation</groupId>
 38            <artifactId>validation-api</artifactId>
 39            <version>2.0.1.Final</version>
 40            <scope>provided</scope>
 41        </dependency>
 42        <dependency>
 43            <groupId>org.eclipse.microprofile</groupId>
 44            <artifactId>microprofile</artifactId>
 45            <version>3.3</version>
 46            <type>pom</type>
 47            <scope>provided</scope>
 48        </dependency>
 49        <!-- Required dependencies -->
 50        <dependency>
 51            <groupId>io.openliberty.guides</groupId>
 52            <artifactId>models</artifactId>
 53            <version>1.0-SNAPSHOT</version>
 54        </dependency>
 55        <!-- Reactive dependencies -->
 56        <!-- tag::jerseyClient[] -->
 57        <dependency>
 58            <groupId>org.glassfish.jersey.core</groupId>
 59            <artifactId>jersey-client</artifactId>
 60            <version>2.30</version>
 61        </dependency>
 62        <!-- end::jerseyClient[] -->
 63        <!-- tag::jerseyRxjava[] -->
 64        <dependency>
 65            <groupId>org.glassfish.jersey.ext.rx</groupId>
 66            <artifactId>jersey-rx-client-rxjava</artifactId>
 67            <version>2.30</version>
 68        </dependency>
 69        <!-- end::jerseyRxjava[] -->
 70        <!-- tag::jerseyRxjava2[] -->
 71        <dependency>
 72            <groupId>org.glassfish.jersey.ext.rx</groupId>
 73            <artifactId>jersey-rx-client-rxjava2</artifactId>
 74            <version>2.30</version>
 75        </dependency>
 76        <!-- end::jerseyRxjava2[] -->
 77        <!-- For tests -->
 78        <dependency>
 79            <groupId>org.microshed</groupId>
 80            <artifactId>microshed-testing-liberty</artifactId>
 81            <version>0.9</version>
 82            <scope>test</scope>
 83        </dependency>
 84        <dependency>
 85            <groupId>org.testcontainers</groupId>
 86            <artifactId>mockserver</artifactId>
 87            <version>1.12.5</version>
 88            <scope>test</scope>
 89        </dependency>
 90        <dependency>
 91            <groupId>org.mock-server</groupId>
 92            <artifactId>mockserver-client-java</artifactId>
 93            <version>5.10.0</version>
 94            <scope>test</scope>
 95        </dependency>
 96        <dependency>
 97            <groupId>org.junit.jupiter</groupId>
 98            <artifactId>junit-jupiter</artifactId>
 99            <version>5.6.2</version>
100            <scope>test</scope>
101        </dependency>
102    </dependencies>
103
104    <build>
105        <finalName>${project.artifactId}</finalName>
106        <plugins>
107            <plugin>
108                <groupId>org.apache.maven.plugins</groupId>
109                <artifactId>maven-war-plugin</artifactId>
110                <version>3.2.3</version>
111                <configuration>
112                    <packagingExcludes>pom.xml</packagingExcludes>
113                </configuration>
114            </plugin>
115
116            <!-- Liberty plugin -->
117            <plugin>
118                <groupId>io.openliberty.tools</groupId>
119                <artifactId>liberty-maven-plugin</artifactId>
120                <version>3.2.2</version>
121            </plugin>
122
123            <!-- Plugin to run unit tests -->
124            <plugin>
125                <groupId>org.apache.maven.plugins</groupId>
126                <artifactId>maven-surefire-plugin</artifactId>
127                <version>2.22.2</version>
128            </plugin>
129
130            <!-- Plugin to run integration tests -->
131            <plugin>
132                <groupId>org.apache.maven.plugins</groupId>
133                <artifactId>maven-failsafe-plugin</artifactId>
134                <version>2.22.2</version>
135                <executions>
136                    <execution>
137                        <id>integration-test</id>
138                        <goals>
139                            <goal>integration-test</goal>
140                        </goals>
141                    </execution>
142                    <execution>
143                        <id>verify</id>
144                        <goals>
145                            <goal>verify</goal>
146                        </goals>
147                    </execution>
148                </executions>
149            </plugin>
150        </plugins>
151    </build>
152</project>

Although JAX-RS provides the default reactive provider that returns CompletionStage types, you can alternatively use another provider that supports other reactive frameworks like RxJava. The Apache CXF and Eclipse Jersey projects produce such providers. You’ll now update the web client to use the Jersey reactive provider for RxJava. With this updated reactive provider, you can write clients that use RxJava objects instead of clients that use only the CompletionStage interface. These custom objects provide a simpler and faster way for you to create scalable RESTful services with a CompletionStage interface.

Replace the Maven configuration file.
query/pom.xml

The jersey-rx-client-rxjava and jersey-rx-client-rxjava2 dependencies provide the RxInvokerProvider classes, which are registered to the jersey-client ClientBuilder class.

Update the client to accommodate the custom object types that you are trying to return. You’ll need to register the type of object that you want inside the client invocation.

Replace the InventoryClient interface.
query/src/main/java/io/openliberty/guides/query/client/InventoryClient.java

InventoryClient.java

 1// tag::copyright[]
 2/*******************************************************************************
 3 * Copyright (c) 2020 IBM Corporation and others.
 4 * All rights reserved. This program and the accompanying materials
 5 * are made available under the terms of the Eclipse Public License v1.0
 6 * which accompanies this distribution, and is available at
 7 * http://www.eclipse.org/legal/epl-v10.html
 8 *
 9 * Contributors:
10 *     IBM Corporation - Initial implementation
11 *******************************************************************************/
12// end::copyright[]
13package io.openliberty.guides.query.client;
14
15import java.util.List;
16import java.util.Properties;
17
18import javax.enterprise.context.RequestScoped;
19import javax.inject.Inject;
20import javax.ws.rs.client.ClientBuilder;
21import javax.ws.rs.core.GenericType;
22import javax.ws.rs.core.HttpHeaders;
23import javax.ws.rs.core.MediaType;
24
25import org.eclipse.microprofile.config.inject.ConfigProperty;
26import org.glassfish.jersey.client.rx.rxjava.RxObservableInvoker;
27import org.glassfish.jersey.client.rx.rxjava.RxObservableInvokerProvider;
28
29import rx.Observable;
30
31@RequestScoped
32public class InventoryClient {
33
34    @Inject
35    @ConfigProperty(name = "INVENTORY_BASE_URI", defaultValue = "http://localhost:9085")
36    private String baseUri;
37
38
39    public List<String> getSystems() {
40        return ClientBuilder.newClient()
41                            .target(baseUri)
42                            .path("/inventory/systems")
43                            .request()
44                            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
45                            .get(new GenericType<List<String>>(){});
46    }
47
48    // tag::getSystem[]
49    public Observable<Properties> getSystem(String hostname) {
50        return ClientBuilder.newClient()
51                            .target(baseUri)
52                            // tag::register[]
53                            .register(RxObservableInvokerProvider.class)
54                            // end::register[]
55                            .path("/inventory/systems")
56                            .path(hostname)
57                            .request()
58                            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
59                            // tag::rx[]
60                            .rx(RxObservableInvoker.class)
61                            // end::rx[]
62                            .get(new GenericType<Properties>(){});
63    }
64    // end::getSystem[]
65}

The return type of the getSystem() method is now an Observable object instead of a CompletionStage interface. Observable is a collection of data that waits to be subscribed to before it can release any data and is part of RxJava. The rx() method now needs to contain RxObservableInvoker.class as an argument. This argument calls the specific invoker, RxObservableInvoker, for the Observable class that’s provided by Jersey. In the getSystem() method, the register(RxObservableInvokerProvider) method call registers the RxObservableInvoker class, which means that the client can recognize the invoker provider.

In some scenarios, a producer might generate more data than the consumers can handle. JAX-RS can deal with cases like these by using the RxJava Flowable class with backpressure. To learn more about RxJava and backpressure, see JAX-RS reactive extensions with RxJava backpressure.

Updating the REST resource to support the reactive JAX-RS client

QueryResource.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2020 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License v1.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-v10.html
  8 *
  9 * Contributors:
 10 *     IBM Corporation - Initial implementation
 11 *******************************************************************************/
 12// end::copyright[]
 13package io.openliberty.guides.query;
 14
 15import java.math.BigDecimal;
 16import java.util.List;
 17import java.util.Map;
 18import java.util.Properties;
 19import java.util.concurrent.ConcurrentHashMap;
 20import java.util.concurrent.CountDownLatch;
 21import java.util.concurrent.TimeUnit;
 22
 23import javax.enterprise.context.ApplicationScoped;
 24import javax.inject.Inject;
 25import javax.ws.rs.GET;
 26import javax.ws.rs.Path;
 27import javax.ws.rs.Produces;
 28import javax.ws.rs.core.MediaType;
 29
 30import io.openliberty.guides.query.client.InventoryClient;
 31
 32@ApplicationScoped
 33@Path("/query")
 34public class QueryResource {
 35
 36    @Inject
 37    private InventoryClient inventoryClient;
 38
 39    // tag::systemload[]
 40    @GET
 41    @Path("/systemLoad")
 42    @Produces(MediaType.APPLICATION_JSON)
 43    public Map<String, Properties> systemLoad() {
 44        // tag::getSystems[]
 45        List<String> systems = inventoryClient.getSystems();
 46        // end::getSystems[]
 47        // tag::countdownlatch[]
 48        CountDownLatch remainingSystems = new CountDownLatch(systems.size());
 49        // end::countdownlatch[]
 50        // tag::holder[]
 51        final Holder systemLoads = new Holder();
 52        // end::holder[]
 53        for (String system : systems) {
 54            // tag::getSystem[]
 55            inventoryClient.getSystem(system)
 56            // end::getSystem[]
 57                            // tag::subscribe[]
 58                           .subscribe(p -> {
 59                                if (p != null) {
 60                                    systemLoads.updateValues(p);
 61                                }
 62                                // tag::countdown[]
 63                                remainingSystems.countDown();
 64                                // end::countdown[]
 65                           }, e -> {
 66                                // tag::countdown2[]
 67                                remainingSystems.countDown();
 68                                // end::countdown2[]
 69                                e.printStackTrace();
 70                           });
 71                           // end::subscribe[]
 72        }
 73
 74        // Wait for all remaining systems to be checked
 75        try {
 76            // tag::await[]
 77            remainingSystems.await(30, TimeUnit.SECONDS);
 78            // end::await[]
 79        } catch (InterruptedException e) {
 80            e.printStackTrace();
 81        }
 82
 83        return systemLoads.getValues();
 84    }
 85    // end::systemload[]
 86
 87    // tag::holderClass[]
 88    private class Holder {
 89        // tag::volatile[]
 90        private volatile Map<String, Properties> values;
 91        // end::volatile[]
 92
 93        public Holder() {
 94            // tag::concurrentHashMap[]
 95            this.values = new ConcurrentHashMap<String, Properties>();
 96            // end::concurrentHashMap[]
 97            init();
 98        }
 99
100        public Map<String, Properties> getValues() {
101            return this.values;
102        }
103
104        // tag::updateValues[]
105        public void updateValues(Properties p) {
106            final BigDecimal load = (BigDecimal) p.get("systemLoad");
107
108            this.values.computeIfPresent("lowest", (key, curr_val) -> {
109                BigDecimal lowest = (BigDecimal) curr_val.get("systemLoad");
110                return load.compareTo(lowest) < 0 ? p : curr_val;
111            });
112            this.values.computeIfPresent("highest", (key, curr_val) -> {
113                BigDecimal highest = (BigDecimal) curr_val.get("systemLoad");
114                return load.compareTo(highest) > 0 ? p : curr_val;
115            });
116        }
117        // end::updateValues[]
118
119        private void init() {
120            // Initialize highest and lowest values
121            this.values.put("highest", new Properties());
122            this.values.put("lowest", new Properties());
123            this.values.get("highest").put("hostname", "temp_max");
124            this.values.get("lowest").put("hostname", "temp_min");
125            this.values.get("highest").put("systemLoad", new BigDecimal(Double.MIN_VALUE));
126            this.values.get("lowest").put("systemLoad", new BigDecimal(Double.MAX_VALUE));
127        }
128    }
129    // end::holderClass[]
130}

Now that the client methods return the Observable class, you must update the resource to accommodate these changes.

Replace the QueryResource class.
query/src/main/java/io/openliberty/guides/query/QueryResource.java

The goal of the systemLoad() method is to return the system with the largest load and the system with the smallest load. The systemLoad endpoint first gets all of the hostnames by calling the getSystems() method. Then it loops through the hostnames and calls the getSystem() method on each one.

Instead of using the thenAcceptAsync() method, Observable uses the subscribe() method to asynchronously process data. Thus, any necessary data processing happens inside the subscribe() method. In this case, the necessary data processing is saving the data in the temporary Holder class. The Holder class is used to store the value that is returned from the client because values cannot be returned inside the subscribe() method. The highest and lowest load systems are updated in the updateValues() method.

Rebuilding and running the application

Run the Maven install and package goals from the command-line session in the start directory:

mvn -pl query package

Run the following command to containerize the query microservice:

docker build -t query:1.0-SNAPSHOT query/.

Next, use the provided script to restart the query service in a Docker container.

./scripts/startQueryContainer.sh
.\scripts\startQueryContainer.bat

You can access the application by making requests to the query/systemLoad endpoint at the http://localhost:9080/query/systemLoad URL.

Switching to a reactive programming model freed up the thread that was handling your request to query/systemLoad. While the client request is being handled, the thread can handle other work.

When you are done checking out the application, run the following script to stop the application:

./scripts/stopContainers.sh
.\scripts\stopContainers.bat

Testing the query microservice

A few tests are included for you to test the basic functionality of the query microservice. If a test failure occurs, then you might have introduced a bug into the code.

Create the QueryServiceIT class.
query/src/test/java/it/io/openliberty/guides/query/QueryServiceIT.java

The testSystemLoad() test case verifies that the query service can correctly calculate the highest and lowest system loads.

QueryServiceIT.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2020 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License v1.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-v10.html
  8 *
  9 * Contributors:
 10 *     IBM Corporation - Initial implementation
 11 *******************************************************************************/
 12// end::copyright[]
 13package it.io.openliberty.guides.query;
 14
 15import static org.junit.jupiter.api.Assertions.assertEquals;
 16
 17import java.util.Map;
 18import java.util.Properties;
 19import org.junit.jupiter.api.BeforeAll;
 20import org.junit.jupiter.api.Test;
 21import org.microshed.testing.jaxrs.RESTClient;
 22import org.microshed.testing.jupiter.MicroShedTest;
 23import org.microshed.testing.SharedContainerConfig;
 24import org.mockserver.model.HttpRequest;
 25import org.mockserver.model.HttpResponse;
 26
 27import io.openliberty.guides.query.QueryResource;
 28
 29@MicroShedTest
 30@SharedContainerConfig(AppContainerConfig.class)
 31public class QueryServiceIT {
 32
 33    @RESTClient
 34    public static QueryResource queryResource;
 35
 36    private static String testHost1 =
 37        "{" +
 38            "\"hostname\" : \"testHost1\"," +
 39            "\"systemLoad\" : 1.23" +
 40        "}";
 41    private static String testHost2 =
 42        "{" +
 43            "\"hostname\" : \"testHost2\"," +
 44            "\"systemLoad\" : 3.21" +
 45        "}";
 46    private static String testHost3 =
 47        "{" +
 48            "\"hostname\" : \"testHost3\"," +
 49            "\"systemLoad\" : 2.13" +
 50        "}";
 51
 52    @BeforeAll
 53    public static void setup() throws InterruptedException {
 54        AppContainerConfig.mockClient.when(HttpRequest.request()
 55                                         .withMethod("GET")
 56                                         .withPath("/inventory/systems"))
 57                                     .respond(HttpResponse.response()
 58                                         .withStatusCode(200)
 59                                         .withBody("[\"testHost1\"," +
 60                                                    "\"testHost2\"," +
 61                                                    "\"testHost3\"]")
 62                                         .withHeader("Content-Type", "application/json"));
 63
 64        AppContainerConfig.mockClient.when(HttpRequest.request()
 65                                         .withMethod("GET")
 66                                         .withPath("/inventory/systems/testHost1"))
 67                                     .respond(HttpResponse.response()
 68                                         .withStatusCode(200)
 69                                         .withBody(testHost1)
 70                                         .withHeader("Content-Type", "application/json"));
 71
 72        AppContainerConfig.mockClient.when(HttpRequest.request()
 73                                         .withMethod("GET")
 74                                         .withPath("/inventory/systems/testHost2"))
 75                                     .respond(HttpResponse.response()
 76                                         .withStatusCode(200)
 77                                         .withBody(testHost2)
 78                                         .withHeader("Content-Type", "application/json"));
 79
 80        AppContainerConfig.mockClient.when(HttpRequest.request()
 81                                         .withMethod("GET")
 82                                         .withPath("/inventory/systems/testHost3"))
 83                                     .respond(HttpResponse.response()
 84                                         .withStatusCode(200)
 85                                         .withBody(testHost3)
 86                                         .withHeader("Content-Type", "application/json"));
 87    }
 88
 89    // tag::testSystemLoad[]
 90    @Test
 91    public void testSystemLoad() {
 92        Map<String, Properties> response = queryResource.systemLoad();
 93        assertEquals(
 94            "testHost2",
 95            response.get("highest").get("hostname"),
 96            "Returned highest system load incorrect"
 97        );
 98        assertEquals(
 99            "testHost1",
100            response.get("lowest").get("hostname"),
101            "Returned lowest system load incorrect"
102        );
103    }
104    // end::testSystemLoad[]
105
106}

Running the tests

Navigate to the query directory, then verify that the tests pass by running the Maven verify goal:

cd query
mvn verify

When the tests succeed, you see output similar to the following example:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running it.io.openliberty.guides.query.QueryServiceIT
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.88 s - in it.io.openliberty.guides.query.QueryServiceIT

Results:

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

Great work! You’re done!

You modified an application to make HTTP requests by using a reactive JAX-RS client with Open Liberty and Jersey’s RxJava provider.

Guide Attribution

Consuming RESTful services using the reactive JAX-RS client by Open Liberty is licensed under CC BY-ND 4.0

Copied to clipboard
Copy code block
Copy file contents

Prerequisites:

Nice work! Where to next?

What did you think of this guide?

Extreme Dislike Dislike Like Extreme Like

What could make this guide better?

Raise an issue to share feedback

Create a pull request to contribute to this guide

Need help?

Ask a question on Stack Overflow

Like Open Liberty? Star our repo on GitHub.

Star