ntent / kafka4net

C# client for Kafka
Apache License 2.0
52 stars, 32 forks

How to start consuming from a specific offset #21

Closed: avoxm closed this issue 8 years ago

avoxm commented 9 years ago

It looks quite straightforward to consume messages from the "start" or the "end" of a topic, like this:

            var startPosition = new StartPositionTopicStart();
            var consumer = new Consumer(new ConsumerConfiguration(_seedAddresses, topic, startPosition));

            var consumerSubscription = consumer.OnMessageArrived.
                Subscribe(msg =>
                {
                    Console.WriteLine("Received : {0}", Encoding.UTF8.GetString(msg.Value));
                });

            consumer.IsConnected.Wait();

Could you please share code that shows how to consume from a specific offset?

Here is a sample scenario: run a process that consumes new messages and stores the current offset before it is recycled. Then run the process again and resume consuming from the stored offset.

vchekan commented 9 years ago

Sure, there is a unit test called ExplicitOffsets(): https://github.com/ntent-ad/kafka4net/blob/475e1b5a8b400fa242ac549e5e8b07def2a1eb12/tests/RecoveryTest.cs#L984

avoxm commented 9 years ago

@vchekan Thanks for the reference. I have actually already tried it, but for some reason it throws kafka4net.PartitionFailedException.

Here's my code:

            var offsetFetchCluster = new Cluster(_seedAddresses);
            offsetFetchCluster.ConnectAsync().Wait();

            Task.Delay(TimeSpan.FromSeconds(1)).Wait();

            var offsets = new TopicPartitionOffsets(
                                topic, (offsetFetchCluster.FetchPartitionOffsetsAsync(topic, ConsumerLocation.TopicEnd).Result)
                                        .GetPartitionsOffset.Select(kv => new KeyValuePair<int, long>(kv.Key, kv.Value - 300)));

            var consumer = new Consumer(new ConsumerConfiguration(_seedAddresses, topic, offsets));

            var consumerSubscription = consumer.OnMessageArrived.
                Subscribe(msg =>
                {
                    Console.WriteLine("Received : {0}", Encoding.UTF8.GetString(msg.Value));
                });

            consumer.IsConnected.Wait();

vchekan commented 9 years ago

    offsetFetchCluster.FetchPartitionOffsetsAsync
    ...
    new KeyValuePair<int, long>(kv.Key, kv.Value - 300)

This snippet starts from the tail-300 position, i.e. the end offset of each partition minus 300. Is it possible that you have fewer than 300 messages in your partition(s), so the computed offset comes out negative? In the unit test the topic is filled first and only then are the offsets tested, so the resulting offset is guaranteed to be positive. Is that your case? If you store offsets externally and want to restart from those positions, you might want to:

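            // LoadMyOffsetsFromSqlForExample() is a placeholder for your own store,
            // returning (partition, offset) tuples.
            var offsets = LoadMyOffsetsFromSqlForExample().
                Select(_ => new KeyValuePair<int, long>(_.Item1, _.Item2));
            var kafkaOffsets = new TopicPartitionOffsets(topic, offsets);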
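
LoadMyOffsetsFromSqlForExample is only a placeholder. As one possible store, the sketch below keeps the offsets in a text file, using the same "partition|offset,..." shape that is printed later in this thread. FileOffsetStore and its methods are hypothetical helpers, not part of kafka4net. The sketch assumes you save the offset of the next message to read (last processed offset + 1), so the loaded pairs can be passed straight to new TopicPartitionOffsets(topic, ...).

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;

// Hypothetical helper (not a kafka4net API): persists per-partition offsets as one
// line such as "0|5000126,2|11000142,4|5000163".
public static class FileOffsetStore
{
    // Serialize partition -> offset pairs, ordered by partition for stable output.
    public static string Format(IEnumerable<KeyValuePair<int, long>> offsets)
    {
        return string.Join(",", offsets
            .OrderBy(kv => kv.Key)
            .Select(kv => string.Format(CultureInfo.InvariantCulture, "{0}|{1}", kv.Key, kv.Value)));
    }

    // Parse a line back into pairs; throws FormatException on malformed entries.
    public static List<KeyValuePair<int, long>> Parse(string line)
    {
        return line.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries)
            .Select(item =>
            {
                var parts = item.Split('|');
                if (parts.Length != 2)
                    throw new FormatException("Expected 'partition|offset', got: " + item);
                return new KeyValuePair<int, long>(
                    int.Parse(parts[0], CultureInfo.InvariantCulture),
                    long.Parse(parts[1], CultureInfo.InvariantCulture));
            })
            .ToList();
    }

    // Overwrite the file with the current offsets (call this before the process recycles).
    public static void Save(string path, IEnumerable<KeyValuePair<int, long>> offsets)
    {
        File.WriteAllText(path, Format(offsets));
    }

    // Read the offsets saved by the previous run.
    public static List<KeyValuePair<int, long>> Load(string path)
    {
        return Parse(File.ReadAllText(path).Trim());
    }
}
```

On restart, something like new TopicPartitionOffsets(topic, FileOffsetStore.Load(path)) would take the place of the placeholder call. A database table keyed by topic and partition works the same way; the file is just the smallest self-contained example.
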

avoxm commented 9 years ago

Hmm... That might be it, since the message says "partition 2 failed with error code OffsetOutOfRange". But this is strange, because when I run with StartPositionTopicStart it receives thousands of messages. And here is what I see in the offsets: "2|11000142,4|5000163,0|5000126,1|11987920,3|12000139"

So apparently there are messages...

vchekan commented 9 years ago

StartPositionTopicStart is where your partition starts. You might want to print out where it ends, and which position you actually request, to make sure you are within the existing range.

Over time, Kafka deletes older partition segments according to its retention settings. In your case, partition 2 starts at offset 11000142, and everything before this offset is gone.
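
Because of this, "end minus 300" can point below the first offset that still exists. For example, a partition whose start and end are both 11000143 holds no messages, so end - 300 falls before its start and the broker answers OffsetOutOfRange. A minimal sketch of a tail-N computation that clamps to the start of each partition follows. TailOffsets.Compute is a hypothetical helper; it assumes you already have the start and end offsets per partition as plain dictionaries, however you fetch them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not a kafka4net API).
public static class TailOffsets
{
    // For each partition, return the offset `count` messages before the end, but never
    // earlier than the first offset that still exists: older segments may have been
    // deleted, so `end - count` can point at data that is gone.
    public static List<KeyValuePair<int, long>> Compute(
        IDictionary<int, long> startOffsets,
        IDictionary<int, long> endOffsets,
        long count)
    {
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        return endOffsets
            .Select(kv =>
            {
                long start;
                if (!startOffsets.TryGetValue(kv.Key, out start))
                    throw new ArgumentException("No start offset for partition " + kv.Key);
                return new KeyValuePair<int, long>(kv.Key, Math.Max(start, kv.Value - count));
            })
            .OrderBy(kv => kv.Key)
            .ToList();
    }
}
```

The clamped list has the same KeyValuePair<int, long> shape as the Select in the question, so it can be handed to TopicPartitionOffsets in the same way.
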

You might want to implement some guard code for the case when your client was down for, say, a week and its stored offset no longer exists. Depending on your business logic, you might start from the earliest existing position, or terminate with a fatal status if it is mission-critical to process every message and skips are not allowed.
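
The guard described above can be sketched as follows. OffsetGuard and MissingOffsetPolicy are hypothetical names, not kafka4net types. The sketch assumes stored, start, and end offsets are available as dictionaries keyed by partition, and that the end offset is the one the next produced message will receive, so resuming exactly at the end is valid and simply waits for new messages. Treating a stored offset beyond the end as always fatal is an additional assumption (it usually means the topic was recreated or the store is corrupt).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// What to do when a stored offset has already been deleted by retention.
public enum MissingOffsetPolicy
{
    StartFromEarliest, // skip the lost messages and resume at the oldest one still available
    Fail               // refuse to start: every message must be processed
}

// Hypothetical helper (not a kafka4net API).
public static class OffsetGuard
{
    // Checks each stored offset against the range [start, end] that currently exists.
    public static List<KeyValuePair<int, long>> Resolve(
        IDictionary<int, long> stored,
        IDictionary<int, long> startOffsets,
        IDictionary<int, long> endOffsets,
        MissingOffsetPolicy policy)
    {
        var result = new List<KeyValuePair<int, long>>();
        foreach (var kv in stored.OrderBy(p => p.Key))
        {
            int partition = kv.Key;
            long offset = kv.Value;
            long start, end;
            if (!startOffsets.TryGetValue(partition, out start))
                throw new InvalidOperationException("No start offset for partition " + partition);
            if (!endOffsets.TryGetValue(partition, out end))
                throw new InvalidOperationException("No end offset for partition " + partition);

            // Assumption: an offset past the end is never silently corrected.
            if (offset > end)
                throw new InvalidOperationException(string.Format(
                    "Partition {0}: stored offset {1} is beyond the end offset {2}", partition, offset, end));

            if (offset < start)
            {
                if (policy == MissingOffsetPolicy.Fail)
                    throw new InvalidOperationException(string.Format(
                        "Partition {0}: stored offset {1} was deleted; earliest available is {2}",
                        partition, offset, start));
                offset = start; // messages in [stored offset, start) are lost
            }
            result.Add(new KeyValuePair<int, long>(partition, offset));
        }
        return result;
    }
}
```

With StartFromEarliest the consumer keeps running after a long outage at the cost of skipped messages; with Fail the process stops before consuming anything, which makes the gap visible to an operator.
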

avoxm commented 9 years ago

You are right... Apparently I have messages in only one partition. Here is what I have:

Start: 2|11000143,4|5000164,0|5000127,1|11736199,3|12000140
End:   2|11000143,4|5000164,0|5000127,1|11987921,3|12000140

I guess I will need to check, for each partition, that the start offset is not bigger than my stored value.

Thanks a lot for your help, Vadim!