
Negative acknowledgment and more for MicroProfile Reactive Messaging in 24.0.0.2

David Mueller on Feb 27, 2024

The 24.0.0.2 release includes new capabilities for MicroProfile Reactive Messaging and MicroProfile Streams Operators, including negative acknowledgment, emitters, and backpressure support. It also introduces a new Open Liberty guide to help you build true-to-production integration tests for your applications.


View the list of fixed bugs in 24.0.0.2.

For a list of past security vulnerability fixes, reference the Security vulnerability (CVE) list.

Develop and run your apps using 24.0.0.2

If you’re using Maven, include the following in your pom.xml file:

<plugin>
    <groupId>io.openliberty.tools</groupId>
    <artifactId>liberty-maven-plugin</artifactId>
    <version>3.10</version>
</plugin>

Or for Gradle, include the following in your build.gradle file:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'io.openliberty.tools:liberty-gradle-plugin:3.8'
    }
}
apply plugin: 'liberty'

Or if you’re using container images:

FROM icr.io/appcafe/open-liberty

Or take a look at our Downloads page.

If you’re using IntelliJ IDEA, Visual Studio Code or Eclipse IDE, you can also take advantage of our open source Liberty developer tools to enable effective development, testing, debugging, and application management all from within your IDE.

Negative acknowledgment and more for MicroProfile Reactive Messaging 3.0 and MicroProfile Streams Operators 3.0

MicroProfile Reactive Messaging 3.0 introduces a number of new features and changes from MicroProfile Reactive Messaging 1.0, including negative acknowledgments, emitters, and backpressure support. These features are compatible with Jakarta EE 9 and later.

An application that uses MicroProfile Reactive Messaging or MicroProfile Streams Operators is typically composed of CDI beans that consume, produce, and process messages that are passed along reactive streams. These messages can be internal to the application, or they can be sent and received through external message brokers. MicroProfile Reactive Messaging uses MicroProfile Streams Operators to pass messages through channels between methods or to messaging solutions, such as Kafka, to provide resilient storage of messages.

To use both features together, add the MicroProfile Reactive Messaging 3.0 feature to your server.xml file:

<feature>mpReactiveMessaging-3.0</feature>

Enabling MicroProfile Reactive Messaging 3.0 also automatically enables the MicroProfile Reactive Streams Operators 3.0 feature.

To use MicroProfile Reactive Streams Operators 3.0 without MicroProfile Reactive Messaging, add the MicroProfile Reactive Streams Operators 3.0 feature to your server.xml file:

<feature>mpReactiveStreams-3.0</feature>

Negative acknowledgments

In MicroProfile Reactive Messaging 1.0, messages could be only positively acknowledged. If a problem existed with the payload or if exceptional behavior occurred, no mechanism was available to indicate or to handle the problem if it occurred within the stream. The new negative acknowledgment (nack) support can send or handle these events.

The following example explicitly negatively acknowledges an incoming message:

@Incoming("data")
@Outgoing("out")
public Message<String> process(Message<String> m) {
    String s = m.getPayload();
    if (s.equalsIgnoreCase("b")) {
        // we cannot fail, we must nack explicitly.
        m.nack(new IllegalArgumentException("b"));
        return null;
    }
    return m.withPayload(s.toUpperCase());
}

A method that accepts a message without defining an acknowledgment strategy defaults to the MANUAL strategy. Your code is then responsible for positively (ack()) or negatively (nack()) acknowledging the message. In the previous example, the message can be acknowledged downstream of the outgoing channel.
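To illustrate how an acknowledgment issued downstream can reach the original sender, here is a minimal plain-Java sketch. It uses only the JDK: the `ManualAckSketch` class, its nested `Msg` type, and the `run` helper are hypothetical stand-ins that model the idea, not the MicroProfile `Message` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for a message with ack and nack callbacks.
// It only models the idea; it is not the MicroProfile Message type.
final class ManualAckSketch {

    static final class Msg {
        final String payload;
        final Supplier<CompletionStage<Void>> onAck;
        final Function<Throwable, CompletionStage<Void>> onNack;

        Msg(String payload,
            Supplier<CompletionStage<Void>> onAck,
            Function<Throwable, CompletionStage<Void>> onNack) {
            this.payload = payload;
            this.onAck = onAck;
            this.onNack = onNack;
        }

        CompletionStage<Void> ack() { return onAck.get(); }

        CompletionStage<Void> nack(Throwable reason) { return onNack.apply(reason); }

        // Keeps the callbacks, so an ack issued downstream still reaches the source.
        Msg withPayload(String newPayload) { return new Msg(newPayload, onAck, onNack); }
    }

    // Mirrors the Message-based method above: nack explicitly, or pass the message on.
    static Msg process(Msg m) {
        if (m.payload.equalsIgnoreCase("b")) {
            m.nack(new IllegalArgumentException("b"));
            return null;
        }
        return m.withPayload(m.payload.toUpperCase());
    }

    // Sends one payload through process(); a downstream consumer acks whatever comes out.
    // Returns "acked" or "nacked" as seen by the original sender.
    static String run(String payload) {
        CompletableFuture<Void> outcome = new CompletableFuture<>();
        Msg in = new Msg(payload,
                () -> { outcome.complete(null); return CompletableFuture.completedFuture(null); },
                t -> { outcome.completeExceptionally(t); return CompletableFuture.completedFuture(null); });
        Msg out = process(in);
        if (out != null) {
            out.ack(); // the downstream consumer acknowledges the transformed message
        }
        return outcome.isCompletedExceptionally() ? "nacked" : "acked";
    }

    public static void main(String[] args) {
        System.out.println("a -> " + run("a"));
        System.out.println("b -> " + run("b"));
    }
}
```

In this model, the transformed message carries the same callbacks as the incoming one, which is why nothing needs to be acknowledged inside `process` on the success path.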

The following example throws an exception that causes a negative acknowledgment:

@Incoming("data")
@Outgoing("out")
public String process(String s) {
    if (s.equalsIgnoreCase("b")) {
        throw new IllegalArgumentException("b");
    }
    return s.toUpperCase();
}

A method that accepts a payload without defining an acknowledgment strategy defaults to the POST_PROCESSING strategy. The implementation handles ack() and nack() calls on the message after the method or chain completes. If the method throws an exception, the implementation invokes the nack() call on the message, and the upstream source receives the negative acknowledgment with the IllegalArgumentException as the reason.
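The following plain-Java sketch models this wrapping behavior under simplifying assumptions. The `PostProcessingSketch` class, the `deliver` helper, and its callbacks are hypothetical and use only the JDK. The sketch also acknowledges as soon as the method returns, whereas in a real chain the positive acknowledgment can be deferred until downstream processing completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of POST_PROCESSING acknowledgment; not the MicroProfile API.
final class PostProcessingSketch {

    // Same logic as the payload-based process method shown above.
    static String process(String s) {
        if (s.equalsIgnoreCase("b")) {
            throw new IllegalArgumentException("b");
        }
        return s.toUpperCase();
    }

    // Invokes the method, then acks on success or nacks with the thrown exception.
    // Returns the result, or null when the payload was negatively acknowledged.
    static String deliver(String payload,
                          Function<String, String> method,
                          Runnable ack,
                          Consumer<Throwable> nack) {
        try {
            String result = method.apply(payload);
            ack.run();
            return result;
        } catch (RuntimeException e) {
            nack.accept(e);
            return null;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        for (String payload : List.of("a", "b")) {
            String out = deliver(payload, PostProcessingSketch::process,
                    () -> log.add("ack " + payload),
                    t -> log.add("nack " + payload + ": " + t.getClass().getSimpleName()));
            log.add("result " + out);
        }
        log.forEach(System.out::println);
    }
}
```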

Emitters

MicroProfile Reactive Messaging 1.0 did not offer a clear way to integrate imperative code, such as RESTful resources and beans, because Reactive Messaging put messages onto a channel, and took messages from it, according to the Outgoing or Incoming annotations. In version 3.0, emitters provide a bridge between the two models.

The following example injects emitters into a RESTful resource by using CDI to put messages onto a given channel:

@Inject
@Channel(CHANNEL_NAME)
Emitter<String> emitter;

@POST
@Path("/payload")
public CompletionStage<Void> emitPayload(String payload){
    CompletionStage<Void> cs = emitter.send(payload);
    return cs;
}

@POST
@Path("/message")
public CompletionStage<Void> emitMessage(String payload){
    CompletableFuture<Void> ackCf = new CompletableFuture<>();
    emitter.send(Message.of(payload,
        () -> {
            ackCf.complete(null);
            return CompletableFuture.completedFuture(null);
        },
        t -> {
            ackCf.completeExceptionally(t);
            return CompletableFuture.completedFuture(null);
        }));
    return ackCf;
}

When you define emitters, you specify the type of Object that is sent as either the payload or the contents of the message.

If an emitter sends a payload, MicroProfile Reactive Messaging automatically handles the invocation of ack() and nack() calls on the message. However, if the emitter sends a message, the sending code must handle whether the message is either negatively or positively acknowledged downstream.

Backpressure support

Backpressure support handles messages or payloads that are emitted faster than they are consumed. A backpressure strategy defines application behavior in this circumstance. In the following example, the buffer holds up to 300 messages, and the send method throws an exception if the buffer is full when a new message is emitted:

@Inject @Channel("myChannel")
@OnOverflow(value=OnOverflow.Strategy.BUFFER, bufferSize=300)
private Emitter<String> emitter;

public void publishMessage() {
  emitter.send("a");
  emitter.send("b");
  emitter.complete();
}

You can define the following backpressure strategies:

  • BUFFER - Uses a buffer, with a size determined by the value of bufferSize, if set. Otherwise, the size is the value of the mp.messaging.emitter.default-buffer-size MicroProfile Config property, if it exists. If neither of these values is defined, the default size is 128. If the buffer is full, an exception is thrown from the send method. This is the default strategy if no other strategy is defined.

  • DROP - Drops the most recent value if the downstream can’t keep up. Any new values that are emitted by the emitter are ignored.

  • FAIL - Propagates a failure in case the downstream can’t keep up. No more values are emitted.

  • LATEST - Keeps only the latest value, dropping any previous value if the downstream can’t keep up.

  • NONE - Ignores the backpressure signals and leaves it to the downstream consumer to implement a strategy.

  • THROW_EXCEPTION - Throws an exception from the send method if the downstream can’t keep up.

  • UNBOUNDED_BUFFER - Uses an unbounded buffer. The application might run out of memory if values are continually added faster than they are consumed.
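To make the difference between some of these strategies concrete, the following plain-Java sketch models BUFFER, DROP, and LATEST against a consumer that never takes anything. It is a simplified model that uses only the JDK: the `OverflowSketch` class, its `Strategy` enum, and the `capacity` parameter are hypothetical, and the real emitter reacts to downstream demand rather than to a fixed-size queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical model of three overflow strategies; not the MicroProfile Emitter.
final class OverflowSketch {

    enum Strategy { BUFFER, DROP, LATEST }

    private final Strategy strategy;
    private final int capacity;
    private final Deque<String> pending = new ArrayDeque<>();

    OverflowSketch(Strategy strategy, int capacity) {
        this.strategy = strategy;
        this.capacity = capacity;
    }

    // Accepts a value while the slow consumer has not yet taken the pending ones.
    void send(String value) {
        if (pending.size() < capacity) {
            pending.addLast(value);
            return;
        }
        switch (strategy) {
            case BUFFER:
                // A full buffer surfaces as an exception from send.
                throw new IllegalStateException("buffer full: " + capacity);
            case DROP:
                // The new value is ignored.
                break;
            case LATEST:
                // Older values are discarded and only the newest is kept.
                pending.clear();
                pending.addLast(value);
                break;
        }
    }

    // Values that are still waiting for the consumer, oldest first.
    List<String> snapshot() {
        return List.copyOf(pending);
    }

    public static void main(String[] args) {
        for (Strategy s : new Strategy[] { Strategy.DROP, Strategy.LATEST }) {
            OverflowSketch emitter = new OverflowSketch(s, 1);
            emitter.send("a");
            emitter.send("b");
            System.out.println(s + " keeps " + emitter.snapshot());
        }
    }
}
```

With a capacity of 1, the DROP model keeps "a" and the LATEST model keeps "b", while the BUFFER model throws from `send` on the second value.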

New Liberty-kafka connector options

Open Liberty provides a Kafka connector for use with MicroProfile Reactive Messaging to send and receive messages by using Kafka as a messaging intermediary. Two new options are added to the connector in this release, fast.ack and context.service. These options are set as MicroProfile Config properties.

Configure incoming acknowledgment with fast.ack

The fast.ack boolean attribute determines the acknowledgment behavior of the connector for incoming channels.

  • true: an acknowledgment is reported as complete as soon as the Kafka Connector receives the acknowledgment signal.

  • false: an acknowledgment is not reported as complete until the partition offset is committed to the Kafka broker. If an error occurs during this process, the acknowledgment is reported as failed.
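The two modes can be pictured with a small plain-Java sketch. It is only a model that uses the JDK: the `FastAckSketch` class and the `offsetCommit` future are hypothetical stand-ins for the connector's internal commit of the partition offset.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical model of the two fast.ack modes; not the connector's code.
final class FastAckSketch {

    // offsetCommit stands for the pending commit of the partition offset to the broker.
    static CompletionStage<Void> acknowledge(boolean fastAck, CompletableFuture<Void> offsetCommit) {
        if (fastAck) {
            // Reported as complete as soon as the acknowledgment signal is received.
            return CompletableFuture.completedFuture(null);
        }
        // Completes, or fails, together with the offset commit.
        return offsetCommit;
    }

    static boolean isDone(CompletionStage<Void> stage) {
        return stage.toCompletableFuture().isDone();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> commit = new CompletableFuture<>();
        CompletionStage<Void> fast = acknowledge(true, commit);
        CompletionStage<Void> slow = acknowledge(false, commit);
        System.out.println("before commit: fast=" + isDone(fast) + ", slow=" + isDone(slow));
        commit.complete(null);
        System.out.println("after commit:  fast=" + isDone(fast) + ", slow=" + isDone(slow));
    }
}
```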

fast.ack is defined as an attribute on either the liberty-kafka connector or an incoming channel. If it is specified on an outgoing channel, it is ignored.

mp.messaging.connector.liberty-kafka.fast.ack=false

mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.fast.ack=true

In this example, the connector sets the standard value that is used by all channels. However, the foo channel uses the true value because channel attributes take precedence over connector attributes.

For mpReactiveMessaging-1.0, the default value of the option is false. For mpReactiveMessaging-3.0, the default value of the option is true.
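The precedence described here (channel attribute, then connector attribute, then the feature default) can be sketched in plain Java. The `FastAckLookupSketch` class and its `fastAck` helper are hypothetical and read from a simple map. In a real application the connector resolves these values through MicroProfile Config.

```java
import java.util.Map;

// Hypothetical lookup helper that models attribute precedence; not the connector's code.
final class FastAckLookupSketch {

    // Channel attribute first, then connector attribute, then the feature default.
    static boolean fastAck(Map<String, String> config, String channel, boolean featureDefault) {
        String channelValue = config.get("mp.messaging.incoming." + channel + ".fast.ack");
        if (channelValue != null) {
            return Boolean.parseBoolean(channelValue);
        }
        String connectorValue = config.get("mp.messaging.connector.liberty-kafka.fast.ack");
        if (connectorValue != null) {
            return Boolean.parseBoolean(connectorValue);
        }
        return featureDefault;
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(
                "mp.messaging.connector.liberty-kafka.fast.ack", "false",
                "mp.messaging.incoming.foo.connector", "liberty-kafka",
                "mp.messaging.incoming.foo.fast.ack", "true");
        // The default for mpReactiveMessaging-3.0 is true.
        System.out.println("foo:   " + fastAck(config, "foo", true));
        System.out.println("other: " + fastAck(config, "other", true));
    }
}
```

Here the foo channel resolves to true because of its own attribute, while a channel without one falls back to the connector value of false.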

Manage asynchronous tasks with context.service

The context.service attribute specifies the context service that is used for asynchronous tasks. Its value is a reference to the id attribute of a contextService element that is defined in the server.xml file.

In the following server.xml file, three different context services are defined with unique IDs.

<contextService id="rst"/>
<contextService id="uvw"/>
<contextService id="xyz"/>

In the application’s microprofile-config.properties file, the first context service is set on the connector. The application has three channels. The def channel does not specify its own context.service instance, so it uses the one that is defined on the connector. The second and third channels define and use their own services.

mp.messaging.connector.liberty-kafka.context.service=rst

mp.messaging.incoming.def.connector=liberty-kafka
mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.context.service=uvw
mp.messaging.outgoing.bar.connector=liberty-kafka
mp.messaging.outgoing.bar.context.service=xyz

If a version of the Jakarta Concurrency feature is enabled in the server.xml file, the default context service is used. If this feature is not enabled, the built-in Open Liberty context service is used with a set list of context types to capture and apply around asynchronous tasks.


New guide: "Building true-to-production integration tests with Testcontainers"

A new guide, Building true-to-production integration tests with Testcontainers, is published under the Test category. In this guide, you’ll learn how to write true-to-production integration tests for Java microservices by using Testcontainers and JUnit. You can also run this guide in a hosted environment, without having to worry about local setup and prerequisites. To access the cloud-hosted version of this guide, click the Run in cloud button in the guide code pane.

Get Open Liberty 24.0.0.2 now