Integrating RESTful services with a reactive system

duration 25 minutes

Prerequisites:

Learn how to integrate RESTful Java microservices with a reactive system by using MicroProfile Reactive Messaging.

What you’ll learn

You will learn how to integrate RESTful Java microservices with a reactive system by using MicroProfile Reactive Messaging. RESTful Java microservices don’t use reactive concepts, so you will learn how to bridge the gap between the two using the RxJava library. In this guide, you will modify two microservices in an application so that when a user hits the RESTful endpoint, the microservice generates producer events.

The application in this guide consists of two microservices, system and inventory. The following diagram illustrates the application:

Reactive system inventory

Every 15 seconds, the system microservice calculates and publishes events that contain its current average system load. The inventory microservice subscribes to that information so that it can keep an updated list of all the systems and their current system loads. The current inventory of systems can be accessed via the /systems REST endpoint.

You will update the inventory microservice to subscribe to a PUT request response. This PUT request response accepts a specific system property in the request body, queries that system property on the system microservice, and provides the response. You will also update the system microservice to handle receiving and sending events that are produced by the new endpoint. You will configure new channels to handle the events that are sent and received by the new endpoint. To learn more about how the reactive Java services that are used in this guide work, check out the Creating reactive Java microservices guide.

Additional prerequisites

You need to have Docker installed. For installation instructions, refer to the official Docker documentation. You will build and run the microservices in Docker containers. An installation of Apache Kafka is provided in another Docker container.

Getting started

The fastest way to work through this guide is to clone the Git repository and use the projects that are provided inside:

git clone https://github.com/openliberty/guide-microprofile-reactive-messaging-rest-integration.git
cd guide-microprofile-reactive-messaging-rest-integration

The start directory contains the starting project that you will build upon.

The finish directory contains the finished project that you will build.

Before you begin, make sure you have all the necessary prerequisites.

Adding a REST endpoint that produces events

Navigate to the start directory to begin.

The inventory microservice records and stores the average system load information from all of the connected system microservices. However, the inventory microservice does not contain an accessible REST endpoint to control the sending or receiving of reactive messages. Add the /data RESTful endpoint to the inventory service by replacing the InventoryResource class with an updated version of the class.

Replace the InventoryResource class.
inventory/src/main/java/io/openliberty/guides/inventory/InventoryResource.java

InventoryResource.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2020, 2024 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package io.openliberty.guides.inventory;
 13
 14import java.util.List;
 15import java.util.Optional;
 16import java.util.Properties;
 17import java.util.logging.Logger;
 18import java.util.stream.Collectors;
 19
 20import jakarta.enterprise.context.ApplicationScoped;
 21import jakarta.inject.Inject;
 22import jakarta.ws.rs.Consumes;
 23import jakarta.ws.rs.DELETE;
 24import jakarta.ws.rs.GET;
 25import jakarta.ws.rs.PUT;
 26import jakarta.ws.rs.Path;
 27import jakarta.ws.rs.PathParam;
 28import jakarta.ws.rs.Produces;
 29import jakarta.ws.rs.core.MediaType;
 30import jakarta.ws.rs.core.Response;
 31
 32import org.eclipse.microprofile.reactive.messaging.Incoming;
 33import org.eclipse.microprofile.reactive.messaging.Outgoing;
 34import org.reactivestreams.Publisher;
 35
 36import io.openliberty.guides.models.PropertyMessage;
 37import io.openliberty.guides.models.SystemLoad;
 38import io.reactivex.rxjava3.core.BackpressureStrategy;
 39import io.reactivex.rxjava3.core.Flowable;
 40import io.reactivex.rxjava3.core.FlowableEmitter;
 41
 42
 43@ApplicationScoped
 44//tag::inventoryEndPoint[]
 45@Path("/inventory")
 46//end::inventoryEndPoint[]
 47public class InventoryResource {
 48
 49    private static Logger logger = Logger.getLogger(InventoryResource.class.getName());
 50    // tag::flowableEmitterDecl[]
 51    private FlowableEmitter<String> propertyNameEmitter;
 52    // end::flowableEmitterDecl[]
 53
 54    @Inject
 55    private InventoryManager manager;
 56
 57    @GET
 58    @Path("/systems")
 59    @Produces(MediaType.APPLICATION_JSON)
 60    public Response getSystems() {
 61        List<Properties> systems = manager.getSystems()
 62                                          .values()
 63                                          .stream()
 64                                          .collect(Collectors.toList());
 65        return Response.status(Response.Status.OK)
 66                       .entity(systems)
 67                       .build();
 68    }
 69
 70    @GET
 71    @Path("/systems/{hostname}")
 72    @Produces(MediaType.APPLICATION_JSON)
 73    public Response getSystem(@PathParam("hostname") String hostname) {
 74        Optional<Properties> system = manager.getSystem(hostname);
 75        if (system.isPresent()) {
 76            return Response.status(Response.Status.OK)
 77                           .entity(system)
 78                           .build();
 79        }
 80        return Response.status(Response.Status.NOT_FOUND)
 81                       .entity("hostname does not exist.")
 82                       .build();
 83    }
 84
 85    // tag::updateSystemProperty[]
 86    // tag::annotatedPut[]
 87    @PUT
 88    // end::annotatedPut[]
 89    // tag::putPath[]
 90    @Path("/data")
 91    // end::putPath[]
 92    @Produces(MediaType.APPLICATION_JSON)
 93    @Consumes(MediaType.TEXT_PLAIN)
 94    public Response updateSystemProperty(String propertyName) {
 95        logger.info("updateSystemProperty: " + propertyName);
 96        // tag::flowableEmitter[]
 97        propertyNameEmitter.onNext(propertyName);
 98        // end::flowableEmitter[]
 99        return Response
100                 .status(Response.Status.OK)
101                 .entity("Request successful for the " + propertyName + " property\n")
102                 .build();
103    }
104    // end::updateSystemProperty[]
105
106    @DELETE
107    @Produces(MediaType.APPLICATION_JSON)
108    public Response resetSystems() {
109        manager.resetSystems();
110        return Response.status(Response.Status.OK)
111                       .build();
112    }
113
114    // tag::updateStatus[]
115    // tag::systemLoad[]
116    @Incoming("systemLoad")
117    // end::systemLoad[]
118    public void updateStatus(SystemLoad sl)  {
119        String hostname = sl.hostname;
120        if (manager.getSystem(hostname).isPresent()) {
121            manager.updateCpuStatus(hostname, sl.loadAverage);
122            logger.info("Host " + hostname + " was updated: " + sl);
123        } else {
124            manager.addSystem(hostname, sl.loadAverage);
125            logger.info("Host " + hostname + " was added: " + sl);
126        }
127    }
128    // end::updateStatus[]
129
130    // tag::propertyMessage[]
131    @Incoming("addSystemProperty")
132    // end::propertyMessage[]
133    public void getPropertyMessage(PropertyMessage pm)  {
134        logger.info("getPropertyMessage: " + pm);
135        String hostId = pm.hostname;
136        if (manager.getSystem(hostId).isPresent()) {
137            manager.updatePropertyMessage(hostId, pm.key, pm.value);
138            logger.info("Host " + hostId + " was updated: " + pm);
139        } else {
140            manager.addSystem(hostId, pm.key, pm.value);
141            logger.info("Host " + hostId + " was added: " + pm);
142        }
143    }
144
145    // tag::sendPropertyName[]
146    // tag::OutgoingPropertyName[]
147    @Outgoing("requestSystemProperty")
148    // end::OutgoingPropertyName[]
149    public Publisher<String> sendPropertyName() {
150        // tag::flowableCreate[]
151        Flowable<String> flowable = Flowable.<String>create(emitter ->
152            this.propertyNameEmitter = emitter, BackpressureStrategy.BUFFER);
153        // end::flowableCreate[]
154        return flowable;
155    }
156    // end::sendPropertyName[]
157}

The updateSystemProperty() method creates the /data endpoint that accepts PUT requests with a system property name in the request body. The propertyNameEmitter variable is an RxJava Emitter interface that sends the property name request to the event stream, which is Apache Kafka in this case.

The sendPropertyName() method contains the Flowable.create() RxJava method, which associates the emitter to a publisher that is responsible for publishing events to the event stream. The publisher in this example is then connected to the @Outgoing("requestSystemProperty") channel, which you will configure later in the guide. MicroProfile Reactive Messaging takes care of assigning the publisher to the channel.

The Flowable.create() method also allows the configuration of a BackpressureStrategy object, which controls what the publisher does if the emitted events can’t be consumed by the subscriber. In this example, the publisher used the BackpressureStrategy.BUFFER strategy. With this strategy, the publisher can buffer events until the subscriber can consume them.

When the inventory service receives a request, it adds the system property name from the request body to the propertyNameEmitter FlowableEmitter interface. The property name sent to the emitter is then sent to the publisher. The publisher sends the event to the event channel by using the configured BackpressureStrategy object when necessary.

Adding an event processor to a reactive service

The system microservice is the producer of the messages that are published to the Kafka messaging system as a stream of events. Every 15 seconds, the system microservice publishes events that contain its calculation of the average system load, which is its CPU usage, for the last minute. Replace the SystemService class to add message processing of the system property request from the inventory microservice and publish it to the Kafka messaging system.

Replace the SystemService class.
system/src/main/java/io/openliberty/guides/system/SystemService.java

SystemService.java

 1// tag::copyright[]
 2/*******************************************************************************
 3 * Copyright (c) 2020, 2024 IBM Corporation and others.
 4 * All rights reserved. This program and the accompanying materials
 5 * are made available under the terms of the Eclipse Public License 2.0
 6 * which accompanies this distribution, and is available at
 7 * http://www.eclipse.org/legal/epl-2.0/
 8 *
 9 * SPDX-License-Identifier: EPL-2.0
10 *******************************************************************************/
11// end::copyright[]
12package io.openliberty.guides.system;
13
14import java.lang.management.ManagementFactory;
15import java.lang.management.OperatingSystemMXBean;
16import java.net.InetAddress;
17import java.net.UnknownHostException;
18import java.util.concurrent.TimeUnit;
19import java.util.logging.Logger;
20
21import jakarta.enterprise.context.ApplicationScoped;
22
23import org.eclipse.microprofile.reactive.messaging.Incoming;
24import org.eclipse.microprofile.reactive.messaging.Outgoing;
25import org.reactivestreams.Publisher;
26
27import io.openliberty.guides.models.PropertyMessage;
28import io.openliberty.guides.models.SystemLoad;
29import io.reactivex.rxjava3.core.Flowable;
30
31@ApplicationScoped
32public class SystemService {
33
34    private static Logger logger = Logger.getLogger(SystemService.class.getName());
35
36    private static final OperatingSystemMXBean OS_MEAN =
37            ManagementFactory.getOperatingSystemMXBean();
38    private static String hostname = null;
39
40    private static String getHostname() {
41        if (hostname == null) {
42            try {
43                return InetAddress.getLocalHost().getHostName();
44            } catch (UnknownHostException e) {
45                return System.getenv("HOSTNAME");
46            }
47        }
48        return hostname;
49    }
50
51    // tag::publishSystemLoad[]
52    @Outgoing("systemLoad")
53    // end::publishSystemLoad[]
54    // tag::sendSystemLoad[]
55    public Publisher<SystemLoad> sendSystemLoad() {
56        // tag::flowableInterval[]
57        return Flowable.interval(15, TimeUnit.SECONDS)
58                       .map((interval -> new SystemLoad(getHostname(),
59                             OS_MEAN.getSystemLoadAverage())));
60        // end::flowableInterval[]
61    }
62    // end::sendSystemLoad[]
63
64    // tag::propertyRequest[]
65    @Incoming("propertyRequest")
66    // end::propertyRequest[]
67    // tag::propertyResponse[]
68    @Outgoing("propertyResponse")
69    // end::propertyResponse[]
70    // tag::sendProperty[]
71    public PropertyMessage sendProperty(String propertyName) {
72        logger.info("sendProperty: " + propertyName);
73        if (propertyName == null || propertyName.isEmpty()) {
74            logger.warning(propertyName == null ? "Null" : "An empty string"
75                + " is not System property.");
76            return null;
77        }
78        return new PropertyMessage(getHostname(),
79                       propertyName,
80                       System.getProperty(propertyName, "unknown"));
81    }
82    // end::sendProperty[]
83}

A new method that is named sendProperty() receives a system property name from the inventory microservice over the @Incoming("propertyRequest") channel. The method calculates the requested property in real time and publishes it back to Kafka over the @Outgoing("propertyResponse") channel. In this scenario, the sendProperty() method acts as a processor. Next, you’ll configure the channels that you need.

Configuring the MicroProfile Reactive Messaging connectors for Kafka

The system and inventory microservices each have a MicroProfile Config property file in which the properties of their incoming and outgoing channels are defined. These properties include the names of channels, the topics in the Kafka messaging system, and the associated message serializers and deserializers. To complete the message loop created in the previous sections, four channels must be added and configured.

Replace the inventory/microprofile-config.properties file.
inventory/src/main/resources/META-INF/microprofile-config.properties

inventory/microprofile-config.properties

 1# Kafka connection details
 2# tag::kafkaConfig[]
 3mp.messaging.connector.liberty-kafka.bootstrap.servers=kafka:9092
 4# end::kafkaConfig[]
 5
 6# systemLoad stream
 7# tag::systemLoad[]
 8# tag::kafka1[]
 9mp.messaging.incoming.systemLoad.connector=liberty-kafka
10# end::kafka1[]
11# tag::topic1[]
12mp.messaging.incoming.systemLoad.topic=system.load
13# end::topic1[]
14# tag::deserializer1[]
15mp.messaging.incoming.systemLoad.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
16# end::deserializer1[]
17# tag::deserializerVal1[]
18mp.messaging.incoming.systemLoad.value.deserializer=io.openliberty.guides.models.SystemLoad$SystemLoadDeserializer
19# end::deserializerVal1[]
20# tag::group1[]
21mp.messaging.incoming.systemLoad.group.id=system-load-status
22# end::group1[]
23# end::systemLoad[]
24
25# addSystemProperty stream
26# tag::addSystemProperty[]
27# tag::kafka3[]
28mp.messaging.incoming.addSystemProperty.connector=liberty-kafka
29# end::kafka3[]
30# tag::topic3[]
31mp.messaging.incoming.addSystemProperty.topic=add.system.property
32# end::topic3[]
33# tag::deserializer3[]
34mp.messaging.incoming.addSystemProperty.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
35# end::deserializer3[]
36# tag::deserializerVal3[]
37mp.messaging.incoming.addSystemProperty.value.deserializer=io.openliberty.guides.models.PropertyMessage$PropertyMessageDeserializer
38# end::deserializerVal3[]
39# tag::group3[]
40mp.messaging.incoming.addSystemProperty.group.id=sys-property
41# end::group3[]
42# end::addSystemProperty[]
43
44# requestSystemProperty stream
45# tag::requestSystemProperty[]
46# tag::kafka4[]
47mp.messaging.outgoing.requestSystemProperty.connector=liberty-kafka
48# end::kafka4[]
49# tag::topic4[]
50mp.messaging.outgoing.requestSystemProperty.topic=request.system.property
51# end::topic4[]
52# tag::serializer4[]
53mp.messaging.outgoing.requestSystemProperty.key.serializer=org.apache.kafka.common.serialization.StringSerializer
54# end::serializer4[]
55# tag::serializerVal4[]
56mp.messaging.outgoing.requestSystemProperty.value.serializer=org.apache.kafka.common.serialization.StringSerializer
57# end::serializerVal4[]
58# end::requestSystemProperty[]

The newly created RESTful endpoint requires two new channels that move the requested messages between the system and inventory microservices. The inventory microservice microprofile-config.properties file now has two new channels, requestSystemProperty and addSystemProperty. The requestSystemProperty channel handles sending the system property request, and the addSystemProperty channel handles receiving the system property response.

Replace the system/microprofile-config.properties file.
system/src/main/resources/META-INF/microprofile-config.properties

system/microprofile-config.properties

 1# Kafka connection details
 2# tag::kafkaConfig[]
 3mp.messaging.connector.liberty-kafka.bootstrap.servers=kafka:9092
 4# end::kafkaConfig[]
 5
 6# systemLoad stream
 7# tag::systemLoad[]
 8# tag::kafka1[]
 9mp.messaging.outgoing.systemLoad.connector=liberty-kafka
10# end::kafka1[]
11# tag::topic1[]
12mp.messaging.outgoing.systemLoad.topic=system.load
13# end::topic1[]
14# tag::serializer1[]
15mp.messaging.outgoing.systemLoad.key.serializer=org.apache.kafka.common.serialization.StringSerializer
16# end::serializer1[]
17# tag::serializerVal1[]
18mp.messaging.outgoing.systemLoad.value.serializer=io.openliberty.guides.models.SystemLoad$SystemLoadSerializer
19# end::serializerVal1[]
20# end::systemLoad[]
21
22# propertyResponse stream
23# tag::propertyResponse[]
24# tag::kafka3[]
25mp.messaging.outgoing.propertyResponse.connector=liberty-kafka
26# end::kafka3[]
27# tag::topic3[]
28mp.messaging.outgoing.propertyResponse.topic=add.system.property
29# end::topic3[]
30# tag::serializer3[]
31mp.messaging.outgoing.propertyResponse.key.serializer=org.apache.kafka.common.serialization.StringSerializer
32# end::serializer3[]
33# tag::serializerVal3[]
34mp.messaging.outgoing.propertyResponse.value.serializer=io.openliberty.guides.models.PropertyMessage$PropertyMessageSerializer
35# end::serializerVal3[]
36# end::propertyResponse[]
37
38# propertyRequest stream
39# tag::propertyRequest[]
40# tag::kafka4[]
41mp.messaging.incoming.propertyRequest.connector=liberty-kafka
42# end::kafka4[]
43# tag::topic4[]
44mp.messaging.incoming.propertyRequest.topic=request.system.property
45# end::topic4[]
46# tag::deserializer4[]
47mp.messaging.incoming.propertyRequest.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
48# end::deserializer4[]
49# tag::deserializerVal4[]
50mp.messaging.incoming.propertyRequest.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
51# end::deserializerVal4[]
52# tag::group4[]
53mp.messaging.incoming.propertyRequest.group.id=property-name
54# end::group4[]
55# end::propertyRequest[]

Replace the system microservice microprofile-config.properties file to add the two new propertyRequest and propertyResponse channels. The propertyRequest channel handles receiving the property request, and the propertyResponse channel handles sending the property response.

Building and running the application

Build the system and inventory microservices using Maven and then run them in Docker containers.

Start your Docker environment. Dockerfiles are provided for you to use.

To build the application, run the Maven install and package goals from the command line in the start directory:

mvn -pl models install
mvn package

Run the following commands to containerize the microservices:

docker build -t system:1.0-SNAPSHOT system/.
docker build -t inventory:1.0-SNAPSHOT inventory/.

Next, use the provided script to start the application in Docker containers. The script creates a network for the containers to communicate with each other. It also creates containers for Kafka and the microservices in the project. For simplicity, the script starts one instance of the system service.

.\scripts\startContainers.bat
./scripts/startContainers.sh

Testing the application

The application might take some time to become available. After the application is up and running, you can access it by making a GET request to the /systems endpoint of the inventory service.

Visit the http://localhost:9085/health URL to confirm that the inventory microservice is up and running.

When both the liveness and readiness health checks are up, go the http://localhost:9085/inventory/systems URL to access the inventory microservice. You see the CPU systemLoad property for all the systems:

{
   "hostname":"30bec2b63a96",
   "systemLoad":1.44
}

You can revisit the http://localhost:9085/inventory/systems URL after a while and the value of the systemLoad property for the systems is changed.

Make a PUT request on the http://localhost:9085/inventory/data URL to add the value of a particular system property to the set of existing properties. For example, run the following curl command:

If curl is unavailable on your computer, use another client such as Postman, which allows requests to be made with a graphical interface.

curl -X PUT -d "os.name" http://localhost:9085/inventory/data --header "Content-Type:text/plain"

In this example, the PUT request with the os.name system property in the request body on the http://localhost:9085/inventory/data URL adds the os.name system property for your system.

You see the following output:

Request successful for the os.name property

The system service is available so the request to the service is successful and returns a 200 response code.

You can revisit the http://localhost:9085/inventory/systems URL and see the os.name system property value is now included with the previous values:

{
   "hostname":"30bec2b63a96",
   "os.name":"Linux",
   "systemLoad":1.44
}

Tearing down the environment

Run the following script to stop the application:

.\scripts\stopContainers.bat
./scripts/stopContainers.sh

Running multiple system instances

system/microprofile-config.properties

 1# Kafka connection details
 2# tag::kafkaConfig[]
 3mp.messaging.connector.liberty-kafka.bootstrap.servers=kafka:9092
 4# end::kafkaConfig[]
 5
 6# systemLoad stream
 7# tag::systemLoad[]
 8# tag::kafka1[]
 9mp.messaging.outgoing.systemLoad.connector=liberty-kafka
10# end::kafka1[]
11# tag::topic1[]
12mp.messaging.outgoing.systemLoad.topic=system.load
13# end::topic1[]
14# tag::serializer1[]
15mp.messaging.outgoing.systemLoad.key.serializer=org.apache.kafka.common.serialization.StringSerializer
16# end::serializer1[]
17# tag::serializerVal1[]
18mp.messaging.outgoing.systemLoad.value.serializer=io.openliberty.guides.models.SystemLoad$SystemLoadSerializer
19# end::serializerVal1[]
20# end::systemLoad[]
21
22# propertyResponse stream
23# tag::propertyResponse[]
24# tag::kafka3[]
25mp.messaging.outgoing.propertyResponse.connector=liberty-kafka
26# end::kafka3[]
27# tag::topic3[]
28mp.messaging.outgoing.propertyResponse.topic=add.system.property
29# end::topic3[]
30# tag::serializer3[]
31mp.messaging.outgoing.propertyResponse.key.serializer=org.apache.kafka.common.serialization.StringSerializer
32# end::serializer3[]
33# tag::serializerVal3[]
34mp.messaging.outgoing.propertyResponse.value.serializer=io.openliberty.guides.models.PropertyMessage$PropertyMessageSerializer
35# end::serializerVal3[]
36# end::propertyResponse[]
37
38# propertyRequest stream
39# tag::propertyRequest[]
40# tag::kafka4[]
41mp.messaging.incoming.propertyRequest.connector=liberty-kafka
42# end::kafka4[]
43# tag::topic4[]
44mp.messaging.incoming.propertyRequest.topic=request.system.property
45# end::topic4[]
46# tag::deserializer4[]
47mp.messaging.incoming.propertyRequest.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
48# end::deserializer4[]
49# tag::deserializerVal4[]
50mp.messaging.incoming.propertyRequest.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
51# end::deserializerVal4[]
52# tag::group4[]
53mp.messaging.incoming.propertyRequest.group.id=property-name
54# end::group4[]
55# end::propertyRequest[]

This application has only one instance of the system service. The inventory service collects system properties of all system services in the application. As an exercise, start multiple system services to see how the application handles it. When you start the system instances, you must provide a unique group.id through the MP_MESSAGING_INCOMING_PROPERTYREQUEST_GROUP_ID environment variable.

Great work! You’re done!

You successfully integrated a RESTful microservice with a reactive system by using MicroProfile Reactive Messaging.

Guide Attribution

Integrating RESTful services with a reactive system by Open Liberty is licensed under CC BY-ND 4.0

Copy file contents
Copied to clipboard

Prerequisites:

Nice work! Where to next?

What did you think of this guide?

Extreme Dislike Dislike Like Extreme Like

What could make this guide better?

Raise an issue to share feedback

Create a pull request to contribute to this guide

Need help?

Ask a question on Stack Overflow

Like Open Liberty? Star our repo on GitHub.

Star