smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

Connect to different brokers per topic - no events are consumed #2621

Closed duckdeer closed 5 months ago

duckdeer commented 6 months ago

I'm using SmallRye Kafka in my Quarkus application to connect to Kafka. Until now the requirement was to have a single bootstrap server valid for all channels. With that setup, configuration and event consumption worked without problems. This requirement has changed, and now I'm trying to configure the target broker per channel.

I've configured my application.properties and was able to connect to a different broker per topic. The startup log shows successful connections to my different brokers without any errors. The correct topics are also mentioned in the log, so everything looks perfect. The problem is that no events are consumed from the topics. When I introduce an error into the configuration, the connection fails on startup, so I'm sure that my configuration is read and that the connections to my brokers are established correctly.

My application.properties contains some config options which are valid for all channels. Besides that, there are two topics, each referencing a kafka-configuration attribute as described in the official documentation. I'm using the same consumer group for both topics on purpose; there are multiple cluster nodes of my application, and each node uses a different consumer group.

My configuration and code are shown below:

# Kafka common
kafka.health-enabled=false
kafka.session.timeout.ms=45000

# Common configuration used for all channels
mp.messaging.connector=smallrye-kafka
mp.messaging.connector.smallrye-kafka.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
mp.messaging.connector.smallrye-kafka.value-deserialization-failure-handler=kafka-value-failure-handler
mp.messaging.connector.smallrye-kafka.fail-on-deserialization-failure=false
mp.messaging.connector.smallrye-kafka.auto.offset.reset=earliest
mp.messaging.connector.smallrye-kafka.use.latest.version=true
mp.messaging.connector.smallrye-kafka.auto.register.schemas=false
mp.messaging.connector.smallrye-kafka.connections.max.idle.ms=-1
mp.messaging.connector.smallrye-kafka.failure-strategy=ignore

# Topic 1
mp.messaging.incoming.my-channel-1.topic=my-topic-1
mp.messaging.incoming.my-channel-1.group.id=my-consumer-group

mp.messaging.incoming.my-channel-1.kafka-configuration=my-configuration-1

# Topic 2
mp.messaging.incoming.my-channel-2.topic=my-topic-2
mp.messaging.incoming.my-channel-2.group.id=my-consumer-group

mp.messaging.incoming.my-channel-2.kafka-configuration=my-configuration-2
And the corresponding configuration beans:

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import java.util.HashMap;
import java.util.Map;

@Singleton
@Slf4j
public class KafkaAuthConfiguration {

    // Note the space between the username and password entries; without it the
    // JAAS parser receives a single malformed token.
    private static final String SASL_CONFIGURATION_STRING = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";

    @Produces
    @Identifier("my-configuration-1")
    @Singleton
    public Map<String, Object> config1() {
        final Map<String, Object> config = createConfig("clusterUrl_1", "clusterApiKey_1", "clusterApiSecret_1");
        log.info("Created kafka config 1");
        return config;
    }

    @Produces
    @Identifier("my-configuration-2")
    @Singleton
    public Map<String, Object> config2() {
        final Map<String, Object> config = createConfig("clusterUrl_2", "clusterApiKey_2", "clusterApiSecret_2");
        log.info("Created kafka config 2");
        return config;
    }  

    private Map<String, Object> createConfig(final String clusterUrl, final String apiKey, final String apiSecret) {
        final HashMap<String, Object> config = new HashMap<>(4);

        config.put("bootstrap.servers", clusterUrl);
        config.put("security.protocol", "SASL_SSL");
        config.put("sasl.mechanism", "PLAIN");
        config.put("ssl.endpoint.identification.algorithm", "https");
        config.put("sasl.jaas.config", String.format(SASL_CONFIGURATION_STRING, apiKey, apiSecret));

        return config;
    }
}
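For reference, the config-map construction above can be exercised on its own. The sketch below mirrors the createConfig helper, stripped of CDI and Lombok so it compiles and runs standalone; the broker URL and credentials are placeholder values, not real endpoints.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the createConfig helper from the class above.
public class KafkaConfigSketch {

    // Note the space between the username and password entries; without it
    // the JAAS parser receives a single malformed token.
    private static final String SASL_CONFIGURATION_STRING =
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"%s\" password=\"%s\";";

    static Map<String, Object> createConfig(final String clusterUrl,
                                            final String apiKey,
                                            final String apiSecret) {
        final Map<String, Object> config = new HashMap<>();

        // Per-channel broker and SASL/SSL settings, merged by the connector
        // with the channel attributes from application.properties.
        config.put("bootstrap.servers", clusterUrl);
        config.put("security.protocol", "SASL_SSL");
        config.put("sasl.mechanism", "PLAIN");
        config.put("ssl.endpoint.identification.algorithm", "https");
        config.put("sasl.jaas.config", String.format(SASL_CONFIGURATION_STRING, apiKey, apiSecret));

        return config;
    }

    public static void main(String[] args) {
        final Map<String, Object> config = createConfig("broker-1:9092", "key", "secret");
        System.out.println(config.get("sasl.jaas.config"));
    }
}
```

Running main prints the formatted JAAS line, which is a quick way to verify the credentials are interpolated with the required separating space and trailing semicolon.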

Does somebody have a hint about what might be missing in my configuration, given that no events are consumed from my brokers?

ozangunalp commented 6 months ago

If you can prepare a minimal reproducer I can take a look, even with both channels connected to the same broker.

cescoffier commented 5 months ago

Yes. We need a reproducer.

duckdeer commented 5 months ago

I've created a little example, and there everything works as expected. I started two Kafka brokers in a Docker environment and was able to consume from both without problems.

Hence I expect that the issue is somewhere on the Kafka side. I'll do further analysis there.

From my point of view this issue can be closed.