Acknowledging messages using MicroProfile Reactive Messaging

duration 20 minutes

Learn how to acknowledge messages by using MicroProfile Reactive Messaging.

What you’ll learn

MicroProfile Reactive Messaging provides a reliable way to handle messages in reactive applications. It ensures that messages aren’t lost by requiring that every message that is delivered to the target service is acknowledged after it is processed. This way, any message that was delivered to the target service but not processed, for example, because of a system failure, can be identified and sent again.

The application in this guide consists of two microservices, system and inventory. Every 15 seconds, the system microservice calculates and publishes events that contain its current average system load. The inventory microservice subscribes to that information so that it can keep an updated list of all the systems and their current system loads. You can get the current inventory of systems by accessing the /systems REST endpoint. The following diagram depicts the application that is used in this guide:

Figure: Reactive system inventory

You will explore the acknowledgment strategies that are available with MicroProfile Reactive Messaging, and you’ll implement your own manual acknowledgment strategy. To learn more about how the reactive Java services used in this guide work, check out the Creating reactive Java microservices guide.

Additional prerequisites

You need to have Docker installed. For installation instructions, refer to the official Docker documentation. You will build and run the microservices in Docker containers. An installation of Apache Kafka is provided in another Docker container.

Getting started

The fastest way to work through this guide is to clone the Git repository and use the projects that are provided inside:

git clone https://github.com/openliberty/guide-microprofile-reactive-messaging-acknowledgment.git
cd guide-microprofile-reactive-messaging-acknowledgment

The start directory contains the starting project that you will build upon.

The finish directory contains the finished project that you will build.

Choosing an acknowledgment strategy

start/SystemService.java

// tag::copyright[]
/*******************************************************************************
 * Copyright (c) 2020 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     IBM Corporation - Initial implementation
 *******************************************************************************/
// end::copyright[]
package io.openliberty.guides.system;

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;

import io.openliberty.guides.models.PropertyMessage;
import io.openliberty.guides.models.SystemLoad;
import io.reactivex.rxjava3.core.Flowable;

@ApplicationScoped
public class SystemService {

    private static Logger logger = Logger.getLogger(SystemService.class.getName());

    private static final OperatingSystemMXBean osMean =
            ManagementFactory.getOperatingSystemMXBean();
    private static String hostname = null;

    private static String getHostname() {
        if (hostname == null) {
            try {
                return InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                return System.getenv("HOSTNAME");
            }
        }
        return hostname;
    }

    @Outgoing("systemLoad")
    public Publisher<SystemLoad> sendSystemLoad() {
        return Flowable.interval(15, TimeUnit.SECONDS)
                .map((interval -> new SystemLoad(getHostname(),
                        osMean.getSystemLoadAverage())));
    }

    // tag::sendProperty[]
    @Incoming("propertyRequest")
    @Outgoing("propertyResponse")
    public PublisherBuilder<PropertyMessage> sendProperty(String propertyName) {
        logger.info("sendProperty: " + propertyName);
        String propertyValue = System.getProperty(propertyName);
        // tag::null[]
        if (propertyValue == null) {
            logger.warning(propertyName + " is not System property.");
            // tag::returnEmpty[]
            return ReactiveStreams.empty();
            // end::returnEmpty[]
        }
        // end::null[]
        // tag::validReturn[]
        PropertyMessage message =
                new PropertyMessage(getHostname(),
                                    propertyName,
                                    System.getProperty(propertyName, "unknown"));
        return ReactiveStreams.of(message);
        // end::validReturn[]
    }
    // end::sendProperty[]
}

Messages must be acknowledged in reactive applications. A message is acknowledged either explicitly by your code or implicitly by MicroProfile Reactive Messaging. Acknowledgment for incoming messages is controlled by the @Acknowledgment annotation. If the @Acknowledgment annotation isn’t explicitly defined, then the default acknowledgment strategy applies, which depends on the method signature. Only methods that receive incoming messages, that is, methods that are annotated with the @Incoming annotation, must acknowledge messages. Methods that are annotated only with the @Outgoing annotation don’t need to acknowledge messages because they don’t receive any, and MicroProfile Reactive Messaging requires only that received messages are acknowledged.

Almost all of the methods in this application that require message acknowledgment are assigned the POST_PROCESSING strategy by default. If the acknowledgment strategy is set to POST_PROCESSING, then MicroProfile Reactive Messaging acknowledges the message based on whether the annotated method emits data:

  • If the method emits data, the incoming message is acknowledged after the outgoing message is acknowledged.

  • If the method doesn’t emit data, the incoming message is acknowledged after the method or processing completes.
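The two POST_PROCESSING cases above can be modeled with plain JDK types. The following is a minimal sketch, not the MicroProfile API: the SimpleMessage class and the processAndEmit, processOnly, and incoming helpers are hypothetical names that are introduced only for illustration. The sketch records the order in which acknowledgments happen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

/** JDK-only model of post-processing acknowledgment; not the MicroProfile API. */
class PostProcessingSketch {

    /** Hypothetical stand-in for a reactive message: a payload plus an ack action. */
    static final class SimpleMessage<T> {
        final T payload;
        private final Supplier<CompletionStage<Void>> ackAction;

        SimpleMessage(T payload, Supplier<CompletionStage<Void>> ackAction) {
            this.payload = payload;
            this.ackAction = ackAction;
        }

        CompletionStage<Void> ack() {
            return ackAction.get();
        }
    }

    /** Creates an incoming message whose acknowledgment is recorded in the log. */
    static SimpleMessage<String> incoming(String payload, List<String> log) {
        return new SimpleMessage<>(payload, () -> {
            log.add("incoming acked");
            return CompletableFuture.completedFuture(null);
        });
    }

    /** Case 1: data is emitted, so the incoming ack is chained to the outgoing ack. */
    static SimpleMessage<String> processAndEmit(SimpleMessage<String> in,
                                                List<String> log) {
        log.add("processed " + in.payload);
        return new SimpleMessage<>(in.payload.toUpperCase(), () -> {
            log.add("outgoing acked");
            return in.ack();
        });
    }

    /** Case 2: nothing is emitted, so the incoming message is acked after processing. */
    static void processOnly(SimpleMessage<String> in, List<String> log) {
        log.add("processed " + in.payload);
        in.ack();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SimpleMessage<String> out = processAndEmit(incoming("load", log), log);
        // The incoming message stays unacknowledged until the outgoing one is acked.
        out.ack();
        System.out.println(log);
    }
}
```

In the first case, the incoming acknowledgment is recorded only after the consumer of the outgoing message acknowledges it, which is the ordering that the POST_PROCESSING strategy is described as providing.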

It’s important that the methods use the POST_PROCESSING strategy because it fulfills the requirement that a message isn’t acknowledged until after the message is fully processed. This strategy is beneficial in situations where messages must not be lost. When the POST_PROCESSING acknowledgment strategy can’t be used, the MANUAL strategy can be used to fulfill the same requirement. In situations where reliable message delivery isn’t important and losing messages is acceptable, the PRE_PROCESSING strategy might be appropriate.
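To see why acknowledgment timing matters, consider a handler that fails partway through. The following JDK-only sketch is a simplified model under stated assumptions, not the MicroProfile runtime: the deliver method and the Strategy enum are hypothetical and exist only to contrast acknowledging before and after processing.

```java
import java.util.function.Consumer;

/** JDK-only model of acknowledgment timing; it does not use the MicroProfile API. */
class AckTimingSketch {

    /** Hypothetical mirror of two of the strategies that the guide discusses. */
    enum Strategy { PRE_PROCESSING, POST_PROCESSING }

    /** Delivers one payload and reports whether the message ended up acknowledged. */
    static boolean deliver(String payload, Strategy strategy, Consumer<String> handler) {
        boolean acked = false;
        try {
            if (strategy == Strategy.PRE_PROCESSING) {
                acked = true; // acknowledged before the handler runs
            }
            handler.accept(payload);
            if (strategy == Strategy.POST_PROCESSING) {
                acked = true; // acknowledged only after the handler returns
            }
        } catch (RuntimeException e) {
            // Processing failed. An unacknowledged message can still be sent again;
            // an already acknowledged message is considered handled and is lost.
        }
        return acked;
    }

    public static void main(String[] args) {
        Consumer<String> failing = p -> {
            throw new IllegalStateException("simulated crash");
        };
        System.out.println("PRE_PROCESSING acked:  "
                + deliver("load", Strategy.PRE_PROCESSING, failing));
        System.out.println("POST_PROCESSING acked: "
                + deliver("load", Strategy.POST_PROCESSING, failing));
    }
}
```

With the failing handler, the pre-processing delivery reports the message as acknowledged even though it was never processed, while the post-processing delivery leaves it unacknowledged so that it can be identified and sent again.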

The only method in the guide that doesn’t default to the POST_PROCESSING strategy is the sendProperty() method in the system service. The sendProperty() method receives property requests from the inventory service. For each property request, if the property that’s being requested is valid, then the method returns a property response with the value of the property. However, if the requested property doesn’t exist, the request is ignored and no property response is returned.

A key difference exists between when a property response is returned and when a property response isn’t returned. In the case where a property response is returned, the request doesn’t finish processing until the response is sent and safely stored by the Kafka broker. Only then is the incoming message acknowledged. However, in the case where the requested property doesn’t exist and a property response isn’t returned, the method finishes processing the request message so the message must be acknowledged immediately.

This case, where a message needs to be acknowledged either immediately or some time later, is one of the situations where the MANUAL acknowledgment strategy is beneficial.

Implementing the MANUAL acknowledgment strategy

finish/SystemService.java

// tag::copyright[]
/*******************************************************************************
 * Copyright (c) 2020 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     IBM Corporation - Initial implementation
 *******************************************************************************/
// end::copyright[]
package io.openliberty.guides.system;

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;

import io.openliberty.guides.models.PropertyMessage;
import io.openliberty.guides.models.SystemLoad;
import io.reactivex.rxjava3.core.Flowable;

@ApplicationScoped
public class SystemService {

    private static Logger logger = Logger.getLogger(SystemService.class.getName());

    private static final OperatingSystemMXBean osMean =
            ManagementFactory.getOperatingSystemMXBean();
    private static String hostname = null;

    private static String getHostname() {
        if (hostname == null) {
            try {
                return InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                return System.getenv("HOSTNAME");
            }
        }
        return hostname;
    }

    @Outgoing("systemLoad")
    public Publisher<SystemLoad> sendSystemLoad() {
        return Flowable.interval(15, TimeUnit.SECONDS)
                .map((interval -> new SystemLoad(getHostname(),
                        osMean.getSystemLoadAverage())));
    }

    // tag::sendProperty[]
    @Incoming("propertyRequest")
    @Outgoing("propertyResponse")
    // tag::ackAnnotation[]
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    // end::ackAnnotation[]
    // tag::methodSignature[]
    public PublisherBuilder<Message<PropertyMessage>>
    sendProperty(Message<String> propertyMessage) {
    // end::methodSignature[]
        // tag::propertyValue[]
        String propertyName = propertyMessage.getPayload();
        // System.getProperty throws on a null or empty key, so check the name first.
        String propertyValue = (propertyName == null || propertyName.isEmpty())
                ? null
                : System.getProperty(propertyName);
        // end::propertyValue[]
        logger.info("sendProperty: " + propertyValue);
        // tag::invalid[]
        if (propertyValue == null) {
            logger.warning("Provided property: " +
                    propertyName + " is not a system property");
            // Nothing is emitted, so acknowledge the request right away.
            // tag::propertyMessageAck[]
            propertyMessage.ack();
            // end::propertyMessageAck[]
            // tag::emptyReactiveStream[]
            return ReactiveStreams.empty();
            // end::emptyReactiveStream[]
        }
        // end::invalid[]
        // The request is acknowledged only when the response message is acknowledged.
        // tag::returnMessage[]
        Message<PropertyMessage> message = Message.of(
                new PropertyMessage(getHostname(),
                        propertyName,
                        propertyValue),
                propertyMessage::ack
        );
        return ReactiveStreams.of(message);
        // end::returnMessage[]
    }
    // end::sendProperty[]
}

Navigate to the start directory to begin.

Update the SystemService.sendProperty method to use the MANUAL acknowledgment strategy, which fits the method processing requirements better than the default PRE_PROCESSING strategy.

Replace the SystemService class.
system/src/main/java/io/openliberty/guides/system/SystemService.java

The sendProperty() method needs to manually acknowledge the incoming messages, so it is annotated with the @Acknowledgment(Acknowledgment.Strategy.MANUAL) annotation. With this strategy, the method itself is responsible for acknowledging each incoming message. To get access to the acknowledgment, the method parameter is changed from a String to a Message<String>, and the method returns a stream of Message<PropertyMessage> objects instead of plain PropertyMessage objects. Then, the message payload is extracted and checked for validity. One of the following outcomes occurs:

  • If the system property isn’t valid, the method acknowledges the incoming message and returns an empty reactive stream. The processing is complete.

  • If the system property is valid, the method creates a message with the value of the requested system property and sends it to the proper channel. The method acknowledges the incoming message only after the sent message is acknowledged.
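The two outcomes above can be checked in isolation with a JDK-only model. This is a sketch under stated assumptions rather than the guide's implementation: SimpleMessage is a hypothetical stand-in for the MicroProfile Message type, and an Optional stands in for the returned reactive stream.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/** JDK-only model of the two MANUAL outcomes; the types are hypothetical stand-ins. */
class ManualAckSketch {

    /** Hypothetical message: a payload plus a callback that runs on acknowledgment. */
    static final class SimpleMessage<T> {
        final T payload;
        private final Runnable onAck;

        SimpleMessage(T payload, Runnable onAck) {
            this.payload = payload;
            this.onAck = onAck;
        }

        void ack() {
            onAck.run();
        }
    }

    /** Returns a response for a known system property, or an empty result otherwise. */
    static Optional<SimpleMessage<String>> sendProperty(SimpleMessage<String> request) {
        String name = request.payload;
        String value = (name == null || name.isEmpty()) ? null : System.getProperty(name);
        if (value == null) {
            // Nothing is emitted, so processing is complete: acknowledge now.
            request.ack();
            return Optional.empty();
        }
        // Defer: the request is acknowledged only when the response is acknowledged.
        SimpleMessage<String> response = new SimpleMessage<>(value, request::ack);
        return Optional.of(response);
    }

    public static void main(String[] args) {
        AtomicBoolean acked = new AtomicBoolean(false);
        SimpleMessage<String> request =
                new SimpleMessage<>("java.version", () -> acked.set(true));
        Optional<SimpleMessage<String>> response = sendProperty(request);
        System.out.println("acked before response ack: " + acked.get());
        response.ifPresent(m -> m.ack());
        System.out.println("acked after response ack:  " + acked.get());
    }
}
```

For a property that doesn't exist, the same method acknowledges the request immediately and returns an empty result, so nothing downstream is needed to complete the acknowledgment.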

Waiting for a message to be acknowledged

InventoryResource.java

// tag::copyright[]
/*******************************************************************************
 * Copyright (c) 2020 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     IBM Corporation - Initial implementation
 *******************************************************************************/
// end::copyright[]
package io.openliberty.guides.inventory;

import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;

import io.openliberty.guides.models.PropertyMessage;
import io.openliberty.guides.models.SystemLoad;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableEmitter;


@ApplicationScoped
@Path("/inventory")
public class InventoryResource {

    private static Logger logger = Logger.getLogger(InventoryResource.class.getName());
    // tag::propertyNameEmitter[]
    private FlowableEmitter<Message<String>> propertyNameEmitter;
    // end::propertyNameEmitter[]

    @Inject
    private InventoryManager manager;

    @GET
    @Path("/systems")
    @Produces(MediaType.APPLICATION_JSON)
    public Response getSystems() {
        List<Properties> systems = manager.getSystems()
                .values()
                .stream()
                .collect(Collectors.toList());
        return Response
                .status(Response.Status.OK)
                .entity(systems)
                .build();
    }

    @GET
    @Path("/systems/{hostname}")
    @Produces(MediaType.APPLICATION_JSON)
    public Response getSystem(@PathParam("hostname") String hostname) {
        Optional<Properties> system = manager.getSystem(hostname);
        if (system.isPresent()) {
            return Response
                    .status(Response.Status.OK)
                    .entity(system)
                    .build();
        }
        return Response
                .status(Response.Status.NOT_FOUND)
                .entity("hostname does not exist.")
                .build();
    }

    // tag::updateSystemProperty[]
    @PUT
    @Path("/data")
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.TEXT_PLAIN)
    /* This method sends a message and returns a CompletionStage that doesn't
        complete until the message is acknowledged. */
    // tag::USPHeader[]
    public CompletionStage<Response> updateSystemProperty(String propertyName) {
    // end::USPHeader[]
        logger.info("updateSystemProperty: " + propertyName);
        // First, create an incomplete CompletableFuture named "result".
        // tag::CompletableFuture[]
        CompletableFuture<Void> result = new CompletableFuture<>();
        // end::CompletableFuture[]

        // Create a message that holds the payload.
        // tag::message[]
        Message<String> message = Message.of(
                // tag::payload[]
                propertyName,
                // end::payload[]
                // tag::acknowledgeAction[]
                () -> {
                    /* This is the ack callback, which runs when the outgoing
                        message is acknowledged. After the outgoing message is
                        acknowledged, complete the "result" CompletableFuture. */
                    result.complete(null);
                    /* An ack callback must return a CompletionStage that says
                        when it's complete. Asynchronous processing isn't necessary
                        so a completed CompletionStage is returned to indicate that
                        the work here is done. */
                    return CompletableFuture.completedFuture(null);
                }
                // end::acknowledgeAction[]
        );
        // end::message[]

        // Send the message
        propertyNameEmitter.onNext(message);
        /* Set up what happens when the message is acknowledged and the "result"
            CompletableFuture is completed. When "result" completes, the Response
            object is created with the status code and message. */
        // tag::returnResult[]
        return result.thenApply(a -> Response
                .status(Response.Status.OK)
                .entity("Request successful for the " + propertyName + " property\n")
                .build());
        // end::returnResult[]
    }
    // end::updateSystemProperty[]

    @DELETE
    @Produces(MediaType.APPLICATION_JSON)
    public Response resetSystems() {
        manager.resetSystems();
        return Response
                .status(Response.Status.OK)
                .build();
    }

    // tag::updateStatus[]
    // tag::systemLoadIncoming[]
    @Incoming("systemLoad")
    // end::systemLoadIncoming[]
    public void updateStatus(SystemLoad sl)  {
        String hostname = sl.hostname;
        if (manager.getSystem(hostname).isPresent()) {
            manager.updateCpuStatus(hostname, sl.loadAverage);
            logger.info("Host " + hostname + " was updated: " + sl);
        } else {
            manager.addSystem(hostname, sl.loadAverage);
            logger.info("Host " + hostname + " was added: " + sl);
        }
    }
    // end::updateStatus[]

    // tag::getPropertyMessage[]
    // tag::addSystemPropertyIncoming[]
    @Incoming("addSystemProperty")
    // end::addSystemPropertyIncoming[]
    public void getPropertyMessage(PropertyMessage pm)  {
        logger.info("getPropertyMessage: " + pm);
        String hostId = pm.hostname;
        if (manager.getSystem(hostId).isPresent()) {
            manager.updatePropertyMessage(hostId, pm.key, pm.value);
            logger.info("Host " + hostId + " was updated: " + pm);
        } else {
            manager.addSystem(hostId, pm.key, pm.value);
            logger.info("Host " + hostId + " was added: " + pm);
        }
    }
    // end::getPropertyMessage[]

    // tag::sendPropertyName[]
    @Outgoing("requestSystemProperty")
    // tag::SPMHeader[]
    public Publisher<Message<String>> sendPropertyName() {
    // end::SPMHeader[]
        Flowable<Message<String>> flowable = Flowable.create(emitter ->
                this.propertyNameEmitter = emitter, BackpressureStrategy.BUFFER);
        return flowable;
    }
    // end::sendPropertyName[]
}

The inventory service contains an endpoint that accepts PUT requests. When a PUT request that contains a system property name is made to the inventory service, the inventory service sends a message to the system service. The message requests the value of that system property from the system service. Currently, a 200 response code is returned without confirming whether the sent message was acknowledged. Update the inventory service to return a 200 response only after the outgoing message is acknowledged.

Replace the InventoryResource class.
inventory/src/main/java/io/openliberty/guides/inventory/InventoryResource.java

The sendPropertyName() method is updated to return a Publisher of Message<String> objects instead of plain String objects. Wrapping the payload in a Message allows a callback to be attached that runs after the outgoing message is acknowledged. In addition to updating the sendPropertyName() method, the propertyNameEmitter variable is updated to emit the Message<String> type.

The updateSystemProperty() method now returns a CompletionStage<Response> object instead of a Response object. This return type allows the response to be returned only after the outgoing message is acknowledged. The outgoing message is created with the requested property name as the payload and an acknowledgment callback that runs after the message is acknowledged. The method creates an incomplete CompletableFuture variable, completes it in the callback, and returns a stage that produces the 200 response after the variable is completed.
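The pattern of completing a CompletableFuture from an acknowledgment callback can be tried without a messaging runtime. The following JDK-only sketch assumes hypothetical SimpleMessage and Pending helper types and uses a plain String in place of the JAX-RS Response object.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

/** JDK-only model of replying only after the outgoing message is acknowledged. */
class AckFutureSketch {

    /** Hypothetical outgoing message: a payload plus an ack callback. */
    static final class SimpleMessage<T> {
        final T payload;
        private final Supplier<CompletionStage<Void>> ackAction;

        SimpleMessage(T payload, Supplier<CompletionStage<Void>> ackAction) {
            this.payload = payload;
            this.ackAction = ackAction;
        }

        CompletionStage<Void> ack() {
            return ackAction.get();
        }
    }

    /** Pairs the message that is sent with the reply that completes on its ack. */
    static final class Pending {
        final SimpleMessage<String> message;
        final CompletionStage<String> reply;

        Pending(SimpleMessage<String> message, CompletionStage<String> reply) {
            this.message = message;
            this.reply = reply;
        }
    }

    static Pending request(String propertyName) {
        // Incomplete until the ack callback runs.
        CompletableFuture<Void> result = new CompletableFuture<>();
        SimpleMessage<String> message = new SimpleMessage<>(propertyName, () -> {
            result.complete(null);
            // The callback itself has no asynchronous work left to do.
            return CompletableFuture.completedFuture(null);
        });
        // The reply is produced only after "result" completes.
        CompletionStage<String> reply = result.thenApply(
                ignored -> "Request successful for the " + propertyName + " property");
        return new Pending(message, reply);
    }

    public static void main(String[] args) {
        Pending pending = request("os.name");
        CompletableFuture<String> reply = pending.reply.toCompletableFuture();
        System.out.println("done before ack: " + reply.isDone());
        pending.message.ack(); // simulates the consumer acknowledging the message
        System.out.println(reply.join());
    }
}
```

Because the reply stage depends on the result future, a caller that waits on the reply can't observe a success response for a message that was never acknowledged.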

Building and running the application

Build the system and inventory microservices using Maven and then run them in Docker containers.

Start your Docker environment. Dockerfiles are provided for you to use.

To build the application, run the Maven install and package goals from the command-line session in the start directory:

mvn -pl models install
mvn package

Run the following command to download or update to the latest openliberty/open-liberty:kernel-java8-openj9-ubi Docker image:

docker pull openliberty/open-liberty:kernel-java8-openj9-ubi

Run the following commands to containerize the microservices:

docker build -t system:1.0-SNAPSHOT system/.
docker build -t inventory:1.0-SNAPSHOT inventory/.

Next, use the provided script to start the application in Docker containers. The script creates a network for the containers to communicate with each other. It also creates containers for Kafka, Zookeeper, and the microservices in the project. For simplicity, the script starts one instance of the system service.

On Linux or macOS:

./scripts/startContainers.sh

On Windows:

.\scripts\startContainers.bat

Testing the application

After the application is up and running, you can access the application by making a GET request to the /systems endpoint of the inventory service.

Go to the http://localhost:9085/inventory/systems URL to access the inventory microservice. You see the CPU systemLoad property for all the systems:

{
   "hostname":"30bec2b63a96",
   "systemLoad":1.44
}

The system service sends messages to the inventory service every 15 seconds. The inventory service processes and acknowledges each incoming message, ensuring that no system message is lost.

If you revisit the http://localhost:9085/inventory/systems URL after a while, you notice that the CPU systemLoad property for the systems changed.

Make a PUT request to the http://localhost:9085/inventory/data URL to add the value of a particular system property to the set of existing properties. For example, run the following curl command:

curl -X PUT -d "os.name" http://localhost:9085/inventory/data --header "Content-Type:text/plain"

If curl is unavailable on your computer, use another client such as Postman, which allows requests to be made with a graphical interface.
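As another option, the same request can be sent from a small Java program. The following sketch assumes a JDK 11 or later on the client machine, because it uses the java.net.http client, and the PutPropertyClient class name is hypothetical. The URL, header, and body match the curl command.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Sends the same PUT request as the curl command; requires JDK 11 or later. */
class PutPropertyClient {

    /** Builds a PUT request whose plain-text body is the property name. */
    static HttpRequest buildRequest(String propertyName) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9085/inventory/data"))
                .header("Content-Type", "text/plain")
                .PUT(HttpRequest.BodyPublishers.ofString(propertyName))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // send() blocks until the inventory service responds.
        HttpResponse<String> response = client.send(
                buildRequest("os.name"), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```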

In this example, the PUT request with the os.name system property in the request body on the http://localhost:9085/inventory/data URL adds the os.name system property for your system. The inventory service sends a message that contains the requested system property to the system service. The inventory service then waits until the message is acknowledged before it sends a response back.

You see the following output:

Request successful for the os.name property

The previous example response is confirmation that the sent request message was acknowledged.

You can revisit the http://localhost:9085/inventory/systems URL and see the os.name system property value is now included with the previous values:

{
   "hostname":"30bec2b63a96",
   "os.name":"Linux",
   "systemLoad":1.44
}

Tearing down the environment

Finally, run the following script to stop the application:

On Linux or macOS:

./scripts/stopContainers.sh

On Windows:

.\scripts\stopContainers.bat

Great work! You’re done!

You developed an application by using MicroProfile Reactive Messaging, Open Liberty, and Kafka.

Guide Attribution

Acknowledging messages using MicroProfile Reactive Messaging by Open Liberty is licensed under CC BY-ND 4.0
