confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0
90 stars 869 forks source link

Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration #2161

Open codex70 opened 11 months ago

codex70 commented 11 months ago

Description

OK, I'm new to C# and .net, but have a working implementation with a java client on my machine and am trying to help our .net developers connect to the kafka service.

Kafka is running inside a kubernetes cluster on a private network (for example 10.1.2.3:9092). When I connect from my workstation with a java client, I only have to specify the bootstrap server and everything else works without any issue. This means that the network configuration is fine and there are no firewall issues blocking the connection.

However I try to configure the client, either plaintext, ssl or sasl I get either the following error, or an issue with ssl handshake. In java it's not necessary to configure anything related to ssl or sasl.

%7|1703086748.514|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "test-consumer-group": updating member id "(not-set)" -> ""
%7|1703086748.518|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v2.3.0 (0x20300ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, SSL ZLIB SNAPPY ZSTD CURL SASL_SCRAM SASL_OAUTHBEARER PLUGINS HDRHISTOGRAM, debug 0x2504)
%7|1703086748.518|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "test-consumer-group" changed state init -> query-coord (join-state init)
%7|1703086748.518|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "test-consumer-group": no broker available for coordinator query: intervaled in state query-coord
%7|1703086748.527|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "test-consumer-group" received op SUBSCRIBE in state query-coord (join-state init)
%7|1703086748.527|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "test-consumer-group": subscribe to new subscription of 1 topics (join-state init)
%7|1703086748.527|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "test-consumer-group": no broker available for coordinator query: intervaled in state query-coord
%6|1703086748.529|FAIL|rdkafka#consumer-1| [thrd:sasl_plaintext://10.1.2.3:9092/bootstrap]: sasl_plaintext://10.1.2.3:9092/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY)
%3|1703086748.529|ERROR|rdkafka#consumer-1| [thrd:sasl_plaintext://10.1.2.3:9092/bootstrap]: 1/1 brokers are down
%3|1703086748.530|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: sasl_plaintext://10.1.2.3:9092/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY)
%6|1703086748.737|FAIL|rdkafka#consumer-1| [thrd:sasl_plaintext://10.1.2.3:9092/bootstrap]: sasl_plaintext://10.1.2.3:9092/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)

How to reproduce

The following is the most simple implementation in .net

using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Connector_Simplified
{
    internal class Worker : BackgroundService
    {
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await Task.Yield();
            ConsumerConfig config = new ConsumerConfig
            {
                BootstrapServers = "10.1.2.3:9094",
                GroupId = "test-consumer-group",
                ApiVersionRequest = true,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                SecurityProtocol= SecurityProtocol.SaslPlaintext,
                Debug = "consumer, cgrp, topic, fetch",
                SaslMechanism=Confluent.Kafka.SaslMechanism.Gssapi,
//                SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.Https,
//                SecurityProtocol = SecurityProtocol.SaslSsl,
                SslCaLocation = "probe"
            };
            IConsumer<Ignore, string> consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe("test-topic");

            while (!stoppingToken.IsCancellationRequested)
            {
                ConsumeResult<Ignore, string> message = consumer.Consume(stoppingToken);
                Console.WriteLine($"Received message at {message.TopicPartitionOffset}: {message.Message.Value}");
            }
        }
    }
}

I have played with every possible combination of PlainText, Sasl and SSL that I can think of, nothing works.

Checklist

Please provide the following information:

anchitj commented 9 months ago

This means configuration issue. Are you using the correct SaslMechanism and SecurityProtocol which the broker supports?

codex70 commented 9 months ago

I'm fairly sure I am, I think I've tried everything just in case. We're now looking to reconfigure the broker to see if we can set up something that mutually works, but it needs to be done in conjunction with other systems that are currently working.