microsoft / CSharpClient-for-Kafka

.NET implementation of the Apache Kafka protocol that provides basic functionality through Producer/Consumer classes. The project also offers a balanced consumer implementation.

Consumer method does not work #35

Open vuralmehmett opened 8 years ago

vuralmehmett commented 8 years ago

I'm using version 0.9.0. I can send messages to the queue successfully, but I cannot read them.

This method:

    var managerConfig = new KafkaSimpleManagerConfiguration()
    {
        FetchSize = FetchSize,
        BufferSize = BufferSize,
        Zookeeper = m_zookeeper
    };
    m_consumerManager = new KafkaSimpleManager<int, Kafka.Client.Messages.Message>(managerConfig);

    // get all available partitions for a topic through the manager
    var allPartitions = m_consumerManager.GetTopicPartitionsFromZK(m_topic);

    // Refresh metadata and grab a consumer for desired partitions
    m_consumerManager.RefreshMetadata(0, m_consumerId, 0, m_topic, true);
    var partitionConsumer = m_consumerManager.GetConsumer(m_topic, partitionId);

or this:

    ConsumerConfiguration config = new ConsumerConfiguration
    {
        AutoCommit = false,
        GroupId = consumerGroupId,
        ConsumerId = uniqueConsumerId,
        MaxFetchBufferLength = m_BufferMaxNoOfMessages,
        FetchSize = fetchSize,
        AutoOffsetReset = OffsetRequest.LargestTime,
        NumberOfTries = 20,
        ZooKeeper = new ZooKeeperConfiguration(zookeeperString, 30000, 30000, 2000)
    };
    var balancedConsumer = new ZookeeperConsumerConnector(config, true, m_ConsumerRebalanceHandler, m_ZKDisconnectHandler, m_ZKExpireHandler);

    // grab streams for desired topics
    var streams = balancedConsumer.CreateMessageStreams(m_TopicMap, new DefaultDecoder());
    var kafkaMessageStream = streams[m_Topic][0];

    // start consuming stream
    foreach (Message message in kafkaMessageStream.GetCancellable(cancellationTokenSource.Token))
    {
        // handle each message here
    }

Neither of these works.

akarthick commented 8 years ago

With Kafka 0.9.0, it looks like ClientId is mandatory for producing messages: I had to pass the ClientId explicitly for the message to be produced successfully. Otherwise the Send call threw no exception, but the message was never produced to Kafka.

Secondly, with Kafka 0.9.0, as mentioned by cehhaydun, the consumer code does not work, neither the simple nor the balanced consumer. Please provide any help as soon as possible!

akarthick commented 8 years ago

I enabled the kafkaNet.library logging, and the following exception shows up multiple times.

Exception:

    2016-03-29 17:40:51,236 [8] ERROR ZKRebalancerListener [(null)] - error in EarliestOrLatestOffset() :
    Exception Message: Unable to read beyond the end of the stream.
    Source: mscorlib
    Stack Trace:
       at System.IO.BinaryReader.FillBuffer(Int32 numBytes)
       at System.IO.BinaryReader.ReadInt32()
       at Kafka.Client.Serialization.KafkaBinaryReader.ReadInt32() in Microsoft KafkaNet\Kafkanet-master\src\KafkaNET.Library\Serialization\KafkaBinaryReader.cs:line 84
       at Kafka.Client.Consumers.OffsetResponse.Parser.ParseFrom(KafkaBinaryReader reader) in Microsoft KafkaNet\Kafkanet-master\src\KafkaNET.Library\Consumers\OffsetResponse.cs:line 52
       at Kafka.Client.KafkaConnection.Handle[T](Byte[] data, IResponseParser`1 parser, Boolean shouldParse) in Microsoft KafkaNet\Kafkanet-master\src\KafkaNET.Library\KafkaConnection.cs:line 313
       at Kafka.Client.KafkaConnection.Send(OffsetRequest request) in Microsoft KafkaNet\Kafkanet-master\src\KafkaNET.Library\KafkaConnection.cs:line 156
       at Kafka.Client.Consumers.Consumer.GetOffsetsBefore(OffsetRequest request) in Microsoft KafkaNet\Kafkanet-master\src\KafkaNET.Library\Consumers\Consumer.cs:line 228
       at Kafka.Client.ZooKeeperIntegration.Listeners.ZKRebalancerListener`1.EarliestOrLatestOffset(String topic, Int32 brokerId, Int32 partitionId, Int64 earliestIoLatest) in Microsoft KafkaNet\Kafkanet-master\src\KafkaNET.Library\ZooKeeperIntegration\Listeners\ZKRebalancerListener.cs:line 557