Optimizing asynchronous communication with MicroProfile Reactive Messaging

Integrating MicroProfile Reactive Messaging with Apache Kafka through the Liberty-Kafka connector gives Open Liberty applications an asynchronous way to communicate. This setup helps you handle large volumes of data, which is essential for event-driven systems.

Reactive messaging patterns enhance scalability and resilience by enabling applications to send, receive, and process messages asynchronously. Open Liberty incorporates this paradigm with its MicroProfile Reactive Messaging support. You can use MicroProfile Config properties to configure reactive messaging channels in Open Liberty and control how your applications interact with message brokers like Kafka. By configuring these properties, you can tailor the behavior of reactive messaging to the requirements of your application. For more information, see MicroProfile Config properties for MicroProfile Reactive Messaging.

The Liberty-Kafka connector enables applications to send messages to and receive messages from an Apache Kafka broker. It uses MicroProfile Reactive Messaging standards for robust, asynchronous communication in microservices architectures. The following sections describe how to integrate MicroProfile Reactive Messaging with Apache Kafka to send messages within and between applications.

Enable MicroProfile Reactive Messaging

To enable reactive messaging in Open Liberty, add the mpReactiveMessaging-3.0 feature to the featureManager element in your server.xml file.

<featureManager>
    <feature>mpReactiveMessaging-3.0</feature>
</featureManager>

This step enables the reactive messaging feature in Open Liberty, allowing you to build and run applications that use the MicroProfile Reactive Messaging API.

Configure the Liberty-Kafka connector

Integrate Kafka with Open Liberty by following these steps.

1. Configure the Kafka broker connection

In the microprofile-config.properties file, specify the Kafka broker addresses to establish a connection, which indicates where your Kafka broker is hosted.

mp.messaging.connector.liberty-kafka.bootstrap.servers=myKafkaBroker:9092

With this configuration, the application connects to a Kafka broker at myKafkaBroker:9092 for messaging. The application can then send and receive messages through Kafka, which facilitates event-driven communication.
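Kafka clients accept a comma-separated list of host:port pairs for bootstrap.servers, so the same property can name more than one broker. The following fragment is a minimal sketch; the host names kafkaBroker1, kafkaBroker2, and kafkaBroker3 are hypothetical placeholders.

```properties
# Hypothetical host names. Listing several brokers lets the client
# bootstrap even if one of them is unavailable.
mp.messaging.connector.liberty-kafka.bootstrap.servers=kafkaBroker1:9092,kafkaBroker2:9092,kafkaBroker3:9092
```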

2. Configure microservices for Kafka messaging with the Liberty-Kafka connector

To enable communication among applications, microservices must be set up to send and receive messages through Kafka topics. By using the Liberty-Kafka connector with MicroProfile Reactive Messaging, applications can connect to an Apache Kafka broker and handle messages on both incoming and outgoing channels.

Incoming message configuration

To receive messages from a Kafka topic, configure an incoming channel that uses the Liberty-Kafka connector. The setup includes the Kafka topic from which messages are received, along with other settings such as the broker address and the consumer group ID.

mp.messaging.incoming.myChannel.connector=liberty-kafka
mp.messaging.incoming.myChannel.topic=myTopicName
mp.messaging.incoming.myChannel.bootstrap.servers=kafkabrokerhost:9092
mp.messaging.incoming.myChannel.group.id=myGroupID
mp.messaging.incoming.myChannel.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.myChannel.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

The configuration ensures that your application connects to the specified Kafka topic (myTopicName) for incoming messages, with kafkabrokerhost:9092 as the broker address and myGroupID as the consumer group ID. The key.deserializer and value.deserializer properties specify how the Kafka client converts the bytes of message keys and values back into objects that your application can understand.

To troubleshoot the Liberty-Kafka connector, check the most common problem areas: Kafka connectivity, multiple server instances, and identifiers for producers and consumers. Make sure that bootstrap.servers is configured correctly for the connection, that each consumer has a distinct group.id to prevent conflicts, and that each producer has a distinct client.id to avoid identifier overlap.

If you start multiple instances of Open Liberty with the same application, you must assign a distinct group.id to each channel for every server instance. This requirement applies to all incoming channels. Without a distinct group.id on each server instance, the server will block any new connections to a topic after the first connection is established. This policy makes sure that each connection to a topic is distinct and properly managed across all server instances.

Deploying multiple applications that use Kafka clients on Open Liberty against the same Kafka server can lead to identifier conflicts. Kafka uses a client.id to identify both producers and consumers, and consumers also identify themselves by their group.id. To avoid these conflicts, assign a distinct client.id to each producer and a distinct group.id and client.id to each consumer. Set these IDs on the individual channels rather than directly on the Liberty-Kafka connector, because connector-level properties apply to every channel that uses the connector.
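As an illustration of channel-level identifiers, the following fragment is a sketch only. The channel name myOutChannel and all ID values are hypothetical; choose values that are distinct across the applications and server instances that share your Kafka server.

```properties
# Channel-level identifiers (all values are hypothetical)
mp.messaging.incoming.myChannel.group.id=inventoryApp-myChannel-group
mp.messaging.incoming.myChannel.client.id=inventoryApp-myChannel-consumer
mp.messaging.outgoing.myOutChannel.client.id=inventoryApp-myOutChannel-producer

# Avoid: an ID that is set on the connector is inherited by every channel
# mp.messaging.connector.liberty-kafka.client.id=sharedClientID
```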

The key.deserializer and value.deserializer properties are critical for the correct functioning of your Kafka consumer. They ensure that your application can accurately interpret the keys and values of incoming messages from a Kafka topic. Without proper deserialization, your application might encounter errors or be unable to process the messages it receives.
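The deserializers must match the format that the producer used when it wrote the messages. As a sketch, assuming a hypothetical producer that writes keys with the Kafka LongSerializer and values as strings, the incoming channel might be configured as follows. The application method that consumes from the channel would then need to accept the matching types.

```properties
# Assumption: keys on the topic were written with LongSerializer
mp.messaging.incoming.myChannel.key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
# Values remain plain strings
mp.messaging.incoming.myChannel.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
```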

Outgoing message configuration

Similarly, to send messages to a Kafka topic, configure an outgoing channel that uses the Liberty-Kafka connector. The setup includes the Kafka topic to which messages are sent, along with the serializers for message keys and values.

mp.messaging.outgoing.myChannel.connector=liberty-kafka
mp.messaging.outgoing.myChannel.topic=myTopicName
mp.messaging.outgoing.myChannel.bootstrap.servers=kafkabrokerhost:9092
mp.messaging.outgoing.myChannel.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.myChannel.value.serializer=org.apache.kafka.common.serialization.StringSerializer

The configuration ensures messages are sent to the myTopicName Kafka topic, with the broker located at kafkabrokerhost:9092. The key.serializer and value.serializer properties are crucial for the correct formatting of messages before they are sent to a Kafka topic. Serialization refers to converting the key and value objects from your application into a byte array so that Kafka can efficiently transmit them over the network to the specified topic.

When you set up channels as demonstrated, you directly connect message channels to Kafka. This method provides you with precise control over the messaging channels. As a result, it enhances the scalability and robustness of your application by efficiently managing both incoming and outgoing messages.
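When one application both consumes and produces messages, the incoming and outgoing configuration can sit in the same microprofile-config.properties file. The following fragment is a sketch under stated assumptions: the channel names ordersIn and ordersOut, the topic names, and the group ID are hypothetical. The broker address is set once at the connector level, so it applies to both channels and does not need to be repeated on each one.

```properties
# Connector-level attribute: applies to every channel that uses liberty-kafka
mp.messaging.connector.liberty-kafka.bootstrap.servers=kafkabrokerhost:9092

# Incoming channel (hypothetical name: ordersIn)
mp.messaging.incoming.ordersIn.connector=liberty-kafka
mp.messaging.incoming.ordersIn.topic=orders
mp.messaging.incoming.ordersIn.group.id=ordersGroup
mp.messaging.incoming.ordersIn.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.ordersIn.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Outgoing channel (hypothetical name: ordersOut)
mp.messaging.outgoing.ordersOut.connector=liberty-kafka
mp.messaging.outgoing.ordersOut.topic=processedOrders
mp.messaging.outgoing.ordersOut.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.ordersOut.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

A channel-level property with the same name overrides the connector-level value for that channel only.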

3. Include Kafka client libraries

To integrate Kafka into your application environment by using Open Liberty, choose one of the following methods.

  • Include Kafka libraries as an application dependency

    This method does not require any additional server configuration for permissions. It incorporates the Kafka client libraries directly within your application deployment. This approach is suitable if you want to keep the Kafka client libraries tightly coupled with your application. The libraries that are used are the ones that are specified in your application’s dependency management system (like Maven). This method simplifies dependency management but does not share the Kafka client libraries across multiple applications that are deployed on the same server.

  • Include Kafka libraries as a shared library

    When you include the Kafka client libraries as a shared library, multiple applications on the same Open Liberty server can use them. This method reduces redundancy and can make library management easier, especially in environments where multiple applications need to interact with Kafka. However, you must explicitly grant the Java permissions that the libraries need, and you must manage those permissions carefully to avoid class conflicts or security issues.

Include Kafka libraries as an application dependency

To use the Kafka connector that is provided by Open Liberty, you must include the Kafka client API JAR file in your application.

If you are building your application with Maven, add the Kafka client dependency in your Maven pom.xml file.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>

This approach integrates Kafka client libraries directly into your application. It does not require any additional server configuration for permissions, simplifying deployment and configuration management.

Include Kafka libraries as a shared library

You can integrate Kafka client libraries as a shared resource within the Open Liberty server. This approach is useful for situations where several applications on the same server instance require the Kafka client libraries. It effectively minimizes duplication.

However, if Kafka client libraries are used as a shared library, you must explicitly grant the necessary Java permissions for the libraries to function correctly. These permissions allow the Kafka client to connect to Kafka brokers, read system properties, and access or modify security properties.

To configure these permissions, you can use the server.xml configuration file. The following example demonstrates how to grant the necessary permissions to a Kafka client library that is specified as a shared library:

   <variable name="kafkaCodebase" value="${server.config.dir}/kafkaLib/kafka-clients-<client.version>.jar"/>

   <!-- Kafka client uses JMX MBeans -->
   <javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanServerPermission" name="createMBeanServer"/>
   <javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanPermission" name="*" actions="*"/>
   <javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanTrustPermission" name="register"/>

   <!-- Kafka client reads system properties -->
   <javaPermission codebase="${kafkaCodebase}" className="java.util.PropertyPermission" name="*" actions="read"/>

   <!-- Kafka client connects to the kafka broker server -->
   <javaPermission codebase="${kafkaCodebase}" className="java.net.SocketPermission" name="*" actions="connect"/>

   <!-- Kafka client loads serializers and deserializers by name -->
   <javaPermission codebase="${kafkaCodebase}" className="java.lang.RuntimePermission" name="getClassLoader" actions="*"/>

   <!-- Kafka reads truststores; "*" matches all files in the current directory (i.e. the server directory) -->
   <javaPermission codebase="${kafkaCodebase}" className="java.io.FilePermission" name="*" actions="read"/>

   <!-- Kafka client allowed to invoke the Subject.doAs methods -->
   <javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="doAs"/>

   <!-- Kafka client allowed to call getSubject -->
   <javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="getSubject"/>

   <!-- Kafka client sets properties for the Simple SASL/PLAIN Server Provider -->
   <javaPermission codebase="${kafkaCodebase}" className="java.security.SecurityPermission" name="putProviderProperty.Simple SASL/PLAIN Server Provider"/>

   <!-- Kafka client allowed to set a Provider -->
   <javaPermission codebase="${kafkaCodebase}" className="java.security.SecurityPermission" name="insertProvider"/>

   <!-- Kafka client allowed access to private Credentials belonging to a particular Subject -->
   <javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.PrivateCredentialPermission" name="* * &quot;*&quot;" actions="read"/>

   <!-- Kafka client allowed to modify the set of public credentials associated with a Subject -->
   <javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="modifyPublicCredentials"/>

   <!-- Kafka client allowed to modify the set of private credentials associated with a Subject -->
   <javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="modifyPrivateCredentials"/>
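
The permissions are granted to a JAR file in the kafkaLib directory of the server configuration, but the server also needs a library definition that points to that directory. The following server.xml fragment is a minimal sketch of one way to define it. The library ID kafkaLib and the application file myApp.war are hypothetical names; adjust them to your own deployment.

```xml
<!-- Hypothetical shared library that picks up the Kafka client JAR from the kafkaLib directory -->
<library id="kafkaLib">
    <fileset dir="${server.config.dir}/kafkaLib" includes="*.jar"/>
</library>

<!-- Hypothetical application that references the shared library -->
<webApplication location="myApp.war">
    <classloader commonLibraryRef="kafkaLib"/>
</webApplication>
```

With commonLibraryRef, the library classes are loaded once and shared by every application that references the library, which matches the goal of avoiding duplicate copies of the Kafka client.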

4. Configure security for the liberty-kafka connector

For more information on security and authentication protocols, see Kafka connector security configuration.

For more information on Apache Kafka, see the Apache Kafka documentation.