back to all blogsSee all blog posts

Sending and receiving messages between microservices with MicroProfile Reactive Messaging

image of author image of author
Andrew Rouse and Gordon Hutchison on Sep 13, 2019
Post available in languages:

MicroProfile Reactive Messaging provides a very easy-to-use way to send, receive, and process messages and is well-suited to writing applications that process streams of events. With MicroProfile Reactive Messaging, you annotate application beans' methods and, under the covers, Liberty converts these to reactive streams-compatible publishers, subscribers and processors and connects them up to each other. It also provides a Connector API to allow your methods to be connected to external messaging systems.

MicroProfile Reactive Messaging is now available in Open Liberty 19.0.0.9.

To enable the feature include it in your server.xml feature list:

<featureManager>
  <feature>mpReactiveMessaging-1.0</feature>
  ...
</featureManager>

Reactive Messaging uses a model of annotated methods which are connected by named channels. For example:

Diagram showing MethodA connected to MethodB by channel foo and MethodB connected to MethodC by channel bar. MethodA is annotated @Outgoing("foo"). MethodB is annotated @Incoming("foo") and @Outgoing("bar"). MethodC is annotated @Incoming("bar").

Let’s give some examples of what these methods could look like.

The following method functions as a publisher which generates new messages. The method is called every time the next thing in the stream requests another message:

@Outgoing("foo")
public String methodA() {
   return "hello";
}

The following method functions as a processor, applying the same operation to each message in the stream. It is called once for each message in the stream:

@Incoming("foo")
@Outgoing("bar")
public String methodB(String input) {
    return input + " world";
}

The following method functions as a subscriber, performing processing for each message in the stream, but not producing any output of its own as it has no @Outgoing channel:

@Incoming("bar")
public void methodC(String input) {
    System.out.println("Message received: " + input);
}

I’ve only outlined the basic cases for producing, processing, and consuming messages within your application. There are other supported method signatures for each of these cases which allow for asynchronous processing, advanced message acknowledgement, and integration with other reactive streams libraries in the Reactive Messaging specification.

Sending and receiving messages between applications using connectors

Although sending messages within our application is nice, it’s more useful to be able to send and receive messages from other systems. For this, reactive messaging uses connectors. Connectors can be attached to one end of a channel in place of a method and are configured using MicroProfile Config.

Open Liberty includes the liberty-kafka connector for sending and receiving messages from an Apache Kafka broker.

We could remove methodA from our example above and instead receive messages from the foo topic on a Kafka broker by adding the following configuration to our application:

mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.bootstrap.servers=kafkabrokerhost:9092
mp.messaging.incoming.foo.group.id=foo-reader
mp.messaging.incoming.foo.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.foo.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Here, connector is a special property in reactive messaging which defines which connector to use. bootstrap.servers, group.id, key.deserializer, and value.deserializer are properties understood by the Kafka client API. This set of properties is the minimum that must be defined to receive messages from a Kafka topic.

Now, rather than receiving the same message over and over again from methodA, we will receive messages from the foo topic on the Kafka broker.

Similarly, we could replace methodC with an output to a Kafka broker with the following config:

mp.messaging.outgoing.bar.connector=liberty-kafka
mp.messaging.outgoing.bar.bootstrap.servers=kafkabrokerhost:9092
mp.messaging.outgoing.bar.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.bar.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Now, our messages are outputted to the bar topic on the Kafka broker, rather than being consumed and logged by methodC.

Configuring a connector

The configuration properties can be set anywhere that’s read by MicroProfile Config, for example as system properties in Open Liberty’s bootstrap.properties file, or environment variables in Open Liberty’s server.env file. If you’re unfamiliar with MicroProfile Config, check out our tutorial guide.

The Reactive Messaging specification defines two ways to set configuration properties for a connector:

# To set a property only for the foo channel
mp.messaging.incoming.foo.[PROPERTY-NAME]=value1

# To set a property for every channel which uses the liberty-kafka connector
mp.messaging.connector.liberty-kafka.[PROPERTY-NAME]=value2

Properties supported by the liberty-kafka connector for incoming channels:

Property Name Description

topic

The name of the topic to subscribe to. Defaults to the name of the channel.

unacked.limit

The number outstanding unacknowledged messages. If this limit is reached, the connector will stop retrieving records from Kafka until some messages have been acknowledged. Defaults to the value of max.poll.records if set, or to 500.

<any other property>

All other properties are passed directly as config parameters to the KafkaConsumer API. A list of required and optional properties can be found in the Kafka documentation.

Properties supported by the liberty-kafka connector for outgoing channels:

Property Name Description

topic

The name of the topic to send messages to. Defaults to the name of the channel.

<any other property>

All other properties are passed directly as config parameters to the KafkaProducer API. A list of required and optional properties can be found in the Kafka documentation.

So, for example, a full set of properties to connect several channels to topics in IBM Public Cloud Event Streams (which requires encryption and authentication) might look like this:

# Config specific to foo
mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.group.id=foo-reader
mp.messaging.incoming.foo.topic=my-foo-topic
mp.messaging.incoming.foo.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.foo.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Config specific to bar
mp.messaging.outgoing.bar.connector=liberty-kafka
mp.messaging.outgoing.bar.topic=my-bar-topic
mp.messaging.outgoing.bar.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.bar.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Config shared between all kafka connections
mp.messaging.connector.liberty-kafka.bootstrap.servers=broker-1-eventstreams.cloud.ibm.com:9093,broker-2-eventstreams.cloud.ibm.com:9093
mp.messaging.connector.liberty-kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="my-apikey";
mp.messaging.connector.liberty-kafka.sasl.mechanism=PLAIN
mp.messaging.connector.liberty-kafka.security.protocol=SASL_SSL
mp.messaging.connector.liberty-kafka.ssl.protocol=TLSv1.2

Some corresponding code for this example might be:

@Incoming("foo")
@Outgoing("bar")
public String toUpperCase(String input) {
    return input.toUpperCase();
}

Packaging applications that use the Kafka connector

When using the Kafka connector included in Open Liberty, you must include the the Kafka client API jar in your application or include it using a shared library.

If you’re building your application with Maven, you do this by adding these dependencies:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.3.0</version>
</dependency>

Messages and acknowledgement

To ensure that no messages are lost in the event of system failure, most messaging systems differentiate between a message being delivered to a system and the processing of that message by that system being complete. If the system goes down, any messages which have been sent but not completed can then be sent again.

Reactive messaging applications implement this using message acknowledgements. When all processing of that message has been completed (so that it wouldn’t need to be re-processed in the event of system failure) it must be acknowledged. Connectors can then communicate back to the messaging system which sent the message that it has been fully processed.

Let’s look at some examples of how acknowledgements work.

Acknowledging incoming messages

Here’s how the Open Liberty Kafka connector handles acknowledgements:

  • A message is received from Kafka.

  • The message is sent into the channel.

  • When the message is acknowledged, the Kafka partition offset for the consumer group is committed up to the ID of the message.

After the message has been acknowledged and the offset committed, if our application crashes or is restarted, we won’t process this message again. (This is a slight simplification, the connector does some further tracking to ensure that all prior messages from that partition have also been acknowledged before committing the partition offset, in case messages are acknowledged out of order.)

Acknowledging outgoing messages

Here’s how the Open Liberty Kafka connector handles acknowledgements:

  • A message is received from the channel.

  • The message is sent to the Kafka broker.

  • When the Kafka broker confirms that the message has been received, the message received from the channel is acknowledged.

Here, we make sure that the message isn’t acknowledged until its been safely stored in the target topic.

Processing and acknowledgements

Methods that process messages (i.e. they’re annotated with both @Incoming and @Outgoing) often receive one message, do some transformation on it, and create a new message.

In this case, the processor needs to ensure that when the new message is acknowledged, the received message is then acknowledged as well.

If this happens, a message can be received into the system, go through any number of processors, resulting in a new message sent out from the system. Then the acknowledgement can be sent back along the chain so that the originating system can be informed that the message has been processed.

Diagram showing a Message passing from a Connector to ChannelB and a Message passing from ChannelB to another Connector. Underneath

Manual vs. automatic acknowledgement

In simple cases, where there’s a 1:1 correlation between incoming and outgoing messages, Open Liberty handles linking up the acknowledgements like this for you. However, where it can’t be sure that this is the case, it defaults to acknowledging messages before they’re processed. The default acknowledgement policy for each method signature is listed in the specification.

This default ensures that all messages are acknowledged, however acknowledging messages before they’re fully processed means that if the system fails, the processing for that message isn’t retried, possibly resulting in lost messages.

To ensure that messages are not lost, you must:

  1. Check the specification to see whether reactive messaging does message acknowledgements before (pre-process) or after (post-process) the message processing, for your methods.

  2. If it does pre-processing, either:

    • change your method signature so that it does post-processing

    • use manual acknowledgement instead

Using manual acknowledgement

To use manual acknowledgement, you must do three things:

  1. Annotate the method with @Acknowledgement(MANUAL)

  2. Use a method signature which includes the Message

  3. Call Message.ack() on each incoming message when the processing of that message has completed.

Here’s an example processing method which accepts strings and filters out any strings which are three characters or fewer.

@Incoming("all-strings")
@Outgoing("long-strings")
public PublisherBuilder<String> removeShortStrings(String input) {
    if (input.length() > 3) {
        // Long string, keep it by returning a stream with this string
        return ReactiveStreams.of(input);
    } else {
        // Short string, drop it by returning an empty stream
        return ReactiveStreams.empty();
    }
}

This method has the PublisherBuilder<O> method(I input) signature, listed in the spec as defaulting to PRE-PROCESS message acknowledgement.

We can convert it to manual acknowledgement like this:

@Incoming("all-strings")
@Outgoing("long-strings")
@Acknowledgement(MANUAL)
public PublisherBuilder<Message<String>> removeShortStrings(Message<String> input) {
    if (input.getPayload().length() > 3) {
        return ReactiveStreams.of(Message.of(input.getPayload(), input::ack));
    } else {
        input.ack();
        return ReactiveStreams.empty();
    }
}

So how does this code link up the message acknowledgements as we described above?

In the case of a long string, it uses Message.of(payload, ackFunction) to create a new Message which, when it is acknowledged, will call ack() on the input message.

In the case of a short string, we want to drop the message, so it calls input.ack() directly to indicate that the message processing is complete.

Controlling logging and trace

You can enable debug logging of the reactive messaging feature by setting the trace specification in the server.xml.

<logging traceSpecification="REACTIVEMESSAGE=all"/>

If you’re using the Kafka connector, you might find it helpful to control logging from the Kafka client. The Kafka client uses slf4j for logging and to integrate this with OpenLiberty’s logging, you need to include the slf4j-jdk14 library alongside the Kafka client library. You can include this Maven depedency:

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-jdk14</artifactId>
  <version>1.7.25</version>
</dependency>

After doing this, you can enable trace logging for both the reactive messaging feature and the kafka client by setting this trace specification.

<logging traceSpecification="REACTIVEMESSAGE=all:org.apache.kafka.*=all"/>

Relationship to other specifications

MicroProfile Reactive Messaging makes use of and interoperates with two other specifications:

  • Reactive Streams is a specification for doing asynchronous stream processing with back pressure. It defines a minimal set of interfaces to allow components which do this sort of stream processing to be connected together.

  • MicroProfile Reactive Streams Operators is a MicroProfile specification which builds on Reactive Streams to provide a set of basic operators to link different reactive components together and to perform processing on the data which passes between them.

When you use the MicroProfile Reactive Messaging @Incoming and @Outgoing annotations, Open Liberty creates a Reactive Streams component for each method and joins them up by matching the channel names.

In addition, MicroProfile Reactive Messaging supports annotated method which return types from Reactive Streams and Reactive Streams Operators, allowing the user to implement their logic with other libraries which can create these types. For example, they could use the ProcessorBuilder<I, O> method() method signature and return a ProcessorBuilder created using the Reactive Streams Operators API. The full list of supported method signatures can be found in the specification.

Further information

Try it now