Liberty-Kafka connector options and channel properties

You can fine-tune the Liberty-Kafka connector by setting MicroProfile Config properties. The attributes that you specify for these properties configure message channels, message acknowledgment, context services, and other operational details.

You can use all these options as attributes on either the connector or a channel that uses the Liberty-Kafka connector. If you specify the option on both the channel and the connector, the channel takes precedence.

To secure communication with Kafka brokers, set the necessary security properties in the microprofile-config.properties file.
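As a minimal sketch, a TLS-secured connection might be configured as follows. The property suffixes after `liberty-kafka.` are standard Kafka client settings that are passed through to the Kafka client. The host name, truststore path, and password are placeholders assumed for illustration, not values from this document.

```properties
# Hypothetical broker address; replace with your own bootstrap server
mp.messaging.connector.liberty-kafka.bootstrap.servers=kafka.example.com:9093
# Standard Kafka client security settings
mp.messaging.connector.liberty-kafka.security.protocol=SSL
# Placeholder truststore location and password
mp.messaging.connector.liberty-kafka.ssl.truststore.location=/path/to/kafka-truststore.p12
mp.messaging.connector.liberty-kafka.ssl.truststore.password=changeit
```

Because these settings are specified on the connector, they apply to every channel that uses the Liberty-Kafka connector unless a channel overrides them.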

Managing incoming and outgoing channels with Liberty-Kafka connector attributes

The Liberty-Kafka connector enables applications to send and receive messages through Kafka. It integrates with the MicroProfile Reactive Messaging framework, which simplifies message management. For incoming channels, you can configure how messages are received from Kafka, such as which topic to listen to and how message acknowledgments are handled.

For outgoing channels, you can specify the topic that messages are sent to and tune how they are sent, for example by choosing the context service that is used for asynchronous tasks. You can therefore adjust the connector separately for incoming and outgoing messages to suit the needs of your application.
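The following sketch shows one incoming and one outgoing channel that use the connector. The channel names (orders, shipments), topic names, group ID, and String serializer choices are assumptions made for illustration. The serializer, deserializer, and group.id entries are standard Kafka client properties that are passed through to Kafka.

```properties
# Incoming channel "orders" (hypothetical name) reads from the topic "orders-topic"
mp.messaging.incoming.orders.connector=liberty-kafka
mp.messaging.incoming.orders.topic=orders-topic
mp.messaging.incoming.orders.group.id=orders-consumer-group
mp.messaging.incoming.orders.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.orders.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Outgoing channel "shipments" (hypothetical name) writes to the topic "shipments-topic"
mp.messaging.outgoing.shipments.connector=liberty-kafka
mp.messaging.outgoing.shipments.topic=shipments-topic
mp.messaging.outgoing.shipments.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.shipments.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```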

Attributes supported by the Liberty-Kafka connector for incoming and outgoing channels
Attribute Name | Channel | Default | Description

topic

Incoming and Outgoing channels

The name of the channel.

The Kafka topic that the channel sends messages to or receives messages from.

unacked.limit

Incoming channel

Defaults to the value of max.poll.records if set, or to 500.

The maximum number of outstanding unacknowledged messages.
If this limit is reached, the connector stops retrieving records from Kafka until some messages are acknowledged.

fast.ack

Incoming channel

  • MicroProfile Reactive Messaging 1.0 - false

  • MicroProfile Reactive Messaging 3.0 - true

Defines when the Liberty-Kafka connector reports the acknowledgment of a message on an incoming channel as complete to the MicroProfile Reactive Messaging framework.
If the value of fast.ack is false, the acknowledgment is not reported as complete until the partition offset is committed to the Kafka broker. If an error occurs during this process, the acknowledgment is reported as failed.
If the value of fast.ack is true, the acknowledgment is reported as complete as soon as the Kafka connector receives the acknowledgment signal. For more information, see fast.ack.

context.service

Incoming and Outgoing channels

If the Jakarta Concurrency feature is enabled, the default context service is used.
If the Jakarta Concurrency feature is not enabled, the built-in Liberty context service is used with a set list of context types to capture and apply around asynchronous tasks.

The Jakarta Concurrency feature must be enabled for the context.service option to take effect for the Liberty-Kafka connector.
This setting specifies the context service that is used for asynchronous tasks. For more information, see context.service.

<any other property>

Incoming and Outgoing channels

Uses the Kafka Client default

All other properties are passed directly as configuration parameters to the Kafka client API (KafkaConsumer for incoming channels, KafkaProducer for outgoing channels). For a list of required and optional properties, see the Kafka documentation.
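As an illustration of the table, the following sketch combines a connector attribute with a pass-through Kafka property on one incoming channel. The channel name (orders) and both numeric values are assumptions chosen only for the example.

```properties
mp.messaging.incoming.orders.connector=liberty-kafka
# Standard Kafka consumer property, passed through to the Kafka client.
# According to the table, unacked.limit would default to this value if it were not set.
mp.messaging.incoming.orders.max.poll.records=200
# Connector attribute: stop retrieving records after 400 unacknowledged messages
mp.messaging.incoming.orders.unacked.limit=400
```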

fast.ack

The fast.ack property controls when a message acknowledgment is reported as complete. Setting it to true reports completion sooner, at the cost of not waiting for the partition offset to be committed to the Kafka broker.

In the following example from the microprofile-config.properties file of the application, fast.ack is set to false on the connector, which applies to all channels that use the connector. For the incoming channel that is named foo, the setting is overridden to true, so acknowledgments for messages that arrive on this channel are reported as complete without waiting for the offset commit.

mp.messaging.connector.liberty-kafka.fast.ack=false

mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.fast.ack=true

context.service

The context.service attribute specifies the context service that the connector uses for asynchronous tasks. Context services that are defined within the application itself cannot be used with the Liberty-Kafka connector.

In the following example, the server.xml file defines three different context services, each with a unique identifier (rst, uvw, and xyz).

<contextService id="rst"/>
<contextService id="uvw"/>
<contextService id="xyz"/>

The microprofile-config.properties file of the application then refers to these context services by their identifiers.

mp.messaging.connector.liberty-kafka.context.service=rst

mp.messaging.incoming.def.connector=liberty-kafka
mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.context.service=uvw
mp.messaging.outgoing.bar.connector=liberty-kafka
mp.messaging.outgoing.bar.context.service=xyz

In the example, the property mp.messaging.connector.liberty-kafka.context.service=rst specifies that the Liberty-Kafka connector uses the rst context service by default.

The application has three channels (def, foo, and bar), which are logical endpoints for incoming and outgoing messages. The configuration for these channels specifies which Kafka connector to use (liberty-kafka) and for two of the channels (foo and bar), overrides the default context service with their own (uvw and xyz, respectively). The def channel does not specify its own context.service, so it inherits the default one (rst) defined at the connector level.

By defining separate context services, the application can isolate certain operations or configurations, which can be useful in complex applications or during integration with external systems. These configurations demonstrate the flexibility and control that you have over message processing in Open Liberty applications.