confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Cannot connect to IBM event streams (IBM's managed Kafka) #1241

Closed jayasurya-jeyakodi closed 4 years ago

jayasurya-jeyakodi commented 4 years ago

Description

I have posted the question in detail here. https://stackoverflow.com/questions/61094399/cannot-connect-to-apache-kafka-with-confluent-kafka-hosted-on-ibm

I saw from the IBM docs https://cloud.ibm.com/docs/services/EventStreams?topic=eventstreams-kafka_using that the following are the requirements for a client:

  1. Supports Kafka 0.10, or later
  2. Can connect and authenticate using SASL PLAIN with TLSv1.2
  3. Supports the SNI extensions for TLS where the server's hostname is included in the TLS handshake
  4. Supports elliptic curve cryptography

I am not sure if the Kafka client I am using (Confluent.Kafka 1.4.0) satisfies the above conditions.

How to reproduce

  1. Create a very simple console application
  2. Create an IBM Event Streams workload on the Lite plan.
  3. Take the service credentials from the IBM website.
  4. Pass these values to the Kafka client.

Checklist

Please provide the following information:

mhowlett commented 4 years ago

it looks like there's a problem with the API version request (there shouldn't be though if this is kafka 2.2). Maybe try setting ApiVersionRequest to false and BrokerVersionFallback to 0.10.0.
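As a rough illustration of the suggestion above, here is a minimal sketch of the client configuration expressed as librdkafka properties in a Python dict. The .NET `ApiVersionRequest` and `BrokerVersionFallback` settings correspond to `api.version.request` and `broker.version.fallback`. The function name, the placeholder broker address, and the `token` username (the value IBM's documentation describes for API-key authentication) are assumptions, not details taken from this thread.

```python
def build_event_streams_config(bootstrap_servers, api_key,
                               disable_api_version_request=False):
    """Return a librdkafka-style config dict for SASL PLAIN over TLS.

    Hypothetical helper for illustration only.
    """
    config = {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",   # SASL authentication over TLS
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "token",          # assumption: IBM API-key login name
        "sasl.password": api_key,
    }
    if disable_api_version_request:
        # The workaround suggested in this comment: skip the ApiVersionRequest
        # and assume a 0.10.0 broker instead.
        config["api.version.request"] = False
        config["broker.version.fallback"] = "0.10.0"
    return config


if __name__ == "__main__":
    cfg = build_event_streams_config("broker-0.example.com:9093", "MY_API_KEY",
                                     disable_api_version_request=True)
    for key, value in sorted(cfg.items()):
        print(f"{key} = {value}")
```

With the Python client, a dict like this would typically be passed to `confluent_kafka.Producer(cfg)`.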

jayasurya-jeyakodi commented 4 years ago

Nope i still get the same error

Protocol parse failure for ApiVersion v3 at 3/276 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?)
 ApiArrayCnt -1 out of range
failed: err: Local: Broker transport failure: (errno: Undefined error: 0)
 Received ApiVersionResponse (v3, 276 bytes, CorrId 2, rtt 263.11ms)

mhowlett commented 4 years ago

are you experiencing this problem with v1.3.0? maybe this is somehow related to KIP-511.

jayasurya-jeyakodi commented 4 years ago

I am assuming you mean Confluent.Kafka NuGet version 1.3.0. Let me try downgrading and see if the error goes away. But do you know if the client supports

mhowlett commented 4 years ago

I have no idea about the latter two without looking it up. the error you're seeing looks specific to the kip-511 additions at a quick glance of the code, which is why i suggested that.

jayasurya-jeyakodi commented 4 years ago

I am now getting System.MissingMethodException: Method not found: 'System.Threading.Tasks.Task`1<Confluent.Kafka.DeliveryResult`2<!0,!1>> Confluent.Kafka.IProducer`2.ProduceAsync(System.String, Confluent.Kafka.Message`2<!0,!1>)'.

mhowlett commented 4 years ago

recompile

jayasurya-jeyakodi commented 4 years ago

That's what I did before too (Clean & Rebuild). Let me clear the assembly cache and try again.

jayasurya-jeyakodi commented 4 years ago

@mhowlett Thanks for your help! Downgrading the NuGet package fixed the issue. But is there any way I can use the latest version, or is this a bug in the latest version?

edenhill commented 4 years ago

Protocol parse failure for ApiVersion v3 at 3/276 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?) ApiArrayCnt -1 out of range

librdkafka will attempt the highest ApiVersion it supports (v3), if the broker supports an older version then this parse error is expected and librdkafka will downgrade to a lesser version, so this all seems fine.
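To make the downgrade described above concrete, here is a simplified model of picking a request version that both sides support. It is an illustrative sketch only, not librdkafka's actual implementation, and the function name is hypothetical. The example values come from this thread: the client tries ApiVersion v3, while the broker listing further down reports `ApiVersions(18): 0 to 2`.

```python
def negotiate_version(client_max, broker_min, broker_max, client_min=0):
    """Return the highest version supported by both client and broker.

    Returns None when the two supported ranges do not overlap.
    """
    highest = min(client_max, broker_max)
    lowest = max(client_min, broker_min)
    return highest if highest >= lowest else None


if __name__ == "__main__":
    # Client supports up to v3, broker supports v0 to v2.
    print(negotiate_version(client_max=3, broker_min=0, broker_max=2))
```

Under this model the client would fall back from v3 to v2 and retry.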

Can you provide full logs from a v1.4.0 client trying to connect to the broker with Debug: "protocol,broker,feature"? Do not configure api.version.request, et al.
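One way to prepare a configuration for the requested debug run is sketched below: it drops any ApiVersion overrides and sets the librdkafka `debug` property to the contexts named above. The helper name and the sample config values are hypothetical.

```python
# Properties that should be left at their defaults for the debug run.
API_VERSION_OVERRIDES = ("api.version.request", "broker.version.fallback")


def with_debug_logging(config, contexts=("protocol", "broker", "feature")):
    """Return a copy of config with debug contexts set and overrides removed."""
    debug_config = {key: value for key, value in config.items()
                    if key not in API_VERSION_OVERRIDES}
    debug_config["debug"] = ",".join(contexts)
    return debug_config


if __name__ == "__main__":
    base = {
        "bootstrap.servers": "broker-0.example.com:9093",  # placeholder address
        "api.version.request": False,
        "broker.version.fallback": "0.10.0",
    }
    print(with_debug_logging(base))
```

The original dict is left unmodified, so the same base configuration can be reused for normal runs.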

killuazhu commented 4 years ago

I'm having a similar issue when using the confluent_kafka Python library. Everything is fine with 1.3.0, but once I upgraded to 1.4.0, I started to see the same ApiArrayCnt -1 out of range error.

When enabling 'debug': 'protocol,broker,feature'

Broker changed state CONNECT -> APIVERSION_QUERY
Sent ApiVersionRequest (v3, 66 bytes @ 0, CorrId 7)
Received ApiVersionResponse (v3, 276 bytes, CorrId 7, rtt 23.33ms)
Protocol parse failure for ApiVersion v3 at 3/276 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?)
ApiArrayCnt -1 out of range
failed: err: Local: Broker transport failure: (errno: Undefined error: 0)
Updated enabled protocol features -ApiVersion to 
Broker changed state APIVERSION_QUERY -> DOWN

Setting api.version.request to either False or True makes no difference. I've left 'broker.version.fallback': '0.10.0'

edenhill commented 4 years ago

@killuazhu What broker version are you on?

edenhill commented 4 years ago

And do you see the parse failure without debugging enabled?

killuazhu commented 4 years ago

@edenhill here is the broker version when I run kafka-broker-api-versions.sh, do you see anything unusual?

node-1:9093 (id: 2 rack: wdc07) -> (
        Produce(0): 0 to 7 [usable: 7],
        Fetch(1): 0 to 11 [usable: 10],
        ListOffsets(2): 0 to 5 [usable: 5],
        Metadata(3): 0 to 8 [usable: 7],
        LeaderAndIsr(4): 0 to 2 [usable: 2],
        StopReplica(5): 0 to 1 [usable: 1],
        UpdateMetadata(6): 0 to 5 [usable: 5],
        ControlledShutdown(7): 0 to 2 [usable: 2],
        OffsetCommit(8): 0 to 7 [usable: 6],
        OffsetFetch(9): 0 to 5 [usable: 5],
        FindCoordinator(10): 0 to 2 [usable: 2],
        JoinGroup(11): 0 to 5 [usable: 4],
        Heartbeat(12): 0 to 3 [usable: 2],
        LeaveGroup(13): 0 to 2 [usable: 2],
        SyncGroup(14): 0 to 3 [usable: 2],
        DescribeGroups(15): 0 to 3 [usable: 2],
        ListGroups(16): 0 to 2 [usable: 2],
        SaslHandshake(17): 0 to 1 [usable: 1],
        ApiVersions(18): 0 to 2 [usable: 2],
        CreateTopics(19): 0 to 3 [usable: 3],
        DeleteTopics(20): 0 to 3 [usable: 3],
        DeleteRecords(21): 0 to 1 [usable: 1],
        InitProducerId(22): 0 to 1 [usable: 1],
        OffsetForLeaderEpoch(23): 0 to 3 [usable: 2],
        AddPartitionsToTxn(24): 0 to 1 [usable: 1],
        AddOffsetsToTxn(25): 0 to 1 [usable: 1],
        EndTxn(26): 0 to 1 [usable: 1],
        WriteTxnMarkers(27): 0 [usable: 0],
        TxnOffsetCommit(28): 0 to 2 [usable: 2],
        DescribeAcls(29): 0 to 1 [usable: 1],
        CreateAcls(30): 0 to 1 [usable: 1],
        DeleteAcls(31): 0 to 1 [usable: 1],
        DescribeConfigs(32): 0 to 2 [usable: 2],
        AlterConfigs(33): 0 to 1 [usable: 1],
        AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
        DescribeLogDirs(35): 0 to 1 [usable: 1],
        SaslAuthenticate(36): 0 to 1 [usable: 1],
        CreatePartitions(37): 0 to 1 [usable: 1],
        CreateDelegationToken(38): 0 to 1 [usable: 1],
        RenewDelegationToken(39): 0 to 1 [usable: 1],
        ExpireDelegationToken(40): 0 to 1 [usable: 1],
        DescribeDelegationToken(41): 0 to 1 [usable: 1],
        DeleteGroups(42): 0 to 1 [usable: 1],
        ElectPreferredLeaders(43): 0 [usable: 0],
        UNKNOWN(44): 0
)
node-2:9093 (id: 1 rack: wdc04) -> (
        Produce(0): 0 to 7 [usable: 7],
        Fetch(1): 0 to 11 [usable: 10],
        ListOffsets(2): 0 to 5 [usable: 5],
        Metadata(3): 0 to 8 [usable: 7],
        LeaderAndIsr(4): 0 to 2 [usable: 2],
        StopReplica(5): 0 to 1 [usable: 1],
        UpdateMetadata(6): 0 to 5 [usable: 5],
        ControlledShutdown(7): 0 to 2 [usable: 2],
        OffsetCommit(8): 0 to 7 [usable: 6],
        OffsetFetch(9): 0 to 5 [usable: 5],
        FindCoordinator(10): 0 to 2 [usable: 2],
        JoinGroup(11): 0 to 5 [usable: 4],
        Heartbeat(12): 0 to 3 [usable: 2],
        LeaveGroup(13): 0 to 2 [usable: 2],
        SyncGroup(14): 0 to 3 [usable: 2],
        DescribeGroups(15): 0 to 3 [usable: 2],
        ListGroups(16): 0 to 2 [usable: 2],
        SaslHandshake(17): 0 to 1 [usable: 1],
        ApiVersions(18): 0 to 2 [usable: 2],
        CreateTopics(19): 0 to 3 [usable: 3],
        DeleteTopics(20): 0 to 3 [usable: 3],
        DeleteRecords(21): 0 to 1 [usable: 1],
        InitProducerId(22): 0 to 1 [usable: 1],
        OffsetForLeaderEpoch(23): 0 to 3 [usable: 2],
        AddPartitionsToTxn(24): 0 to 1 [usable: 1],
        AddOffsetsToTxn(25): 0 to 1 [usable: 1],
        EndTxn(26): 0 to 1 [usable: 1],
        WriteTxnMarkers(27): 0 [usable: 0],
        TxnOffsetCommit(28): 0 to 2 [usable: 2],
        DescribeAcls(29): 0 to 1 [usable: 1],
        CreateAcls(30): 0 to 1 [usable: 1],
        DeleteAcls(31): 0 to 1 [usable: 1],
        DescribeConfigs(32): 0 to 2 [usable: 2],
        AlterConfigs(33): 0 to 1 [usable: 1],
        AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
        DescribeLogDirs(35): 0 to 1 [usable: 1],
        SaslAuthenticate(36): 0 to 1 [usable: 1],
        CreatePartitions(37): 0 to 1 [usable: 1],
        CreateDelegationToken(38): 0 to 1 [usable: 1],
        RenewDelegationToken(39): 0 to 1 [usable: 1],
        ExpireDelegationToken(40): 0 to 1 [usable: 1],
        DescribeDelegationToken(41): 0 to 1 [usable: 1],
        DeleteGroups(42): 0 to 1 [usable: 1],
        ElectPreferredLeaders(43): 0 [usable: 0],
        UNKNOWN(44): 0
)
node-3:9093 (id: 0 rack: wdc06) -> (
        Produce(0): 0 to 7 [usable: 7],
        Fetch(1): 0 to 11 [usable: 10],
        ListOffsets(2): 0 to 5 [usable: 5],
        Metadata(3): 0 to 8 [usable: 7],
        LeaderAndIsr(4): 0 to 2 [usable: 2],
        StopReplica(5): 0 to 1 [usable: 1],
        UpdateMetadata(6): 0 to 5 [usable: 5],
        ControlledShutdown(7): 0 to 2 [usable: 2],
        OffsetCommit(8): 0 to 7 [usable: 6],
        OffsetFetch(9): 0 to 5 [usable: 5],
        FindCoordinator(10): 0 to 2 [usable: 2],
        JoinGroup(11): 0 to 5 [usable: 4],
        Heartbeat(12): 0 to 3 [usable: 2],
        LeaveGroup(13): 0 to 2 [usable: 2],
        SyncGroup(14): 0 to 3 [usable: 2],
        DescribeGroups(15): 0 to 3 [usable: 2],
        ListGroups(16): 0 to 2 [usable: 2],
        SaslHandshake(17): 0 to 1 [usable: 1],
        ApiVersions(18): 0 to 2 [usable: 2],
        CreateTopics(19): 0 to 3 [usable: 3],
        DeleteTopics(20): 0 to 3 [usable: 3],
        DeleteRecords(21): 0 to 1 [usable: 1],
        InitProducerId(22): 0 to 1 [usable: 1],
        OffsetForLeaderEpoch(23): 0 to 3 [usable: 2],
        AddPartitionsToTxn(24): 0 to 1 [usable: 1],
        AddOffsetsToTxn(25): 0 to 1 [usable: 1],
        EndTxn(26): 0 to 1 [usable: 1],
        WriteTxnMarkers(27): 0 [usable: 0],
        TxnOffsetCommit(28): 0 to 2 [usable: 2],
        DescribeAcls(29): 0 to 1 [usable: 1],
        CreateAcls(30): 0 to 1 [usable: 1],
        DeleteAcls(31): 0 to 1 [usable: 1],
        DescribeConfigs(32): 0 to 2 [usable: 2],
        AlterConfigs(33): 0 to 1 [usable: 1],
        AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
        DescribeLogDirs(35): 0 to 1 [usable: 1],
        SaslAuthenticate(36): 0 to 1 [usable: 1],
        CreatePartitions(37): 0 to 1 [usable: 1],
        CreateDelegationToken(38): 0 to 1 [usable: 1],
        RenewDelegationToken(39): 0 to 1 [usable: 1],
        ExpireDelegationToken(40): 0 to 1 [usable: 1],
        DescribeDelegationToken(41): 0 to 1 [usable: 1],
        DeleteGroups(42): 0 to 1 [usable: 1],
        ElectPreferredLeaders(43): 0 [usable: 0],
        UNKNOWN(44): 0
)
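Listings like the one above are easier to compare against a client once parsed. The following is a small sketch that extracts the supported range per API from text in this format. The regular expression and function name are assumptions based only on the lines shown here, and the parser handles one broker section at a time.

```python
import re

# Matches lines such as "ApiVersions(18): 0 to 2 [usable: 2]," and "UNKNOWN(44): 0".
LINE_RE = re.compile(
    r"^\s*(?P<name>\w+)\((?P<key>\d+)\):\s*(?P<min>\d+)"
    r"(?:\s+to\s+(?P<max>\d+))?"
    r"(?:\s*\[usable:\s*(?P<usable>\d+)\])?,?\s*$"
)


def parse_api_versions(text):
    """Map API name -> (min_version, max_version, usable_version or None)."""
    apis = {}
    for line in text.splitlines():
        match = LINE_RE.match(line)
        if match is None:
            continue  # broker header lines and closing parentheses
        low = int(match["min"])
        high = int(match["max"]) if match["max"] else low
        usable = int(match["usable"]) if match["usable"] else None
        apis[match["name"]] = (low, high, usable)
    return apis


if __name__ == "__main__":
    sample = """node-1:9093 (id: 2 rack: wdc07) -> (
        Fetch(1): 0 to 11 [usable: 10],
        ApiVersions(18): 0 to 2 [usable: 2],
        UNKNOWN(44): 0
)"""
    print(parse_api_versions(sample)["ApiVersions"])
```

For this broker the parsed entry for ApiVersions tops out at v2, which is lower than the v3 request seen in the client logs.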

And do you see the parse failure without debugging enabled?

No, the parse failure did not appear when having debugging enabled.

I've also opened a ticket with the IBM Cloud Event Streams team. They mentioned that they can reproduce the problem and are working on a fix, although they did not give any detail about what the fix involves.

jayasurya-jeyakodi commented 4 years ago

kafkaerror.txt @edenhill I am attaching the log file from a run with client 1.4.0 and no ApiVersion settings configured. Here is my config:

image

killuazhu commented 4 years ago

My issue has been resolved after IBM Cloud pushed out a fix. I'm able to use 1.4.0 to connect to IBM Event Stream properly.