Optimizing asynchronous communication with MicroProfile Reactive Messaging

Integrating MicroProfile Reactive Messaging and Apache Kafka with the liberty-kafka connector provides an efficient asynchronous communication method for Open Liberty applications. This setup helps you handle large volumes of data efficiently, which is essential for event-driven systems.

The following sections describe how to intergrate MicroProfile Reactive Messaging with Apache Kafka to send messages within and between applications:

Configure the Liberty-Kafka connector

The liberty-kafka connector enables applications to send and receive messages from an Apache Kafka broker. It uses MicroProfile Reactive Messaging standards for robust, asynchronous communication in microservices architectures.

Integrate Kafka with Open Liberty by following these steps:

1. Configure the Kafka broker connection

In the microprofile-config.properties file, specify the Kafka broker addresses to establish a connection, which indicates where your Kafka broker is hosted.

mp.messaging.connector.liberty-kafka.bootstrap.servers=myKafkaBroker:9092

By configuring a MicroProfile application as shown in the example, to connect to a Kafka broker at myKafkaBroker:9092 for messaging. The application is able to send and receive messages through Kafka, facilitating event-driven communication.

2. Define channels for messaging

  • To specify the Kafka topic from which messages are received, create a channel for incoming messages.

mp.messaging.incoming.myChannel.connector=liberty-kafka
mp.messaging.incoming.myChannel.topic=myTopicName
  • To specify the Kafka topic from which messages are sent, create a channel for outgoing messages.

mp.messaging.outgoing.myChannel.connector=liberty-kafka
mp.messaging.outgoing.myChannel.topic=myTopicName.

By setting up a channel as shown in the examples, you connect a message channel directly to Kafka, which gives you precise control over the messaging channels. You can specify attributes such as bootstrap.servers for connection and topic for message direction, enhancing your application’s ability to scale and remain robust by efficiently managing messages.

For more information on Liberty-Kafka connector options and channel properties, see Liberty-Kafka connector options and channel properties.

3. Include Kafka client libraries

To integrate Kafka into your application environment by using Open Liberty, choose one of the following methods based on your requirement:

Include Kafka libraries as an application dependency

To use the Kafka connector provided by Open Liberty, you must include the Kafka client API jar in your application.

If you are building your application with Maven, add the Kafka client dependency in your Maven pom.xml file.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>

This approach integrates Kafka client libraries directly into your application. It does not require any additional server configuration for permissions, simplifying deployment and configuration management.

Include Kafka libraries as a shared library

You can integrate Kafka client libraries as a shared resource within the Open Liberty server. This approach is useful for situations where several applications on the same server instance require the Kafka client libraries. It effectively minimizes duplication.

However, if Kafka client libraries are used as a shared library, you must explicitly grant the necessary Java permissions for the libraries to function correctly. These permissions allow the Kafka client to connect to Kafka brokers, read system properties, and access or modify security properties.

To configure these permissions, you can use the server.xml configuration file. The following example demonstrates how to grant the necessary permissions to a Kafka client library that is specified as a shared library:

    <variable name="kafkaCodebase" value="${server.config.dir}/kafkaLib/kafka-clients-<client.version>.jar"/>
   <javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanServerPermission" name="createMBeanServer"/>
   <javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanPermission" name="*" actions="*"/>
   <javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanTrustPermission"name="register"/>

   <!-- Kafka client reads system properties -->
   <javaPermission codebase="${kafkaCodebase}" className="java.util.PropertyPermission"name="*"actions="read"/>

   <!-- Kafka client connects to the kafka broker server -->
   <javaPermission codebase="${kafkaCodebase}" className="java.net.SocketPermission"name="*"actions="connect"/>

   <!-- Kafka client loads serializers and deserializers by name -->
   <javaPermission codebase="${kafkaCodebase}" className="java.lang.RuntimePermission"name="getcodebase="${kafkaCodebase}" classLoader"actions="*"/>

   <!-- Kafka reads truststores -->
   <javaPermission codebase="${kafkaCodebase}" className="java.io.FilePermission" name="*" <!-- all files in the current directory (i.e. the server directory) --> actions="read"/>

   <!-- Kafka client allowed to invoke the Subject.doAs methods -->
   <javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="doAs"/>

   <!-- Kafka client allowed to call getSubject -->
   <javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="getSubject"/>

   <!-- Kafka client sets properties for the Simple SASL/PLAIN Server Provider -->
   <javaPermission codebase="${kafkaCodebase}" className="java.security.SecurityPermission" name="putProviderProperty.Simple SASL/PLAIN Server Provider"/>

   <!-- Kafka client allowed to set a Provider -->
   <javaPermission codebase="${kafkaCodebase}" className="java.security.SecurityPermission" name="insertProvider"/>

   <!-- Kafka client allowed access to private Credentials belonging to a particular Subject -->
   <javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.PrivateCredentialPermission" name="* * "*"" actions="read"/>

   <!-- Kafka client allowed to modify the set of public credentials associated with a Subject -->
   <javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="modifyPublicCredentials"/>

   <!-- Kafka client allowed to modify the set of private credentials associated with a Subject -->
   <javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="modifyPrivateCredentials"/>

4. Configure security for the liberty-kafka connector

For more information on security and authentication methods, see Kafka connector security configuration.

Sending and receiving messages among applications by using connectors

To send and receive messages from other systems, reactive messaging uses connectors. Connectors can be attached to one end of a channel and are configured by using MicroProfile Config. Open Liberty includes the liberty-kafka connector for sending and receiving messages from an Apache Kafka broker.

The following example shows you how to configure a microservice for retrieving messages from a Kafka topic by using MicroProfile Reactive Messaging and the liberty-kafka connector.

mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.bootstrap.servers=kafkabrokerhost:9092
mp.messaging.incoming.foo.group.id=foo-reader
mp.messaging.incoming.foo.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.foo.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

The kafkabrokerhost:9092 Kafka broker address, the foo-reader consumer group ID, and the deserializers for both key and value are org.apache.kafka.common.serialization.StringDeserializer, indicating that both keys and values are expected to be strings.

Similarly, the following example shows you how to set up a microservice to send messages to a Kafka broker by using MicroProfile Reactive Messaging and the liberty-kafka connector.

mp.messaging.outgoing.bar.connector=liberty-kafka
mp.messaging.outgoing.bar.bootstrap.servers=kafkabrokerhost:9092
mp.messaging.outgoing.bar.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.bar.value.serializer=org.apache.kafka.common.serialization.StringSerializer

The example uses the liberty-kafka connector to manage the connection between the application and Kafka. The bootstrap.servers setting points to kafkabrokerhost:9092, the Kafka broker’s network address, allowing the application to locate and send messages to the Kafka cluster. The key and value of messages are configured to use StringSerializer. The application serializes both parts of the message as strings for Kafka transmission.

The application gains the ability to offload messages to the Kafka topic bar. This approach to distributed messaging enhances scalability and flexibility in handling data flows.

For more information on liberty-kafka connector options and channel properties, see Liberty-kafka connector options and channel properties.

For more information, see Creating the consumer in the inventory microservice in the Creating reactive Java microservices guide.

Troubleshooting

To troubleshoot the liberty-kafka connector, address key issues like Kafka connectivity, managing multiple server instances, and giving distinct identifiers to producers and consumers. Make sure that the bootstrap.servers are configured correctly for connection. Each consumer has a distinct group.id to prevent conflicts, and producers need a distinct client.id to avoid identifier overlap.

Multiple server instances

If you start multiple instances of Open Liberty with the same application, you must assign a distinct group.id to each channel for every server instance. This requirement applies to all incoming channels. Without a distinct group.id on each server instance, the server will block any new connections to a topic after the first connection is established. This policy makes sure that each connection to a topic is distinct and properly managed across all server instances.

Multiple Reactive Messaging applications using the same Kafka server

When multiple applications with a Kafka client are deployed to Open Liberty and connect to the same Kafka server, errors can happen. The errors come from conflicting identifiers that both Kafka producers and consumers use within these applications. Kafka generates the client.id for both, that lead to these conflicts, especially since consumers determine their identifiers by using either their group.id or client.id.

To mitigate these conflicts, it is essential to make sure that each consumer channel has a distinct group.id and each producer channel has a distinct client.id. However, specifying these attributes directly on the liberty-kafka connector is not an effective solution and must be avoided.

These steps aim to identify and address common challenges that arise during the integration of Kafka with Open Liberty. They help in facilitating the smooth functioning of your microservices architecture.

For more information on Apache Kafka, see the Apache Kafka documentation.