microsoft / CSharpClient-for-Kafka

.Net implementation of the Apache Kafka Protocol that provides basic functionality through Producer/Consumer classes. The project also offers balanced consumer implementation.
Other
463 stars 95 forks source link

VersionId used in PartitionLeaderFinder when creating topic metadata request seems to be incorrect #90

Open akudhingra opened 5 years ago

akudhingra commented 5 years ago

In file: "CSharpClient-for-Kafka/src/KafkaNET.Library/Consumers/PartitionLeaderFinder.cs" on line 80 when making the creating a TopicMetadataRequest to get consumer metadata, a hard coded value of 1 is passed in as the versionId:

IEnumerable<TopicMetadata> metaData = consumer.GetMetaData(TopicMetadataRequest.Create(new[] { partition.Topic }, 1, 0, clientId));

Looking at other instances of VersionId used in other classes, seems like this should be set to 0.

Having it set to 1 causes the partition not finding a leader and results in the following exception, which fails silently:

2018-09-26 16:11:24.6465 [21] WARN: Kafka.Client.Consumers.PartitionLeaderFinder:Error retrieving meta data from broker 0: Exception Message: Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond. Source: System Stack Trace: at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size) at System.IO.BinaryReader.ReadBytes(Int32 count) at Kafka.Client.Utils.BitWorks.ReadShortString(KafkaBinaryReader reader, String encoding) at Kafka.Client.Cluster.Broker.ParseFrom(KafkaBinaryReader reader) at Kafka.Client.Requests.TopicMetadataRequest.Parser.ParseFrom(KafkaBinaryReader reader) at Kafka.Client.KafkaConnection.Handle[T](Byte[] data, IResponseParser1 parser, Boolean shouldParse) at Kafka.Client.KafkaConnection.Send(TopicMetadataRequest request) at Kafka.Client.Consumers.Consumer.GetMetaData(TopicMetadataRequest request) at Kafka.Client.Consumers.PartitionLeaderFinder.Start()

---- Inner Exception ---- Exception Message: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond Source: System Stack Trace: at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)

akudhingra commented 5 years ago

@edenhill / @soumyajit-sahu could you please take a look?

edenhill commented 5 years ago

As stated in the README, you should probably use https://github.com/confluentinc/confluent-kafka-dotnet

soumyajit-sahu commented 5 years ago

@aakarshit This library is no longer under development/maintenance. There would be many more changes required to make it work after changing the versionId (e.g. adding the TimeStamp and Throttling fields in response). Checkout the confluent-kafka-dotnet as mentioned by edenhill.