KxSystems / kafka

kdb+ to Apache Kafka adapter, for pub/sub
https://code.kx.com/q/interfaces
Apache License 2.0
50 stars 30 forks source link

Consumer group reassignment of offset not working #59

Open SrikarSaggurthi opened 4 years ago

SrikarSaggurthi commented 4 years ago

Describe the bug For an existing consumer group with stored offsets, resetting the offset to the latest is not working, it still reads from the stored offset.

To Reproduce 1) First, have to set up a consumer group (say test) and run for a few messages and commit those offsets. .kfk.Sub[client;topicName;(enlist 6h$1)!(enlist .kfk.OFFSET.STORED)];

2) Now restart the process and assign a specific partition and latest offset with same group test .kfk.Sub[client;topicName;(enlist 6h$1)!(enlist .kfk.OFFSET.END)];`

Expected behavior It should only start reading if there are new messages in `topicName, however it starts reading from the last committed offset.

Desktop (please complete the following information):

Additional context relevant config parmater "auto.offset.reset:latest"

mshimizu-kx commented 3 years ago

The partition is ignored at subscribe as described here.

Maybe you can assign a new offset for the consumer with .kfk.assignOffsets or .kafka.assignNewOffsetsToTopicPartition.

.kafka.subscribe[consumer;topic1];
.kafka.subscribe[consumer;topic2];
.kafka.assignNewOffsetsToTopicPartition[consumer; ; (1#0i)!1#.kafka.OFFSET_END] each (topic1; topic2);

You can check the behavior with the test_consumer.q example.