Closed kupiku closed 8 years ago
Sorry, I forgot. I'm using kafka 0.8.2.2 server
Hi @kipiku. I've actually raised the same issue in #27. I think this really needs to be fixed too… On Wed, 6 Apr 2016 at 18:21, kupiku notifications@github.com wrote:
Sorry, I forgot. I'm using kafka 0.8.2.2 server
— You are receiving this because you are subscribed to this thread.
Reply to this email directly or view it on GitHub https://github.com/Microsoft/Kafkanet/issues/47#issuecomment-206214085
Cheers, Ducas
I'm not realy sure it is the same problem, it is definatly related, and maybe the fix applies both issues.
I am ONLY using .Net consumer, and I am worried about this exception I get:
ERROR [8] consumed offset: 34 doesn't match fetch offset: 35 for topic:0: fetched offset = 35: consumed offset = 34; consumer may lose data.
And sometimes it gets more gap between feteched and commited offset:
ERROR [8] consumed offset: 83 doesn't match fetch offset: 84 for SIEMLogs:0: fetched offset = 87: consumed offset = 83; consumer may lose data
ERROR [8] consumed offset: 76 doesn't match fetch offset: 77 for SIEMLogs:0: fetched offset = 81: consumed offset = 76; consumer may lose data
Interesting… is this a balanced consumer? are there other consumers active in the same group? Are rebalances happening around the time? Is this just when starting the consumer or in the middle of operation?
I see the first kind of error where it's off by 1 a lot when starting the consumer. I haven't seen the others yet… I saw another PR relating to consumers losing messages on rebalance. Could be related to that… On Thu, 7 Apr 2016 at 18:10, kupiku notifications@github.com wrote:
I'm not realy sure it is the same problem, it is definatly related, and maybe the fix applies both issues.
I am ONLY using .Net consumer, and I am worried about this exception I get:
ERROR [8] consumed offset: 34 doesn't match fetch offset: 35 for topic:0: fetched offset = 35: consumed offset = 34; consumer may lose data.
And sometimes it gets more gap between feteched and commited offset:
ERROR [8] consumed offset: 83 doesn't match fetch offset: 84 for SIEMLogs:0: fetched offset = 87: consumed offset = 83; consumer may lose data
ERROR [8] consumed offset: 76 doesn't match fetch offset: 77 for SIEMLogs:0: fetched offset = 81: consumed offset = 76; consumer may lose data
— You are receiving this because you commented. Reply to this email directly or view it on GitHub https://github.com/Microsoft/Kafkanet/issues/47#issuecomment-206750534
Cheers, Ducas
Here is my scenario:
A topic with only one partition.
3 consumer groups:
I get that nasty exception for all consumer groups.
Hi @kupiku I have fixed the "consumer may lose data" logging with PR https://github.com/Microsoft/CSharpClient-for-Kafka/pull/46 [closed now]. Please give it a try when you can.
Thank you.
Indeed I will give it a try
/**/ FIRMA Tu amigo y vecino, miguel
/**/
2016-04-19 21:04 GMT+02:00 Soumyajit Sahu notifications@github.com:
Hi @kupiku https://github.com/kupiku I have fixed the "consumer may lose data" logging with PR #46 https://github.com/Microsoft/CSharpClient-for-Kafka/pull/46 [closed now]. Please give it a try when you can.
— You are receiving this because you were mentioned. Reply to this email directly or view it on GitHub https://github.com/Microsoft/CSharpClient-for-Kafka/issues/47#issuecomment-212071238
Consuming messages from one topic does not manage lag propertly. When consumed all messages, I get Lag=1 for consumer group (using native consumerCheckeroffset tool) and when I produce new messages i get this exception:
ERROR [8] consumed offset: 34 doesn't match fetch offset: 35 for topic:0: fetched offset = 35: consumed offset = 34; consumer may lose data
Could it be, in ConsumerIterator.cs ( MakeNext function)? MessageAndOffset item = current.Current; consumedOffset = item.MessageOffset;
I saw in other net libraries (like ExactTargetDev/kafka-net) that they use next offset to commit offset:
this.consumedOffset = item.NextOffset;
here is my code : // CONSTRUCTOR public KafkaSimpleConsumer(ConsumerConfiguration configuration, string topic) { try { zkConsumerConnector = new ZookeeperConsumerConnector(configuration, true); consumerStreams = zkConsumerConnector.CreateMessageStreams(new Dictionary<string, int> { { topic, 1 } }, new DefaultDecoder()); streams = consumerStreams[topic]; it = streams[0].GetEnumerator(); } catch (Exception ex) { throw ex; }
// FUNCTION FOR READING public Message GetNextKafkaMsg() { try { if (it.MoveNext()) return it.Current; else return null; } catch (ConsumerTimeoutException) {
it = streams[0].GetEnumerator();
return null; } }