Optimizing asynchronous communication with MicroProfile Reactive Messaging
Integrating MicroProfile Reactive Messaging with Apache Kafka through the Liberty-Kafka connector gives Open Liberty applications an asynchronous communication method that can handle large volumes of data, which is essential for event-driven systems.
Reactive messaging patterns enhance scalability and resilience by enabling applications to send, receive, and process messages asynchronously. Open Liberty incorporates this paradigm with its MicroProfile Reactive Messaging support. You can use MicroProfile Config properties to configure reactive messaging channels in Open Liberty and control how your applications interact with message brokers like Kafka. By setting these properties, you can tailor the behavior of reactive messaging to the requirements of your application. For more information, see MicroProfile Config properties for MicroProfile Reactive Messaging.
The Liberty-Kafka connector enables applications to send messages to and receive messages from an Apache Kafka broker. It uses MicroProfile Reactive Messaging standards for robust, asynchronous communication in microservices architectures. The following sections describe how to integrate MicroProfile Reactive Messaging with Apache Kafka to send messages within and between applications.
Enable MicroProfile Reactive Messaging
To enable reactive messaging in Open Liberty, add the following feature element to your server.xml file, inside the featureManager element.
<featureManager>
    <feature>mpReactiveMessaging-3.0</feature>
</featureManager>
This step enables the reactive messaging feature in Open Liberty, allowing you to build and run applications that use the MicroProfile Reactive Messaging API.
Configure the Liberty-Kafka connector
Integrate Kafka with Open Liberty by following these steps.
1. Configure the Kafka broker connection
In the microprofile-config.properties file, specify the Kafka broker addresses, which tell the application where your Kafka broker is hosted.
mp.messaging.connector.liberty-kafka.bootstrap.servers=myKafkaBroker:9092
With this configuration, the MicroProfile application connects to a Kafka broker at myKafkaBroker:9092 for messaging. The application can then send and receive messages through Kafka, which facilitates event-driven communication.
2. Configure microservices for Kafka messaging with the Liberty-Kafka connector
To communicate with each other, microservices must be set up to send and receive messages through Kafka topics. With the Liberty-Kafka connector, which builds on the MicroProfile Reactive Messaging specification, applications connect to an Apache Kafka broker through incoming and outgoing channels.
Incoming message configuration
To receive messages from a Kafka topic, configure an incoming channel that uses the Liberty-Kafka connector. The setup specifies the Kafka topic from which messages are received, along with other settings such as the broker's address and the consumer group ID.
mp.messaging.incoming.myChannel.connector=liberty-kafka
mp.messaging.incoming.myChannel.topic=myTopicName
mp.messaging.incoming.myChannel.bootstrap.servers=kafkabrokerhost:9092
mp.messaging.incoming.myChannel.group.id=myGroupID
mp.messaging.incoming.myChannel.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.myChannel.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
This configuration connects your application to the specified Kafka topic (myTopicName) for incoming messages, with kafkabrokerhost:9092 as the broker address and myGroupID as the consumer group ID. The key.deserializer and value.deserializer properties specify how the Kafka client converts the bytes of message keys and values back into objects that your application can understand.
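The incoming channel sets its own bootstrap.servers value even though step 1 already set one for the whole connector. The following Java sketch illustrates how the two levels might combine. It assumes the precedence described in the MicroProfile Reactive Messaging specification, where channel-level attributes override connector-level ones. ChannelConfigSketch and its methods are hypothetical names that use only java.util.Properties; they are not part of Open Liberty or MicroProfile.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper for illustration; not an Open Liberty or MicroProfile API.
final class ChannelConfigSketch {

    // Parses text in microprofile-config.properties format.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Merges connector-level attributes with the attributes of one channel.
    // Assumption: channel-level attributes override connector-level ones.
    static Map<String, String> effectiveConfig(Properties all, String direction, String channel) {
        String channelPrefix = "mp.messaging." + direction + "." + channel + ".";
        Map<String, String> merged = new TreeMap<>();
        String connector = all.getProperty(channelPrefix + "connector");
        if (connector != null) {
            copyAttributes(all, "mp.messaging.connector." + connector + ".", merged);
        }
        // Copied second, so channel values overwrite connector values.
        copyAttributes(all, channelPrefix, merged);
        return merged;
    }

    private static void copyAttributes(Properties source, String prefix, Map<String, String> target) {
        for (String name : source.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                target.put(name.substring(prefix.length()), source.getProperty(name));
            }
        }
    }

    public static void main(String[] args) {
        Properties all = parse(String.join("\n",
            "mp.messaging.connector.liberty-kafka.bootstrap.servers=myKafkaBroker:9092",
            "mp.messaging.incoming.myChannel.connector=liberty-kafka",
            "mp.messaging.incoming.myChannel.topic=myTopicName",
            "mp.messaging.incoming.myChannel.bootstrap.servers=kafkabrokerhost:9092",
            "mp.messaging.incoming.myChannel.group.id=myGroupID"));
        effectiveConfig(all, "incoming", "myChannel")
            .forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

Under that assumption, a channel that omits bootstrap.servers would fall back to the connector-level broker address, while a channel that sets it uses its own value.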
To troubleshoot the Liberty-Kafka connector, check three common problem areas: Kafka connectivity, multiple server instances, and the identifiers that are given to producers and consumers. Make sure that the bootstrap.servers property is configured correctly for the connection, that each consumer has a distinct group.id to prevent conflicts, and that each producer has a distinct client.id to avoid identifier overlap.
If you start multiple instances of Open Liberty with the same application, you must assign a distinct group.id to each channel for every server instance. This requirement applies to all incoming channels. Without a distinct group.id on each server instance, the server will block any new connections to a topic after the first connection is established. This policy makes sure that each connection to a topic is distinct and properly managed across all server instances.
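The requirement above can be checked before deployment. The following Java sketch is one possible approach: it collects the group.id of every incoming channel from the configuration of each server instance and reports any value that more than one instance uses. GroupIdCheckSketch, the instance names, and the idea of holding each instance's configuration in a java.util.Properties object are all assumptions made for illustration; Open Liberty does not provide this helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical pre-deployment check; not an Open Liberty or MicroProfile API.
final class GroupIdCheckSketch {

    private static final String PREFIX = "mp.messaging.incoming.";
    private static final String SUFFIX = ".group.id";

    // Returns the group.id values that incoming channels on more than one
    // server instance share. An empty result means every instance is distinct.
    static Set<String> sharedGroupIds(Map<String, Properties> configByInstance) {
        Map<String, Set<String>> instancesByGroupId = new TreeMap<>();
        for (Map.Entry<String, Properties> entry : configByInstance.entrySet()) {
            Properties props = entry.getValue();
            for (String name : props.stringPropertyNames()) {
                boolean isIncomingGroupId = name.startsWith(PREFIX)
                        && name.endsWith(SUFFIX)
                        && name.length() > PREFIX.length() + SUFFIX.length();
                if (isIncomingGroupId) {
                    instancesByGroupId
                            .computeIfAbsent(props.getProperty(name), key -> new TreeSet<>())
                            .add(entry.getKey());
                }
            }
        }
        Set<String> shared = new TreeSet<>();
        for (Map.Entry<String, Set<String>> entry : instancesByGroupId.entrySet()) {
            if (entry.getValue().size() > 1) {
                shared.add(entry.getKey());
            }
        }
        return shared;
    }

    public static void main(String[] args) {
        Properties server1 = new Properties();
        server1.setProperty("mp.messaging.incoming.myChannel.group.id", "myGroupID");
        Properties server2 = new Properties();
        server2.setProperty("mp.messaging.incoming.myChannel.group.id", "myGroupID");

        Map<String, Properties> instances = new LinkedHashMap<>();
        instances.put("server1", server1); // hypothetical instance names
        instances.put("server2", server2);
        System.out.println("Shared group.id values: " + sharedGroupIds(instances));
    }
}
```

If the check reports a shared value, give the channel a different group.id on one of the instances, for example by appending the instance name.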
Deploying multiple applications with Kafka clients on Open Liberty to the same Kafka server can lead to identifier conflicts, because Kafka assigns a client.id to both producers and consumers. Consumers also identify themselves by either their group.id or their client.id. To avoid these conflicts, assign a distinct group.id and client.id to each producer and consumer. Avoid setting these IDs directly on the Liberty-Kafka connector. Managing these identifiers carefully is essential for reliable Kafka integration with Open Liberty in a microservices architecture.
The key.deserializer and value.deserializer properties are critical for the correct functioning of your Kafka consumer. They ensure that your application can accurately interpret the keys and values of incoming messages from a Kafka topic. Without proper deserialization, your application might encounter errors or be unable to process the messages it receives.
Outgoing message configuration
Similarly, to send messages to a Kafka topic, configure an outgoing channel that uses the Liberty-Kafka connector. The setup specifies the Kafka topic to which messages are sent, along with serializers for message keys and values.
mp.messaging.outgoing.myChannel.connector=liberty-kafka
mp.messaging.outgoing.myChannel.topic=myTopicName
mp.messaging.outgoing.myChannel.bootstrap.servers=kafkabrokerhost:9092
mp.messaging.outgoing.myChannel.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.myChannel.value.serializer=org.apache.kafka.common.serialization.StringSerializer
This configuration sends messages to the myTopicName Kafka topic, with the broker located at kafkabrokerhost:9092. The key.serializer and value.serializer properties control how messages are formatted before they are sent to a Kafka topic. Serialization converts the key and value objects from your application into byte arrays so that Kafka can transmit them over the network to the specified topic.
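To make the byte conversion concrete, the following Java sketch mimics what a string serializer and its matching deserializer do, assuming the default UTF-8 encoding of Kafka's StringSerializer and StringDeserializer. StringSerdeSketch is a hypothetical stand-in that uses only the JDK; in a real application the kafka-clients library supplies the classes that are named in the configuration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for illustration; the real serializer classes
// come from the kafka-clients library.
final class StringSerdeSketch {

    // Producer side: converts the application object into bytes for the wire.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: converts the received bytes back into an application object.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("order-42");
        System.out.println("Bytes sent to the topic: " + Arrays.toString(wire));
        System.out.println("Value read by the consumer: " + deserialize(wire));
    }
}
```

The round trip works only when both sides agree on the format, which is why the serializer on the outgoing channel and the deserializer on the incoming channel must match.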
When you set up channels as demonstrated, you directly connect message channels to Kafka. This method provides you with precise control over the messaging channels. As a result, it enhances the scalability and robustness of your application by efficiently managing both incoming and outgoing messages.
For more information, see Creating the consumer in the inventory microservice in the Creating reactive Java microservices guide.
3. Include Kafka client libraries
To integrate Kafka into your application environment by using Open Liberty, choose one of the following methods.
Include Kafka libraries as an application dependency
This method does not require any additional server configuration for permissions. It incorporates the Kafka client libraries directly within your application deployment. This approach is suitable if you want to keep the Kafka client libraries tightly coupled with your application. The libraries that are used are the ones that are specified in your application’s dependency management system (like Maven). This method simplifies dependency management but does not share the Kafka client libraries across multiple applications that are deployed on the same server.
Include Kafka libraries as a shared library
A shared library enables the assignment of essential permissions for the libraries to operate effectively. When you share the Kafka client libraries, multiple applications on the same Open Liberty server can use them efficiently. This method reduces redundancy and can facilitate easier library management, especially in environments where multiple applications need to interact with Kafka. However, it requires careful management of permissions to avoid class conflicts or security issues.
Include Kafka libraries as an application dependency
To use the Kafka connector provided by Open Liberty, you must include the Kafka client API jar in your application.
If you are building your application with Maven, add the Kafka client dependency to your Maven pom.xml file.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>
This approach integrates Kafka client libraries directly into your application. It does not require any additional server configuration for permissions, simplifying deployment and configuration management.
Include Kafka libraries as a shared library
You can integrate Kafka client libraries as a shared resource within the Open Liberty server. This approach is useful for situations where several applications on the same server instance require the Kafka client libraries. It effectively minimizes duplication.
However, if Kafka client libraries are used as a shared library, you must explicitly grant the necessary Java permissions for the libraries to function correctly. These permissions allow the Kafka client to connect to Kafka brokers, read system properties, and access or modify security properties.
To configure these permissions, you can use the server.xml configuration file. The following example demonstrates how to grant the necessary permissions to a Kafka client library that is specified as a shared library:
<variable name="kafkaCodebase" value="${server.config.dir}/kafkaLib/kafka-clients-<client.version>.jar"/>
<javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanServerPermission" name="createMBeanServer"/>
<javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanPermission" name="*" actions="*"/>
<javaPermission codebase="${kafkaCodebase}" className="javax.management.MBeanTrustPermission" name="register"/>
<!-- Kafka client reads system properties -->
<javaPermission codebase="${kafkaCodebase}" className="java.util.PropertyPermission" name="*" actions="read"/>
<!-- Kafka client connects to the kafka broker server -->
<javaPermission codebase="${kafkaCodebase}" className="java.net.SocketPermission" name="*" actions="connect"/>
<!-- Kafka client loads serializers and deserializers by name -->
<javaPermission codebase="${kafkaCodebase}" className="java.lang.RuntimePermission" name="getClassLoader" actions="*"/>
<!-- Kafka reads truststores: all files in the current directory (i.e. the server directory) -->
<javaPermission codebase="${kafkaCodebase}" className="java.io.FilePermission" name="*" actions="read"/>
<!-- Kafka client allowed to invoke the Subject.doAs methods -->
<javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="doAs"/>
<!-- Kafka client allowed to call getSubject -->
<javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="getSubject"/>
<!-- Kafka client sets properties for the Simple SASL/PLAIN Server Provider -->
<javaPermission codebase="${kafkaCodebase}" className="java.security.SecurityPermission" name="putProviderProperty.Simple SASL/PLAIN Server Provider"/>
<!-- Kafka client allowed to set a Provider -->
<javaPermission codebase="${kafkaCodebase}" className="java.security.SecurityPermission" name="insertProvider"/>
<!-- Kafka client allowed access to private Credentials belonging to a particular Subject -->
<javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.PrivateCredentialPermission" name="* * &quot;*&quot;" actions="read"/>
<!-- Kafka client allowed to modify the set of public credentials associated with a Subject -->
<javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="modifyPublicCredentials"/>
<!-- Kafka client allowed to modify the set of private credentials associated with a Subject -->
<javaPermission codebase="${kafkaCodebase}" className="javax.security.auth.AuthPermission" name="modifyPrivateCredentials"/>
4. Configure security for the Liberty-Kafka connector
For more information on security and authentication protocols, see Kafka connector security configuration.
For more information on Apache Kafka, see the Apache Kafka documentation.