KxSystems / kafka

kdb+ to Apache Kafka adapter, for pub/sub
https://code.kx.com/q/interfaces
Apache License 2.0

Stale consumers after commit offsets #76

Open cmccarthy1 opened 3 years ago

cmccarthy1 commented 3 years ago

Internally raised issue

**Describe the bug** Committing an offset as a member of a consumer group while that group is rebalancing can leave the consumer stale, so that it no longer receives messages.

**To Reproduce** The following scripts can be used to reproduce the issue. (Replace `localhost:port` with the host and port of your Kafka installation.)

cat stale_con.q
//load kafka
OFFSET_LOG:() ; MSGS:()
\c 5000 5000
commit:{ .kfk.CommitOffsets[0i;`test1;;1b] exec partition!offset from MSGS where offset = (max;offset)fby partition ; `COMMITED set .z.p ;  }
.kfk.offsetcb: {[cid;err;offsets] if[not err like "Success" ; 0N!"offsetcb not success" ; OFFSET_LOG,:(cid;err;offsets) ; `commit set { } ]; }
.kfk.consumecb:{ x[`rcvtime]:.z.p ; MSGS,:: enlist x _ `data  ; `MSG set x  }
cfg:(!) . flip(
  (`metadata.broker.list;`$"localhost:port");
  (`bootstrap.servers;`$"localhost:port");
  (`group.id;`$"test_consumer_group_1");
  (`enable.auto.commit;`false);
  (`enable.auto.offset.store;`false);
  (`auto.offset.reset;`latest);
  (`session.timeout.ms;`60000);
  );
.kfk.Consumer cfg
.kfk.Sub[0i;`test1;enlist[.kfk.PARTITION_UA]!enlist[.kfk.OFFSET.END] ]
cat other_cons.q
OFFSET_LOG:() ; MSGS:()
\c 5000 5000
system"sleep 2"
commit:{ .kfk.CommitOffsets[0i;`test1;;1b] exec partition!offset from MSGS where offset = (max;offset)fby partition ; `COMMITED set .z.p ;  }
.kfk.offsetcb: {[cid;err;offsets] if[not err like "Success" ; 0N!"offsetcb not success" ; OFFSET_LOG,:(cid;err;offsets) ]; }
.kfk.consumecb:{ x[`rcvtime]:.z.p ; MSGS,:: enlist x _ `data  ; `MSG set x ; }
cfg:(!) . flip(
  (`metadata.broker.list;`$"localhost:port");
  (`bootstrap.servers;`$"localhost:port");
  (`group.id;`$"test_consumer_group_1");
  (`enable.auto.commit;`false);
  (`enable.auto.offset.store;`false);
  (`auto.offset.reset;`latest);
  (`session.timeout.ms;`60000);
  );
clients:{ .kfk.Consumer cfg } each til 10
{ .kfk.Sub[x;`test1;enlist[.kfk.PARTITION_UA]!enlist[.kfk.OFFSET.END] ] } each clients

Steps to reproduce:

  1. Have a process producing on the topic `test1.
  2. Start stale_con.q.
  3. Start other_cons.q once the stale one is up and running.
  4. Manually run commit[] several times in quick succession on the stale_con.q process.
  5. If "offsetcb not success" is not seen, restart the other_cons.q process and try again.
  6. After offsetcb reports "Offset commit failed - Specified group generation id is not valid", the consumer won't consume any more messages.
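
The offsets dictionary that commit[] in step 4 passes to .kfk.CommitOffsets can be inspected without a broker. The following is a minimal sketch, assuming a hypothetical MSGS table that holds only the partition and offset columns (the real table built by .kfk.consumecb carries more fields):

```q
// Hypothetical sample standing in for MSGS; values are made up for illustration.
MSGS:([] partition:0 0 1 1i; offset:10 11 5 6)

// fby computes the maximum offset within each partition, so the where
// clause keeps only the row with the highest offset per partition.
latest:exec partition!offset from MSGS where offset=(max;offset) fby partition

// Display the partition -> offset dictionary that would be committed.
show latest
```

This only illustrates the query inside commit; it does not contact Kafka or reproduce the rebalance itself.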

**Expected behavior** If an offset commit is unsuccessful, the consumer should be able to retry the commit, or a configuration option should be available to allow this.

**Desktop**

```
q).kfk.VersionSym[]
`1.4.2
Kdb: 4.0 2020.10.02
Kx kafka release:  v1.4.0
```
mshimizu-kx commented 3 years ago

For me this happens regardless of whether commit[] is executed. Even launching another stale_con.q stops the original process from receiving messages. Is any configuration needed on the producer or broker side?

mshimizu-kx commented 3 years ago

The issue cannot be reproduced because of another problem. The steps are:

  1. Start a process producing on the topic `test1 with examples/test_producer.q.
  2. Start stale_con.q.
  3. Start other_cons.q once the stale one is up and running.
  4. Regardless of whether the commit function is executed, the stale_con.q process stops receiving messages. Once the other_cons.q processes exit, the lone process resumes receiving messages.