Consuming RESTful services using the reactive JAX-RS client

duration 25 minutes

Prerequisites:

Learn how to use a reactive JAX-RS client to asynchronously invoke RESTful microservices over HTTP.

What you’ll learn

First, you’ll learn how to create a reactive JAX-RS client application by using the default reactive JAX-RS client APIs. You will then learn how to take advantage of the RxJava reactive extensions with a pluggable reactive JAX-RS client provider that’s published by Eclipse Jersey. The JAX-RS client is an API used to communicate with RESTful web services. The API makes it easy to consume a web service by using the HTTP protocol, which means that you can efficiently implement client-side applications. The reactive client extension to JAX-RS is an API that enables you to use the reactive programming model when using the JAX-RS client.

Reactive programming is an extension of asynchronous programming and focuses on the flow of data through data streams. Reactive applications process data when it becomes available and respond to requests as soon as processing is complete. The request to the application and response from the application are decoupled so that the application is not blocked from responding to other requests in the meantime. Because reactive applications can run faster than synchronous applications, they provide a much smoother user experience.

The application in this guide demonstrates how the JAX-RS client accesses remote RESTful services by using asynchronous method calls. You’ll first look at the supplied client application that uses the JAX-RS default CompletionStage-based provider. Then, you’ll modify the client application to use Jersey’s RxJava provider, which is an alternative JAX-RS reactive provider. Both Jersey and Apache CXF provide third-party reactive libraries for RxJava and were tested for use in Open Liberty.

The application that you will be working with consists of three microservices, system, inventory, and query. Every 15 seconds, the system microservice calculates and publishes an event that contains its current average system load. The inventory microservice subscribes to that information so that it can keep an updated list of all the systems and their current system loads.

Reactive Query Service

The microservice that you will modify is the query service. It communicates with the inventory service to determine which system has the highest system load and which system has the lowest system load.

The system and inventory microservices use MicroProfile Reactive Messaging to send and receive the system load events. If you want to learn more about reactive messaging, see the Creating reactive Java microservices guide.

Additional prerequisites

You need to have Docker installed. For installation instructions, refer to the official Docker documentation. You will build and run the microservices in Docker containers. An installation of Apache Kafka is provided in another Docker container.

Getting started

The fastest way to work through this guide is to clone the Git repository and use the projects that are provided inside:

git clone https://github.com/openliberty/guide-reactive-rest-client.git
cd guide-reactive-rest-client

The start directory contains the starting project that you will build upon.

The finish directory contains the finished project that you will build.

Before you begin, make sure you have all the necessary prerequisites.

Creating a web client using the default JAX-RS API

Navigate to the start directory to begin.

JAX-RS provides a default reactive provider that you can use to create a reactive REST client using the CompletionStage interface.

Create an InventoryClient class, which retrieves inventory data, and a QueryResource class, which queries data from the inventory service.

Create the InventoryClient interface.
query/src/main/java/io/openliberty/guides/query/client/InventoryClient.java

InventoryClient.java

 1// tag::copyright[]
 2/*******************************************************************************
 3 * Copyright (c) 2020, 2024 IBM Corporation and others.
 4 * All rights reserved. This program and the accompanying materials
 5 * are made available under the terms of the Eclipse Public License 2.0
 6 * which accompanies this distribution, and is available at
 7 * http://www.eclipse.org/legal/epl-2.0/
 8 *
 9 * SPDX-License-Identifier: EPL-2.0
10 *******************************************************************************/
11// end::copyright[]
12package io.openliberty.guides.query.client;
13
14import java.util.List;
15import java.util.Properties;
16import java.util.concurrent.CompletionStage;
17
18import jakarta.enterprise.context.RequestScoped;
19import jakarta.inject.Inject;
20import jakarta.ws.rs.core.MediaType;
21import jakarta.ws.rs.client.ClientBuilder;
22import jakarta.ws.rs.client.WebTarget;
23import jakarta.ws.rs.core.GenericType;
24import jakarta.ws.rs.core.HttpHeaders;
25import org.eclipse.microprofile.config.inject.ConfigProperty;
26
27@RequestScoped
28public class InventoryClient {
29
30    @Inject
31    @ConfigProperty(name = "INVENTORY_BASE_URI", defaultValue = "http://localhost:9085")
32    private String baseUri;
33
34
35    public List<String> getSystems() {
36        return ClientBuilder.newClient()
37                            .target(baseUri)
38                            .path("/inventory/systems")
39                            .request()
40                            .header(HttpHeaders.CONTENT_TYPE,
41                                    MediaType.APPLICATION_JSON)
42                            .get(new GenericType<List<String>>() { });
43    }
44
45    // tag::getSystem[]
46    public CompletionStage<Properties> getSystem(String hostname) {
47        return ClientBuilder.newClient()
48                            .target(baseUri)
49                            .path("/inventory/systems")
50                            .path(hostname)
51                            .request()
52                            .header(HttpHeaders.CONTENT_TYPE,
53                                    MediaType.APPLICATION_JSON)
54                            // tag::rx[]
55                            .rx()
56                            // end::rx[]
57                            .get(Properties.class);
58    }
59    // end::getSystem[]
60}

The getSystem() method returns the CompletionStage interface. This interface represents a unit or stage of a computation. When the associated computation completes, the value can be retrieved. The rx() method calls the CompletionStage interface. It retrieves the CompletionStageRxInvoker class and allows these methods to function correctly with the CompletionStage interface return type.

Create the QueryResource class.
query/src/main/java/io/openliberty/guides/query/QueryResource.java

QueryResource.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2020, 2024 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package io.openliberty.guides.query;
 13
 14import java.math.BigDecimal;
 15import java.util.List;
 16import java.util.Map;
 17import java.util.Properties;
 18import java.util.concurrent.ConcurrentHashMap;
 19import java.util.concurrent.CountDownLatch;
 20import java.util.concurrent.TimeUnit;
 21
 22import jakarta.enterprise.context.ApplicationScoped;
 23import jakarta.inject.Inject;
 24import jakarta.ws.rs.GET;
 25import jakarta.ws.rs.Path;
 26import jakarta.ws.rs.Produces;
 27import jakarta.ws.rs.core.MediaType;
 28
 29import io.openliberty.guides.query.client.InventoryClient;
 30
 31@ApplicationScoped
 32@Path("/query")
 33public class QueryResource {
 34
 35    @Inject
 36    private InventoryClient inventoryClient;
 37
 38    // tag::systemLoad[]
 39    @GET
 40    @Path("/systemLoad")
 41    @Produces(MediaType.APPLICATION_JSON)
 42    // tag::systemLoadMethod[]
 43    public Map<String, Properties> systemLoad() {
 44    // end::systemLoadMethod[]
 45        List<String> systems = inventoryClient.getSystems();
 46        // tag::countdownlatch[]
 47        CountDownLatch remainingSystems = new CountDownLatch(systems.size());
 48        // end::countdownlatch[]
 49        final Holder systemLoads = new Holder();
 50
 51        for (String system : systems) {
 52            // tag::getsystem[]
 53            inventoryClient.getSystem(system)
 54            // end::getsystem[]
 55                           // tag::thenAcceptAsync[]
 56                           .thenAcceptAsync(p -> {
 57                                if (p != null) {
 58                                    systemLoads.updateValues(p);
 59                                }
 60                                // tag::countdown1[]
 61                                remainingSystems.countDown();
 62                                // end::countdown1[]
 63                           })
 64                           // end::thenAcceptAsync[]
 65                           // tag::exceptionally[]
 66                           .exceptionally(ex -> {
 67                                // tag::countdown2[]
 68                                remainingSystems.countDown();
 69                                // end::countdown2[]
 70                                ex.printStackTrace();
 71                                return null;
 72                           });
 73                           // end::exceptionally[]
 74        }
 75
 76        // Wait for all remaining systems to be checked
 77        try {
 78            // tag::await[]
 79            remainingSystems.await(30, TimeUnit.SECONDS);
 80            // end::await[]
 81        } catch (InterruptedException e) {
 82            e.printStackTrace();
 83        }
 84
 85        return systemLoads.getValues();
 86    }
 87    // end::systemLoad[]
 88
 89    private class Holder {
 90        // tag::volatile[]
 91        private volatile Map<String, Properties> values;
 92        // end::volatile[]
 93
 94        Holder() {
 95            // tag::concurrentHashMap[]
 96            this.values = new ConcurrentHashMap<String, Properties>();
 97            // end::concurrentHashMap[]
 98            init();
 99        }
100
101        public Map<String, Properties> getValues() {
102            return this.values;
103        }
104
105        public void updateValues(Properties p) {
106            final BigDecimal load = (BigDecimal) p.get("systemLoad");
107
108            this.values.computeIfPresent("lowest", (key, curr_val) -> {
109                BigDecimal lowest = (BigDecimal) curr_val.get("systemLoad");
110                return load.compareTo(lowest) < 0 ? p : curr_val;
111            });
112            this.values.computeIfPresent("highest", (key, curr_val) -> {
113                BigDecimal highest = (BigDecimal) curr_val.get("systemLoad");
114                return load.compareTo(highest) > 0 ? p : curr_val;
115            });
116        }
117
118        private void init() {
119            // Initialize highest and lowest values
120            this.values.put("highest", new Properties());
121            this.values.put("lowest", new Properties());
122            this.values.get("highest").put("hostname", "temp_max");
123            this.values.get("lowest").put("hostname", "temp_min");
124            this.values.get("highest")
125                .put("systemLoad", new BigDecimal(Double.MIN_VALUE));
126            this.values.get("lowest")
127                .put("systemLoad", new BigDecimal(Double.MAX_VALUE));
128        }
129    }
130}

The systemLoad endpoint asynchronously processes the data that is retrieved by the InventoryClient interface and serves that data after all of the services respond. The thenAcceptAsync() and exceptionally() methods together behave like an asynchronous try-catch block. The data is processed in the thenAcceptAsync() method only after the CompletionStage interface finishes retrieving it. When you return a CompletionStage type in the resource, it doesn’t necessarily mean that the computation completed and the response was built.

A CountDownLatch object is used to track how many asynchronous requests are being waited on. After each thread is completed, the countdown() methodcounts the CountDownLatch object down towards 0. This means that the value returns only after the thread that’s retrieving the value is complete.The await() method stops and waits until all of the requests are complete. While the countdown completes, the main thread is free to perform other tasks. In this case, no such task is present.

Building and running the application

The system, inventory, and query microservices will be built in Docker containers. If you want to learn more about Docker containers, check out the Containerizing microservices guide.

Start your Docker environment.

To build the application, run the Maven install and package goals from the command-line session in the start directory:

mvn -pl models install
mvn package

Run the following commands to containerize the microservices:

docker build -t system:1.0-SNAPSHOT system/.
docker build -t inventory:1.0-SNAPSHOT inventory/.
docker build -t query:1.0-SNAPSHOT query/.

Next, use the provided script to start the application in Docker containers. The script creates a network for the containers to communicate with each other. It creates containers for Kafka and all of the microservices in the project.

.\scripts\startContainers.bat
./scripts/startContainers.sh

The microservices will take some time to become available. See the http://localhost:9085/health and http://localhost:9080/health URLs to confirm that the inventory and query microservices are up and running. Once the microservices are up and running, you can access the application by making requests to the query/systemLoad endpoint at the http://localhost:9080/query/systemLoad URL.

When the service is ready, you see an output similar to the following example. This example was formatted for readability:

{
    "highest": {
        "hostname":"30bec2b63a96",
        ”systemLoad": 6.1
    },
    "lowest": {
        "hostname":"55ec2b63a96",
        ”systemLoad": 0.1
    }
}

The JSON output contains a highest attribute that represents the system with the highest load. Similarly, the lowest attribute represents the system with the lowest load. The JSON output for each of these attributes contains the hostname and systemLoad of the system.

When you are done checking out the application, run the following command to stop the query microservice. Leave the system and inventory services running because they will be used when the application is rebuilt later in the guide:

docker stop query

Updating the web client to use an alternative reactive provider

Although JAX-RS provides the default reactive provider that returns CompletionStage types, you can alternatively use another provider that supports other reactive frameworks like RxJava. The Apache CXF and Eclipse Jersey projects produce such providers. You’ll now update the web client to use the Jersey reactive provider for RxJava. With this updated reactive provider, you can write clients that use RxJava objects instead of clients that use only the CompletionStage interface. These custom objects provide a simpler and faster way for you to create scalable RESTful services with a CompletionStage interface.

Replace the Maven configuration file.
query/pom.xml

pom.xml

  1<?xml version='1.0' encoding='utf-8'?>
  2<project xmlns="http://maven.apache.org/POM/4.0.0"
  3    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5    <modelVersion>4.0.0</modelVersion>
  6
  7    <groupId>io.openliberty.guides</groupId>
  8    <artifactId>query</artifactId>
  9    <version>1.0-SNAPSHOT</version>
 10    <packaging>war</packaging>
 11
 12    <properties>
 13        <maven.compiler.source>11</maven.compiler.source>
 14        <maven.compiler.target>11</maven.compiler.target>
 15        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 16        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 17        <!-- Liberty configuration -->
 18        <liberty.var.http.port>9080</liberty.var.http.port>
 19        <liberty.var.default.https.port>9443</liberty.var.default.https.port>
 20    </properties>
 21
 22    <dependencies>
 23        <!-- Provided dependencies -->
 24        <dependency>
 25            <groupId>jakarta.platform</groupId>
 26            <artifactId>jakarta.jakartaee-api</artifactId>
 27            <version>10.0.0</version>
 28            <scope>provided</scope>
 29        </dependency>
 30        <dependency>
 31            <groupId>jakarta.enterprise.concurrent</groupId>
 32            <artifactId>jakarta.enterprise.concurrent-api</artifactId>
 33            <version>3.0.3</version>
 34            <scope>provided</scope>
 35        </dependency>
 36        <dependency>
 37            <groupId>jakarta.validation</groupId>
 38            <artifactId>jakarta.validation-api</artifactId>
 39            <version>3.0.2</version>
 40            <scope>provided</scope>
 41        </dependency>
 42        <dependency>
 43            <groupId>org.eclipse.microprofile</groupId>
 44            <artifactId>microprofile</artifactId>
 45            <version>6.1</version>
 46            <type>pom</type>
 47            <scope>provided</scope>
 48        </dependency>
 49        <!-- Required dependencies -->
 50        <dependency>
 51            <groupId>io.openliberty.guides</groupId>
 52            <artifactId>models</artifactId>
 53            <version>1.0-SNAPSHOT</version>
 54        </dependency>
 55        <!-- Reactive dependencies -->
 56        <!-- tag::jerseyClient[] -->
 57        <dependency>
 58            <groupId>org.glassfish.jersey.core</groupId>
 59            <artifactId>jersey-client</artifactId>
 60            <version>3.1.8</version>
 61        </dependency>
 62        <!-- end::jerseyClient[] -->
 63        <!-- tag::jerseyRxjava[] -->
 64        <dependency>
 65            <groupId>org.glassfish.jersey.ext.rx</groupId>
 66            <artifactId>jersey-rx-client-rxjava</artifactId>
 67            <version>3.1.8</version>
 68        </dependency>
 69        <!-- end::jerseyRxjava[] -->
 70        <!-- tag::jerseyRxjava2[] -->
 71        <dependency>
 72            <groupId>org.glassfish.jersey.ext.rx</groupId>
 73            <artifactId>jersey-rx-client-rxjava2</artifactId>
 74            <version>3.1.8</version>
 75        </dependency>
 76        <!-- end::jerseyRxjava2[] -->
 77        <!-- For tests -->
 78        <dependency>
 79            <groupId>org.testcontainers</groupId>
 80            <artifactId>mockserver</artifactId>
 81            <version>1.20.1</version>
 82            <scope>test</scope>
 83        </dependency>
 84        <dependency>
 85            <groupId>org.mock-server</groupId>
 86            <artifactId>mockserver-client-java</artifactId>
 87            <version>5.15.0</version>
 88            <scope>test</scope>
 89        </dependency>
 90        <dependency>
 91            <groupId>org.junit.jupiter</groupId>
 92            <artifactId>junit-jupiter</artifactId>
 93            <version>5.11.0</version>
 94            <scope>test</scope>
 95        </dependency>
 96        <dependency>
 97            <groupId>org.testcontainers</groupId>
 98            <artifactId>junit-jupiter</artifactId>
 99            <version>1.20.1</version>
100            <scope>test</scope>
101        </dependency>
102        <dependency>
103            <groupId>org.glassfish.jersey.ext</groupId>
104            <artifactId>jersey-proxy-client</artifactId>
105            <version>3.1.8</version>
106            <scope>test</scope>
107        </dependency>
108        <dependency>
109            <groupId>org.glassfish.jersey.media</groupId>
110            <artifactId>jersey-media-json-jackson</artifactId>
111            <version>3.1.8</version>
112            <scope>test</scope>
113        </dependency>
114        <dependency>
115            <groupId>org.glassfish.jersey.inject</groupId>
116            <artifactId>jersey-hk2</artifactId>
117            <version>3.1.8</version>
118                <scope>test</scope>
119        </dependency>
120        <dependency>
121            <groupId>org.slf4j</groupId>
122            <artifactId>slf4j-api</artifactId>
123            <version>2.0.16</version>
124            <scope>test</scope>
125        </dependency>
126        <dependency>
127            <groupId>org.slf4j</groupId>
128            <artifactId>slf4j-simple</artifactId>
129            <version>2.0.16</version>
130            <scope>test</scope>
131        </dependency>
132        <dependency>
133            <groupId>com.fasterxml.jackson.core</groupId>
134            <artifactId>jackson-core</artifactId>
135            <version>2.17.2</version>
136            <scope>test</scope>
137        </dependency>
138    </dependencies>
139
140    <build>
141        <finalName>${project.artifactId}</finalName>
142        <plugins>
143            <plugin>
144                <groupId>org.apache.maven.plugins</groupId>
145                <artifactId>maven-war-plugin</artifactId>
146                <version>3.4.0</version>
147                <configuration>
148                    <packagingExcludes>pom.xml</packagingExcludes>
149                </configuration>
150            </plugin>
151
152            <!-- Liberty plugin -->
153            <plugin>
154                <groupId>io.openliberty.tools</groupId>
155                <artifactId>liberty-maven-plugin</artifactId>
156                <version>3.10.3</version>
157                <configuration>
158                    <containerRunOpts>
159                        -e INVENTORY_BASE_URI=http://mock-server:1080
160                        --network=reactive-app
161                    </containerRunOpts>
162                </configuration>
163            </plugin>
164
165            <!-- Plugin to run unit tests -->
166            <plugin>
167                <groupId>org.apache.maven.plugins</groupId>
168                <artifactId>maven-surefire-plugin</artifactId>
169                <version>3.5.0</version>
170            </plugin>
171
172            <!-- Plugin to run integration tests -->
173            <plugin>
174                <groupId>org.apache.maven.plugins</groupId>
175                <artifactId>maven-failsafe-plugin</artifactId>
176                <version>3.5.0</version>
177                <executions>
178                    <execution>
179                        <id>integration-test</id>
180                        <goals>
181                            <goal>integration-test</goal>
182                        </goals>
183                    </execution>
184                    <execution>
185                        <id>verify</id>
186                        <goals>
187                            <goal>verify</goal>
188                        </goals>
189                    </execution>
190                </executions>
191            </plugin>
192        </plugins>
193    </build>
194</project>

The jersey-rx-client-rxjava and jersey-rx-client-rxjava2 dependencies provide the RxInvokerProvider classes, which are registered to the jersey-client ClientBuilder class.

Update the client to accommodate the custom object types that you are trying to return. You’ll need to register the type of object that you want inside the client invocation.

Replace the InventoryClient interface.
query/src/main/java/io/openliberty/guides/query/client/InventoryClient.java

InventoryClient.java

 1// tag::copyright[]
 2/*******************************************************************************
 3 * Copyright (c) 2020, 2024 IBM Corporation and others.
 4 * All rights reserved. This program and the accompanying materials
 5 * are made available under the terms of the Eclipse Public License 2.0
 6 * which accompanies this distribution, and is available at
 7 * http://www.eclipse.org/legal/epl-2.0/
 8 *
 9 * SPDX-License-Identifier: EPL-2.0
10 *******************************************************************************/
11// end::copyright[]
12package io.openliberty.guides.query.client;
13
14import java.util.List;
15import java.util.Properties;
16
17import jakarta.enterprise.context.RequestScoped;
18import jakarta.inject.Inject;
19import jakarta.ws.rs.client.ClientBuilder;
20import jakarta.ws.rs.core.GenericType;
21import jakarta.ws.rs.core.HttpHeaders;
22import jakarta.ws.rs.core.MediaType;
23
24import org.eclipse.microprofile.config.inject.ConfigProperty;
25import org.glassfish.jersey.client.rx.rxjava.RxObservableInvoker;
26import org.glassfish.jersey.client.rx.rxjava.RxObservableInvokerProvider;
27
28import rx.Observable;
29
30@RequestScoped
31public class InventoryClient {
32
33    @Inject
34    @ConfigProperty(name = "INVENTORY_BASE_URI", defaultValue = "http://localhost:9085")
35    private String baseUri;
36
37    public List<String> getSystems() {
38        return ClientBuilder.newClient()
39                            .target(baseUri)
40                            .path("/inventory/systems")
41                            .request()
42                            .header(HttpHeaders.CONTENT_TYPE,
43                                    MediaType.APPLICATION_JSON)
44                            .get(new GenericType<List<String>>() { });
45    }
46
47    // tag::getSystem[]
48    public Observable<Properties> getSystem(String hostname) {
49        return ClientBuilder.newClient()
50                            .target(baseUri)
51                            // tag::register[]
52                            .register(RxObservableInvokerProvider.class)
53                            // end::register[]
54                            .path("/inventory/systems")
55                            .path(hostname)
56                            .request()
57                            .header(HttpHeaders.CONTENT_TYPE,
58                                    MediaType.APPLICATION_JSON)
59                            // tag::rx[]
60                            .rx(RxObservableInvoker.class)
61                            // end::rx[]
62                            .get(new GenericType<Properties>() { });
63    }
64    // end::getSystem[]
65}

The return type of the getSystem() method is now an Observable object instead of a CompletionStage interface. Observable is a collection of data that waits to be subscribed to before it can release any data and is part of RxJava. The rx() method now needs to contain RxObservableInvoker.class as an argument. This argument calls the specific invoker, RxObservableInvoker, for the Observable class that’s provided by Jersey.

In the getSystem() method, the register(RxObservableInvokerProvider) method call registers the RxObservableInvoker class,which means that the client can recognize the invoker provider.

In some scenarios, a producer might generate more data than the consumers can handle. JAX-RS can deal with cases like these by using the RxJava Flowable class with backpressure. To learn more about RxJava and backpressure, see JAX-RS reactive extensions with RxJava backpressure.

Updating the REST resource to support the reactive JAX-RS client

Now that the client methods return the Observable class, you must update the resource to accommodate these changes.

Replace the QueryResource class.
query/src/main/java/io/openliberty/guides/query/QueryResource.java

QueryResource.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2020, 2024 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package io.openliberty.guides.query;
 13
 14import java.math.BigDecimal;
 15import java.util.List;
 16import java.util.Map;
 17import java.util.Properties;
 18import java.util.concurrent.ConcurrentHashMap;
 19import java.util.concurrent.CountDownLatch;
 20import java.util.concurrent.TimeUnit;
 21
 22import jakarta.enterprise.context.ApplicationScoped;
 23import jakarta.inject.Inject;
 24import jakarta.ws.rs.GET;
 25import jakarta.ws.rs.Path;
 26import jakarta.ws.rs.Produces;
 27import jakarta.ws.rs.core.MediaType;
 28
 29import io.openliberty.guides.query.client.InventoryClient;
 30
 31@ApplicationScoped
 32@Path("/query")
 33public class QueryResource {
 34
 35    @Inject
 36    private InventoryClient inventoryClient;
 37
 38    // tag::systemload[]
 39    @GET
 40    @Path("/systemLoad")
 41    @Produces(MediaType.APPLICATION_JSON)
 42    public Map<String, Properties> systemLoad() {
 43        // tag::getSystems[]
 44        List<String> systems = inventoryClient.getSystems();
 45        // end::getSystems[]
 46        // tag::countdownlatch[]
 47        CountDownLatch remainingSystems = new CountDownLatch(systems.size());
 48        // end::countdownlatch[]
 49        // tag::holder[]
 50        final Holder systemLoads = new Holder();
 51        // end::holder[]
 52        for (String system : systems) {
 53            // tag::getSystem[]
 54            inventoryClient.getSystem(system)
 55            // end::getSystem[]
 56                            // tag::subscribe[]
 57                           .subscribe(p -> {
 58                                if (p != null) {
 59                                    systemLoads.updateValues(p);
 60                                }
 61                                // tag::countdown[]
 62                                remainingSystems.countDown();
 63                                // end::countdown[]
 64                           }, e -> {
 65                                // tag::countdown2[]
 66                                remainingSystems.countDown();
 67                                // end::countdown2[]
 68                                e.printStackTrace();
 69                           });
 70                           // end::subscribe[]
 71        }
 72
 73        // Wait for all remaining systems to be checked
 74        try {
 75            // tag::await[]
 76            remainingSystems.await(30, TimeUnit.SECONDS);
 77            // end::await[]
 78        } catch (InterruptedException e) {
 79            e.printStackTrace();
 80        }
 81
 82        return systemLoads.getValues();
 83    }
 84    // end::systemload[]
 85
 86    // tag::holderClass[]
 87    private class Holder {
 88        // tag::volatile[]
 89        private volatile Map<String, Properties> values;
 90        // end::volatile[]
 91
 92        Holder() {
 93            // tag::concurrentHashMap[]
 94            this.values = new ConcurrentHashMap<String, Properties>();
 95            // end::concurrentHashMap[]
 96            init();
 97        }
 98
 99        public Map<String, Properties> getValues() {
100            return this.values;
101        }
102
103        // tag::updateValues[]
104        public void updateValues(Properties p) {
105            final BigDecimal load = (BigDecimal) p.get("systemLoad");
106
107            this.values.computeIfPresent("lowest", (key, curr_val) -> {
108                BigDecimal lowest = (BigDecimal) curr_val.get("systemLoad");
109                return load.compareTo(lowest) < 0 ? p : curr_val;
110            });
111            this.values.computeIfPresent("highest", (key, curr_val) -> {
112                BigDecimal highest = (BigDecimal) curr_val.get("systemLoad");
113                return load.compareTo(highest) > 0 ? p : curr_val;
114            });
115        }
116        // end::updateValues[]
117
118        private void init() {
119            // Initialize highest and lowest values
120            this.values.put("highest", new Properties());
121            this.values.put("lowest", new Properties());
122            this.values.get("highest").put("hostname", "temp_max");
123            this.values.get("lowest").put("hostname", "temp_min");
124            this.values.get("highest")
125                .put("systemLoad", new BigDecimal(Double.MIN_VALUE));
126            this.values.get("lowest")
127                .put("systemLoad", new BigDecimal(Double.MAX_VALUE));
128        }
129    }
130    // end::holderClass[]
131}

The goal of the systemLoad() method is to return the system with the largest load and the system with the smallest load. The systemLoad endpoint first gets all of the hostnames by calling the getSystems() method. Then it loops through the hostnames and calls the getSystem() method on each one.

Instead of using the thenAcceptAsync() method, Observable uses the subscribe() method to asynchronously process data. Thus, any necessary data processing happens inside the subscribe() method. In this case, the necessary data processing is saving the data in the temporary Holder class. The Holder class is used to store the value that is returned from the client because values cannot be returned inside the subscribe() method. The highest and lowest load systems are updated in the updateValues() method.

Rebuilding and running the application

Run the Maven install and package goals from the command-line session in the start directory:

mvn -pl query package

Run the following command to containerize the query microservice:

docker build -t query:1.0-SNAPSHOT query/.

Next, use the provided script to restart the query service in a Docker container.

.\scripts\startQueryContainer.bat
./scripts/startQueryContainer.sh

See the http://localhost:9080/health URL to confirm that the query microservice is up and running. Once the query microservice is up and running, you can access the application by making requests to the query/systemLoad endpoint at the http://localhost:9080/query/systemLoad URL.

Switching to a reactive programming model freed up the thread that was handling your request to query/systemLoad. While the client request is being handled, the thread can handle other work.

When you are done checking out the application, run the following script to stop the application:

.\scripts\stopContainers.bat
./scripts/stopContainers.sh

Testing the query microservice

A few tests are included for you to test the basic functionality of the query microservice. If a test failure occurs, then you might have introduced a bug into the code.

Create the QueryServiceIT class.
query/src/test/java/it/io/openliberty/guides/query/QueryServiceIT.java

QueryServiceIT.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2020, 2024 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package it.io.openliberty.guides.query;
 13
 14import static org.junit.jupiter.api.Assertions.assertEquals;
 15
 16import java.net.Socket;
 17import java.nio.file.Paths;
 18import java.time.Duration;
 19import java.util.Map;
 20import java.util.Properties;
 21
 22import org.glassfish.jersey.client.ClientConfig;
 23import org.glassfish.jersey.client.JerseyClient;
 24import org.glassfish.jersey.client.JerseyClientBuilder;
 25import org.glassfish.jersey.client.JerseyWebTarget;
 26import org.glassfish.jersey.client.proxy.WebResourceFactory;
 27import org.junit.jupiter.api.AfterAll;
 28import org.junit.jupiter.api.BeforeAll;
 29import org.junit.jupiter.api.BeforeEach;
 30import org.junit.jupiter.api.Test;
 31import org.junit.runner.Description;
 32import org.junit.runners.model.Statement;
 33import org.mockserver.client.MockServerClient;
 34import org.mockserver.model.HttpRequest;
 35import org.mockserver.model.HttpResponse;
 36import org.slf4j.Logger;
 37import org.slf4j.LoggerFactory;
 38import org.testcontainers.containers.GenericContainer;
 39import org.testcontainers.containers.MockServerContainer;
 40import org.testcontainers.containers.Network;
 41import org.testcontainers.containers.output.Slf4jLogConsumer;
 42import org.testcontainers.containers.wait.strategy.Wait;
 43import org.testcontainers.images.builder.ImageFromDockerfile;
 44import org.testcontainers.utility.DockerImageName;
 45
 46public class QueryServiceIT {
 47
 48    private static Logger logger = LoggerFactory.getLogger(QueryServiceIT.class);
 49
 50    public static QueryResourceClient client;
 51
 52    private static boolean isServiceRunning;
 53    private static Network network = createNetwork();
 54
 55    private static String testHost1 =
 56        "{"
 57            + "\"hostname\" : \"testHost1\","
 58            + "\"systemLoad\" : 1.23"
 59        + "}";
 60    private static String testHost2 =
 61        "{"
 62            + "\"hostname\" : \"testHost2\","
 63            + "\"systemLoad\" : 3.21"
 64        + "}";
 65    private static String testHost3 =
 66        "{" + "\"hostname\" : \"testHost3\","
 67            + "\"systemLoad\" : 2.13"
 68        + "}";
 69
 70    private static ImageFromDockerfile queryImage =
 71        new ImageFromDockerfile("query:1.0-SNAPSHOT")
 72            .withDockerfile(Paths.get("./Dockerfile"));
 73
 74    public static final DockerImageName MOCKSERVER_IMAGE =
 75        DockerImageName.parse("mockserver/mockserver")
 76            .withTag("mockserver-"
 77                + MockServerClient.class.getPackage().getImplementationVersion());
 78
 79    public static MockServerContainer mockServer =
 80        new MockServerContainer(MOCKSERVER_IMAGE)
 81            .withNetworkAliases("mock-server")
 82            .withNetwork(network);
 83
 84    public static MockServerClient mockClient;
 85
 86    private static GenericContainer<?> queryContainer =
 87        new GenericContainer(queryImage)
 88            .withNetwork(network)
 89            .withExposedPorts(9080)
 90            .waitingFor(Wait.forLogMessage("^.*CWWKF0011I.*$", 1))
 91            .withStartupTimeout(Duration.ofMinutes(3))
 92            .withLogConsumer(new Slf4jLogConsumer(logger))
 93            .dependsOn(mockServer);
 94
 95    private static QueryResourceClient createRestClient(String urlPath) {
 96        ClientConfig config = new ClientConfig();
 97        JerseyClient jerseyClient = JerseyClientBuilder.createClient(config);
 98        JerseyWebTarget target = jerseyClient.target(urlPath);
 99        return WebResourceFactory.newResource(QueryResourceClient.class, target);
100    }
101
102    private static boolean isServiceRunning(String host, int port) {
103        try {
104            Socket socket = new Socket(host, port);
105            socket.close();
106            return true;
107        } catch (Exception e) {
108            return false;
109        }
110    }
111
112    private static Network createNetwork() {
113        if (isServiceRunning("localhost", 9080)) {
114            isServiceRunning = true;
115            return new Network() {
116
117                @Override
118                public Statement apply(Statement base, Description description) {
119                    return null;
120                }
121
122                @Override
123                public String getId() {
124                    return "reactive-app";
125                }
126
127                @Override
128                public void close() {
129                }
130            };
131        } else {
132            isServiceRunning = false;
133            return Network.newNetwork();
134        }
135    }
136
137    @BeforeAll
138    public static void startContainers() {
139        mockServer.start();
140        mockClient = new MockServerClient(
141            mockServer.getHost(),
142            mockServer.getServerPort());
143        String urlPath;
144        if (isServiceRunning) {
145            System.out.println("Testing with mvn liberty:devc");
146            urlPath = "http://localhost:9080";
147        } else {
148            System.out.println("Testing with mvn verify");
149            queryContainer.withEnv(
150                "INVENTORY_BASE_URI",
151                "http://mock-server:" + MockServerContainer.PORT);
152            queryContainer.start();
153            urlPath = "http://"
154                      + queryContainer.getHost()
155                      + ":" + queryContainer.getFirstMappedPort();
156        }
157
158        System.out.println("Creating REST client with: " + urlPath);
159        client = createRestClient(urlPath);
160    }
161
162    @BeforeEach
163    public void setup() throws InterruptedException {
164        mockClient.when(HttpRequest.request()
165                        .withMethod("GET")
166                        .withPath("/inventory/systems"))
167                    .respond(HttpResponse.response()
168                        .withStatusCode(200)
169                        .withBody("[\"testHost1\","
170                                  + "\"testHost2\","
171                                  + "\"testHost3\"]")
172                        .withHeader("Content-Type", "application/json"));
173
174        mockClient.when(HttpRequest.request()
175                        .withMethod("GET")
176                        .withPath("/inventory/systems/testHost1"))
177                    .respond(HttpResponse.response()
178                        .withStatusCode(200)
179                        .withBody(testHost1)
180                        .withHeader("Content-Type", "application/json"));
181
182        mockClient.when(HttpRequest.request()
183                        .withMethod("GET")
184                        .withPath("/inventory/systems/testHost2"))
185                    .respond(HttpResponse.response()
186                        .withStatusCode(200)
187                        .withBody(testHost2)
188                        .withHeader("Content-Type", "application/json"));
189
190        mockClient.when(HttpRequest.request()
191                        .withMethod("GET")
192                        .withPath("/inventory/systems/testHost3"))
193                    .respond(HttpResponse.response()
194                        .withStatusCode(200)
195                        .withBody(testHost3)
196                        .withHeader("Content-Type", "application/json"));
197    }
198
199    @AfterAll
200    public static void stopContainers() {
201        if (!isServiceRunning) {
202            queryContainer.stop();
203        }
204        mockClient.close();
205        mockServer.stop();
206        network.close();
207    }
208
209    // tag::testSystemLoad[]
210    @Test
211    public void testSystemLoad() {
212        Map<String, Properties> response = client.systemLoad();
213        assertEquals(
214            "testHost2",
215            response.get("highest").get("hostname"),
216            "Returned highest system load incorrect"
217        );
218        assertEquals(
219            "testHost1",
220            response.get("lowest").get("hostname"),
221            "Returned lowest system load incorrect"
222        );
223    }
224    // end::testSystemLoad[]
225}

The testSystemLoad() test case verifies that the query service can correctly calculate the highest and lowest system loads.

Running the tests

Navigate to the query directory, then verify that the tests pass by running the Maven verify goal:

cd query
mvn verify
export TESTCONTAINERS_RYUK_DISABLED=true
cd query
mvn verify

For more information about disabling Ryuk, see the Testcontainers custom configuration document.

When the tests succeed, you see output similar to the following example:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running it.io.openliberty.guides.query.QueryServiceIT
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.88 s - in it.io.openliberty.guides.query.QueryServiceIT

Results:

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

Great work! You’re done!

You modified an application to make HTTP requests by using a reactive JAX-RS client with Open Liberty and Jersey’s RxJava provider.

Guide Attribution

Consuming RESTful services using the reactive JAX-RS client by Open Liberty is licensed under CC BY-ND 4.0

Copy file contents
Copied to clipboard

Prerequisites:

Nice work! Where to next?

What did you think of this guide?

Extreme Dislike Dislike Like Extreme Like

What could make this guide better?

Raise an issue to share feedback

Create a pull request to contribute to this guide

Need help?

Ask a question on Stack Overflow

Like Open Liberty? Star our repo on GitHub.

Star