OpenLiberty / docs


Documentation of MicroProfile Reactive Messaging and Stream Operators 3.0 #6928

Closed abutch3r closed 9 months ago

abutch3r commented 1 year ago

Feature epic details

Operating systems

Does the documentation apply to all operating systems?


MicroProfile Reactive Messaging (RM) and Reactive Streams Operators (RSO) 3.0.


Updates to existing topics

3.0 pages for RSO - RM -

Metrics List page

Create a new topic

Liberty-kafka connector

This is currently documented only in blogs, and we should have a formal version of it.

Open Liberty includes the liberty-kafka connector for sending and receiving messages from an Apache Kafka broker.

To use the liberty-kafka connector in your application, specify its properties as defined under and set it as the connector for a channel.

All configuration for Connectors is defined in

Example of setting attributes on the connector: mp.messaging.connector.liberty-kafka.bootstrap.servers=localhost:9082

Example of defining the connector on a channel:
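A minimal sketch of such a channel definition, assuming an incoming channel with the hypothetical name foo. The mp.messaging.incoming.<channel>.connector key follows the MicroProfile Reactive Messaging configuration convention. The channel name is an illustration only.

```properties
# Hypothetical incoming channel "foo" that uses the liberty-kafka connector
mp.messaging.incoming.foo.connector=liberty-kafka
```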

Kafka Client libraries

When using the Kafka connector included in Open Liberty, you must include the Kafka client API jar in your application or include it using a shared library.

If you’re building your application with Maven, add the following dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>

The connector can be configured either directly or through attributes defined on a channel that uses the connector. The Kafka options set on the channel and on the connector are passed on to the Kafka client. Each channel has its own instance of the connector, so values that a channel provides do not conflict with those of another channel.

Define any property that is common across multiple channels on the connector, and define channel-specific properties on the channel. For example, the Kafka bootstrap.servers property is likely to be common across the channels, so it should be defined on the connector. If the final outgoing channel connects to a different Kafka server to place the result of the interaction, that channel provides its own bootstrap.servers. However, a Kafka property such as topic or is best added to the channel they are to be used by.
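The split between connector-level and channel-level properties described above might look like the following sketch. The channel names (orders-in, results-out), topic names, and the second broker address are hypothetical. Only the key patterns and the liberty-kafka connector name come from the text.

```properties
# Common to every channel that uses the connector
mp.messaging.connector.liberty-kafka.bootstrap.servers=localhost:9082

# Hypothetical incoming channel: inherits bootstrap.servers, sets its own topic
mp.messaging.incoming.orders-in.connector=liberty-kafka
mp.messaging.incoming.orders-in.topic=orders

# Hypothetical outgoing channel that writes results to a different Kafka server
mp.messaging.outgoing.results-out.connector=liberty-kafka
mp.messaging.outgoing.results-out.topic=results
mp.messaging.outgoing.results-out.bootstrap.servers=otherhost:9092
```

Because channel values take precedence, results-out connects to otherhost:9092 while orders-in uses the connector-level address.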


The liberty-kafka connector supports the following authentication methods

To configure any of these methods, review the relevant Kafka documentation to determine which properties must be defined, and add them to either the connector or the associated channels. Because channel values take precedence over connector values, different channels can use different sets of credentials to connect to the same Kafka bootstrap server.

liberty-kafka Connector options

The liberty-kafka connector has some additional properties that define certain behaviors during operation.

Each of these options can be set as an attribute on either the connector or a channel that uses the liberty-kafka connector. If an option is specified on both the channel and the connector, the channel takes precedence.

Incoming Channels
Property name: topic
Description: The Kafka topic that the channel is to either send or receive messages from.

Property name: unacked.limit
Description: The number of outstanding unacknowledged messages. If this limit is reached, the connector stops retrieving records from Kafka until some messages have been acknowledged.
Default: The value of max.poll.records if set, otherwise 500.

Property name: fast.ack
Description: Defines the acknowledgement behavior of the liberty-kafka connector within the MicroProfile Reactive Messaging framework for incoming channels in relation to activities with the Kafka topic.
If the value of fast.ack is false, the acknowledgement is not reported as complete until the partition offset has been committed to the Kafka broker. If an error occurs during this process, then the acknowledgement is reported as failed.
If the value of fast.ack is true, an acknowledgement is reported as complete as soon as the Kafka connector receives the acknowledgement signal.
Default:
- MicroProfile Reactive Messaging 1.0 - false
- MicroProfile Reactive Messaging 3.0 - true

Property name: context.service
Description: Allows the setting of the Context Service used for asynchronous tasks. For the context.service option to take effect for the liberty-kafka connector, the concurrent feature must be enabled.
Default: If the concurrent-x.y feature is enabled, the default context service is used. If the concurrent feature is not enabled, the built-in Liberty context service is used with a set list of context types to capture and apply around asynchronous tasks.

Property name: all other properties
Description: Passed directly as config parameters to the KafkaConsumer API. A list of required and optional properties can be found in the Kafka documentation.
Default: Uses the Kafka client default.

Outgoing channels
Property name: topic
Description: The Kafka topic that the channel is to either send or receive messages from.

Property name: context.service
Description: Allows the setting of the Context Service used for asynchronous tasks. For the context.service option to take effect for the liberty-kafka connector, the concurrent feature must be enabled.
Default: If the concurrent-x.y feature is enabled, the default context service is used. If the concurrent feature is not enabled, the built-in Liberty context service is used with a set list of context types to capture and apply around asynchronous tasks.

Property name: all other properties
Description: Passed directly as config parameters to the KafkaProducer API. A list of required and optional properties can be found in the Kafka documentation.
Default: Uses the Kafka client default.

fast.ack example:

In this example, the fast.ack property is set to false on the connector as the default for any channels in the application. For the incoming channel foo, this is overridden to true.
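A sketch of the configuration that this description implies, using only the names given in the text (the liberty-kafka connector, the fast.ack property, and the incoming channel foo). The line that assigns the connector to the channel is an assumption about the surrounding setup.

```properties
# Connector-level default for every channel that uses liberty-kafka
mp.messaging.connector.liberty-kafka.fast.ack=false

# Incoming channel foo overrides the connector-level value
mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.fast.ack=true
```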

context.service example:

In the server.xml, three different context services are defined with unique IDs. In the application's file, the first context service is set on the connector. The application has three channels. The channel def does not specify its own context.service, so it uses the one defined on the connector. The second and third channels define their own context services and use those respectively.


<contextService id="rst"/>
<contextService id="uvw"/>
<contextService id="xyz"/>
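The application-side configuration that goes with these context services could be sketched as follows. This assumes that the context.service value is the id of the contextService element, and that all three channels are incoming. The channel names second and third are hypothetical. Only def is named in the description.

```properties
# First context service (rst) is set on the connector
mp.messaging.connector.liberty-kafka.context.service=rst

# Channel def sets no context.service of its own, so it uses rst
mp.messaging.incoming.def.connector=liberty-kafka

# Hypothetical second and third channels each name their own context service
mp.messaging.incoming.second.connector=liberty-kafka
mp.messaging.incoming.second.context.service=uvw
mp.messaging.incoming.third.connector=liberty-kafka
mp.messaging.incoming.third.context.service=xyz
```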



Context services that are defined within the application itself cannot be used with the liberty-kafka connector.

Troubleshooting the liberty-kafka connector

Connecting to Kafka

Multiple Server instances

If multiple instances of Open Liberty are started with the same application, you must specify a unique on each incoming channel in each server instance. Otherwise, the server rejects any additional connections to a topic beyond the first connection.

Multiple Reactive Messaging Applications using the same Kafka server

If multiple applications that use a Kafka client are deployed to Liberty and attempt to connect to the same Kafka server, errors might occur because of conflicting identifiers used by both Kafka producers and consumers across the applications. This is due to how Kafka generates the for both. Consumers will generate identifiers based on their or their

For consumers, it is recommended to create unique for each incoming channel

For producers, it is recommended to create unique for each outgoing channel

Specifying either attribute on the liberty-kafka Connector will not resolve the issue and is not best practice.

dmuelle commented 1 year ago

Hi @abutch3r - please use the following template when you populate this issue. Thanks!

Feature epic details

Operating systems

Does the documentation apply to all operating systems?


Provide a concise summary of your feature. What is the update, why does it matter, and to whom? What do 80% of target users need to know to be most easily productive using your runtime update?


List any new or changed properties, parameters, elements, attributes, etc. Include default values and configuration examples where relevant:

Updates to existing topics

To update existing topics, specify a link to the topics that are affected. Include a copy of the current text and the exact text to which it will change. For example: Change ABC to XYZ

Create a new topic

To create a topic, specify a first draft of the topic that you want added and the section in the navigation where the topic should go.

abutch3r commented 11 months ago

@dmuelle would you be able to do an initial review of the above material I have put together? It does need some refining in the new year, but having an idea of particular focus areas would be helpful.

Is there anything you consider to be missing, given that the current documentation for the 1.0 version of this feature is very basic and most of the detail is currently only in the blogs?

The closest blog we have is which does a mixture of overall RM pieces and the liberty-kafka connector

We don't focus on RSO, so I don't think anything in particular is needed there.

dmuelle commented 11 months ago

Hi Alex- thanks for putting this together- a few points to consider

Ram and I are both out after Friday 12/22 but maybe we can have a quick call when we're all back to sort out what's needed

dmuelle commented 11 months ago

@ramkumar-k-9286 more resources and links for reactive in this old issue-

abutch3r commented 11 months ago

thanks @dmuelle

In response to your questions

A call in the New Year would be really helpful

dmuelle commented 11 months ago

I missed that we wanted to include a link to the new Kafka connector topic in the feature description. In that case I think it's better to add it from the docs side so the docs team can maintain the link without needing access to metatype files. So no need to edit those, we'll add an extended description from the docs repo.

ramkumar-k-9286 commented 9 months ago

Hi Alex @abutch3r

I have added the document to the Draft OL site.

Please consider this a draft document. Please review the document.

Draft link:

Please add your comments to the issue. I will make the corrections.

Regards, Ramkumar.

CC @dmuelle

abutch3r commented 9 months ago

Initial comments

For 3. Include Kafka Client Libraries

We state shared libraries can be used - however if done that way, they need to supply permissions for it.

There are two ways this can be done - via permissions.xml

<permissions xmlns=""

     <!-- Kafka client registers MBeans -->

     <!-- Kafka client reads system properties -->

     <!-- Kafka client connects to the kafka broker server -->

     <!-- Kafka client loads serializers and deserializers by name -->

     <!-- Kafka reads truststores -->
        <name>*</name> <!-- all files in the current directory (i.e. the server directory) -->

     <!-- Kafka client allowed to invoke the Subject.doAs methods -->

     <!-- Kafka client allowed to call getSubject -->

     <!-- Kafka client sets properties for the Simple SASL/PLAIN Server Provider -->
        <name>putProviderProperty.Simple SASL/PLAIN Server Provider</name>

     <!-- Kafka client allowed to set a Provider -->

     <!-- Kafka client allowed access to private Credentials belonging to a particular Subject -->
        <name>* * "*"</name>

     <!-- Kafka client allowed to modify the set of public credentials associated with a Subject -->

     <!-- Kafka client allowed to modify the set of private credentials associated with a Subject -->

or within server.xml

   <variable name="kafkaCodebase" value="${server.config.dir}/kafkaLib/kafka-clients-<client.version>.jar"/>

   <!-- Kafka client registers MBeans -->
   <javaPermission codebase="${kafkaCodebase}" className="" name="createMBeanServer"/>
   <javaPermission codebase="${kafkaCodebase}" className="" name="*" actions="*"/>
   <javaPermission codebase="${kafkaCodebase}" className="" name="register"/>

   <!-- Kafka client reads system properties -->
   <javaPermission codebase="${kafkaCodebase}" className="java.util.PropertyPermission" name="*" actions="read"/>

   <!-- Kafka client connects to the Kafka broker server -->
   <javaPermission codebase="${kafkaCodebase}" className="" name="*" actions="connect"/>

   <!-- Kafka client loads serializers and deserializers by name -->
   <javaPermission codebase="${kafkaCodebase}" className="java.lang.RuntimePermission" name="getClassLoader" actions="*"/>

   <!-- Kafka reads truststores: all files in the current directory (i.e. the server directory) -->
   <javaPermission codebase="${kafkaCodebase}" className="" name="*" actions="read"/>

   <!-- Kafka client allowed to invoke the Subject.doAs methods -->
   <javaPermission codebase="${kafkaCodebase}" className="" name="doAs"/>

   <!-- Kafka client allowed to call getSubject -->
   <javaPermission codebase="${kafkaCodebase}" className="" name="getSubject"/>

   <!-- Kafka client sets properties for the Simple SASL/PLAIN Server Provider -->
   <javaPermission codebase="${kafkaCodebase}" className="" name="putProviderProperty.Simple SASL/PLAIN Server Provider"/>

   <!-- Kafka client allowed to set a Provider -->
   <javaPermission codebase="${kafkaCodebase}" className="" name="insertProvider"/>

   <!-- Kafka client allowed access to private Credentials belonging to a particular Subject -->
   <javaPermission codebase="${kafkaCodebase}" className="" name="* * &quot;*&quot;" actions="read"/>

   <!-- Kafka client allowed to modify the set of public credentials associated with a Subject -->
   <javaPermission codebase="${kafkaCodebase}" className="" name="modifyPublicCredentials"/>

   <!-- Kafka client allowed to modify the set of private credentials associated with a Subject -->
   <javaPermission codebase="${kafkaCodebase}" className="" name="modifyPrivateCredentials"/>

We should provide one of these, otherwise we will get support questions. The server.xml version is likely sufficient, given it is smaller.

As shared libraries are documented elsewhere, there is no need for an example of it here.

Sending and receiving messages between applications using connectors

For outgoing, the bootstrap server is missing a value in the example.

Connector Options and Channel Properties

ramkumar-k-9286 commented 9 months ago

Hi Alex @abutch3r

Made the suggested corrections. Please review the document.

Draft link:

Regards, Ramkumar.

CC @dmuelle

abutch3r commented 9 months ago

Happy with the changes and the page as a whole - just let me know if any other changes are made and require me to re-review


abutch3r commented 9 months ago

@ramkumar-k-9286 based on our discussion earlier a few more changes around the authentication part to simplify and better align with Kafka documentation

I caused some confusion by separating SASL_PLAIN and SASL_SSL. PLAIN sits on top of SASL_PLAIN, and you wouldn't use SASL_SSL as the SASL mechanism. I would also remove Basic Auth as an option, as that is actually for Admin APIs, not for client access.

So for the options I would now go with:

As Examples of each of the above.

SSL - Client authenticating the server
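A hedged sketch of what a truststore-based setup might look like, using the standard Kafka client properties security.protocol, ssl.truststore.location, and ssl.truststore.password. The truststore file name is a hypothetical placeholder, the password value is borrowed from the examples in this comment, and placing the settings on the connector rather than on a channel is an assumption.

```properties
# Use TLS so that the client can verify the identity of the Kafka broker
mp.messaging.connector.liberty-kafka.security.protocol=SSL
# Hypothetical truststore that contains the broker's certificate
mp.messaging.connector.liberty-kafka.ssl.truststore.location=kafka-truststore.jks
mp.messaging.connector.liberty-kafka.ssl.truststore.password=kafka-teststore
```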



Authenticating with Kafka's Plain Login Module
mp.messaging.connector.liberty-kafka.sasl.mechanism=PLAIN required username\="test" password\="test-QmCFfb";
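For reference, a SASL/PLAIN setup with the Kafka client is normally split across three standard Kafka properties: security.protocol, sasl.mechanism, and sasl.jaas.config. The following sketch is built from those properties and the credentials shown above. It is an assumption about the intended layout, not a copy of the original snippet.

```properties
# SASL authentication over a TLS connection
mp.messaging.connector.liberty-kafka.security.protocol=SASL_SSL
mp.messaging.connector.liberty-kafka.sasl.mechanism=PLAIN
# Kafka's own PLAIN login module with the example credentials
mp.messaging.connector.liberty-kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="test" password\="test-QmCFfb";
```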

Authenticating with Open Liberty's Kafka Login Module that can use passwords encoded by Open Liberty's securityUtility on a per channel basis

mp.messaging.incoming.aes-test-in.ssl.truststore.password=kafka-teststore required username\="test" password\="{aes}<encoded password>";

mp.messaging.outgoing.aes-test-out.sasl.mechanism=PLAIN required username\="test" password\="{aes}<encoded password>";

Mutual TLS - each channel uses a separate keystore to authenticate itself with the Kafka bootstrap server
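A sketch of per-channel keystores, assuming the standard Kafka client properties ssl.keystore.location and ssl.keystore.password, with the shared truststore and protocol set on the connector. The channel names, file names, and passwords are hypothetical placeholders.

```properties
# Shared by all channels: TLS and the truststore used to verify the broker
mp.messaging.connector.liberty-kafka.security.protocol=SSL
mp.messaging.connector.liberty-kafka.ssl.truststore.location=kafka-truststore.jks
mp.messaging.connector.liberty-kafka.ssl.truststore.password=truststore-password

# Hypothetical incoming channel with its own client keystore
mp.messaging.incoming.mtls-in.connector=liberty-kafka
mp.messaging.incoming.mtls-in.ssl.keystore.location=kafka-keystore-in.jks
mp.messaging.incoming.mtls-in.ssl.keystore.password=keystore-in-password

# Hypothetical outgoing channel with a different client keystore
mp.messaging.outgoing.mtls-out.connector=liberty-kafka
mp.messaging.outgoing.mtls-out.ssl.keystore.location=kafka-keystore-out.jks
mp.messaging.outgoing.mtls-out.ssl.keystore.password=keystore-out-password
```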



ramkumar-k-9286 commented 9 months ago

Hi Alex @abutch3r

Edits for the existing documents:

Added mp.messaging.message.count{channel="<channelname>"} info for mpMetrics-5.0 and mpMetrics-4.0

Draft link:

Information was added to the MicroProfile Reactive Messaging 1.0 page.

Draft link:

Suggested edits made to the table - MicroProfile Config properties for MicroProfile Reactive Messaging

Draft link:

Please check and confirm if you are happy with the changes.

Regards, Ramkumar

abutch3r commented 9 months ago

@ramkumar-k-9286 happy with those changes

dmuelle commented 9 months ago

Peer review

Looks good @ramkumar-k-9286 - a few suggestions

The title shouldn't be just a concatenation of product names but should focus on functional benefit- like "Optimizing asynchronous communication with MicroProfile Reactive Messaging"- but in the TOC, just "Optimizing asynchronous communication"

also- most headings in the topic use Headline Capitalization- per IBM style, convert them to Sentence capitalization


Reduce wordiness in shortdesc, and make it two sentences

Integrating MicroProfile Reactive Messaging and Apache Kafka with the liberty-kafka connector provides an efficient asynchronous communication method for Open Liberty applications. This setup helps you handle large volumes of data efficiently, which is essential for event-driven systems.

then introduce the sections, but don't use passive voice

The following sections describe how to integrate MicroProfile Reactive Messaging with Apache Kafka to send messages within and between applications:

make sure to update the anchor list, still has a link to Connector options and channel properties, which is no longer on this page

Configure the Liberty-kafka connector

The liberty-kafka connector feature within Open Liberty facilitates seamless integration with Apache Kafka and enables applications to send and receive messages from an Apache Kafka broker. It uses MicroProfile Reactive Messaging standards for robust, asynchronous communication in microservices architectures.

make sure to provide a link to

You can fine-tune your application’s interaction with Kafka by configuring the connector in the file, as shown in the following example.


What does this example show? What's the result?

Giving you precise control over the messaging channels.

this is a sentence fragment

Integrating Kafka with Open Liberty involves a set of carefully planned actions to achieve seamless communication between your application and Kafka message brokers. The process starts with establishing a stable connection with Kafka brokers. It also includes creating specific channels for sending and receiving messages, and incorporating the necessary Kafka client libraries to your application. Each of these steps is crucial for using the full potential of Kafka within an Open Liberty environment, enabling efficient, scalable messaging capabilities. To properly set up the liberty-kafka connector, proceed with the following steps:

replace this paragraph with a concise sentence that leads into an ordered anchor list of the steps, link to the step headings. Add the corresponding numbers to each heading.

Indicating where your Kafka broker is hosted.

sentence fragment

To integrate Kafka into your application environment by using Open Liberty, choose one of the following methods based on your requirement.

list the options here and link to them so that the user only needs to read the option that they want.

  • This approach integrates Kafka

stray plus sign

Link the first mention of shared libraries to

Kafka client libraries can be integrated a

avoid passive voice

to the Kafka client library specified as a shared ---> to a Kafka client library that is specified as a shared

Kafka connector configuration and security

I think we should move this into a separate topic and provide a link to it from the end of the configuration section

To make sure of secure communication with Kafka brokers, you set the appropriate security properties within the file, facilitating the support of various authentication methods.

what does this mean- do you need to support more than one method? does the user configure all 3 options, or just one?

Make sure the list is parallel- text names first, followed by abbreviations in parentheses.

The following example demonstrates how to configure a Kafka client for secure SSL communication with Kafka brokers by using the MicroProfile Config API in the file.

The following configuration enables SSL-based authentication so that the client can securely verify the identity of the Kafka server it connects to. It is essential for applications that are deployed in sensitive environments where data security and privacy are the priorities.

Client authenticating the server

The following examples demonstrate how to configure secure communication with Kafka brokers by using the MicroProfile Config API, specifically within the context of Open Liberty applications. They demonstrate the setup of SASL_SSL (Simple Authentication and Security Layer over SSL) for authentication, and detail the use of both the Kafka Plain Login Module and the Open Liberty Kafka Login Module.

Does the user configure both, or either?

securityUtility encode link is broken.

The following example configures each channel with its own keystore to authenticate itself with the Kafka bootstrap server, as detailed in the configuration settings.

Mutual TLS not only secures the data in transit but also makes sure that each communication partner is authenticated, thus adding another layer of security in distributed systems communication.

move that sentence to the intro for this section.

Each channel uses a separate keystore to authenticate itself with the Kafka Bootstrap server

this doesn't need to be in a bullet.

Sending and receiving messages between applications by using connectors

Should probably be "Sending and receiving messages among applications by using connectors" as there might be more than 2 apps. Between is for two items

The following example shows you how to configure a microservice for retrieving messages from a Kafka topic by using MicroProfile Reactive Messaging and the liberty-kafka connector.

The kafkabrokerhost:9092 Kafka broker address, the foo-reader consumer group ID, and the deserializers for both key and value are org.apache.kafka.common.serialization.StringDeserializer, indicating that both keys and values are expected to be strings.

this doesn't make sense, seems like two sentences might have been smushed together

This configuration is essential for retrieving messages from the specified topic, facilitating the building of reactive applications that can efficiently process data streams.

The example uses the liberty-kafka connector to manage the connection between the application and Kafka.

add a fmi link to


Run acrolinx and revise this section- make sure to reduce wordiness and overlong sentences

dmuelle commented 9 months ago

feature pages

Draft link:

Link to new topic is broken and title is wrong. Need to check the 3.0 page once it's available, hopefully Monday.

MP config table

Draft link:

Specifies a connector attribute value. If an attribute is specified for both the channel and its corresponding connector, the attribute set for the channel overrides that of the connector.

---> Specifies a connector attribute value. If an attribute is specified for both the channel and its corresponding connector, the value that is set for the channel overrides the value for the connector.

dmuelle commented 9 months ago

Added mp.messaging.message.count{channel="<channelname>"} info for mpMetrics-5.0 and mpMetrics-4.0

Draft link:

Looks good but confirm there is not a minimum version of Reactive messaging required

dmuelle commented 9 months ago

Optimizing asynchronous communication with MicroProfile Reactive Messaging

Configure the Liberty-Kafka connector

link to on first mention of the "Apache Kafka broker"

Looking again at the first part of the "Configure..." section, the examples are exactly the same as those in the following steps. No need to duplicate them. Remove everything from "You can fine-tune your application’s interaction ..." to "...efficently managing messages."

then link to the config properties table in step 1

describe the result of the config in step one. You can use some of the text you removed from the intro, but make sure to adapt it for this example.

Include Kafka client libraries

list the two options here and link to them so that the user only needs to read the option that they want.

For more information on security and authentication methods, see Kafka connector security configuration.

so this doesn't get lost, add a 4th step "Configure security for the liberty-kafka connector". Then just use the statement you already have beneath that step.

Sending and receiving messages among applications by using connectors

remove this sentence:

This configuration is essential for retrieving messages from the specified topic, facilitating the building of reactive applications that can efficiently process data streams.

This is a sentence fragment:

Similarly, the following example of how to set up a microservice to send messages to a Kafka broker.

Put this at the end of the sending and receiving section instead of in Troubleshooting

For more information, see Creating the consumer in the inventory microservice in the Creating reactive Java microservices guide.


Run acrolinx and fix errors

ramkumar-k-9286 commented 9 months ago

Hi David @dmuelle

Incorporated suggested comments.

Draft links:

Regards, Ramkumar

dmuelle commented 9 months ago

published at, closing as completed