twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.8+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

When changing from SASL to TLS authentication, the latest offset is not stored in the consumer group #186

Closed ekeric13 closed 2 years ago

ekeric13 commented 2 years ago

So I am using franz-go via benthos. Specifically this module: https://www.benthos.dev/docs/components/inputs/kafka_franz/

One thing I noticed is that if I change my authentication method from SASL to TLS, my consumer group sometimes does not remember the last offset and repeats the last message on the partition. I manually checked that the offset was stored before switching auth methods; nonetheless, the stored offset for the consumer group moves back by 1 when I switch.

I have reproduced the error going from SASL to TLS, and from TLS to SASL. Not sure if it matters, but I am using brokers via AWS and the ports for TLS and SASL are different. The SASL mechanism I am using is IAM. The topic has a replication factor of 3 and min.insync.replicas is set to 2.

Logs when I am using sasl:

qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Running main config from specified file","path":"/queue-dispatcher/config.yaml","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning autocommit loop","path":"root.input","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"Receiving messages from Kafka topics: [heartbeat]","path":"root.input","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"metadata update triggered","path":"root.input","time":"2022-08-03T00:39:14Z","why":"client initialization"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_deadletter_producer_avro","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"DEADLETTER_TOPIC_SUFFIX\").or(\"-deadletter\") }","path":"root.output.switch.0.output.switch.0.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.0.output.switch.1.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_invalid_producer","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"INVALID_TOPIC_SUFFIX\").or(\"-invalid\") }","path":"root.output.switch.1.output.switch.0.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.1.output.switch.1.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_deadletter_producer_http","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"DEADLETTER_TOPIC_SUFFIX\").or(\"-deadletter\") }","path":"root.output.switch.2.output.fallback.1.switch.0.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.2.output.fallback.1.switch.1.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Launching a benthos instance, use CTRL+C to close","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"http_call","level":"info","msg":"Sending messages via HTTP requests to: http://api-gateway:8080/get_events/${! meta(\"EventName\") }","path":"root.output.switch.2.output.fallback.0","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Listening for HTTP requests at: http://0.0.0.0:4195","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning to manage the group lifecycle","path":"root.input","time":"2022-08-03T00:39:15Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"joining group","path":"root.input","time":"2022-08-03T00:39:15Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","member_id":"kgo-86ed804b-7d9a-42ff-b519-e55187a1b9f3","msg":"join returned MemberIDRequired, rejoining with response's MemberID","path":"root.input","time":"2022-08-03T00:39:16Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","balance_protocol":"cooperative-sticky","generation":"26","group":"local-queue-dispatcher","instance_id":"\u003cnil\u003e","label":"kafka_topic_consumer","leader":"true","level":"info","member_id":"kgo-86ed804b-7d9a-42ff-b519-e55187a1b9f3","msg":"joined, balancing group","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"balancing group as leader","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","id":"kgo-86ed804b-7d9a-42ff-b519-e55187a1b9f3","interests":"interested topics: [heartbeat], previously owned: ","label":"kafka_topic_consumer","level":"info","msg":"balance group member","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"balanced","path":"root.input","plan":"kgo-86ed804b-7d9a-42ff-b519-e55187a1b9f3{heartbeat[0 1 2 3 4 5 6 7]}","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"syncing","path":"root.input","protocol":"cooperative-sticky","protocol_type":"consumer","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","assigned":"heartbeat[0 1 2 3 4 5 6 7]","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"synced","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","added":"heartbeat[0 1 2 3 4 5 6 7]","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","lost":"","msg":"new group session begun","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning heartbeat loop","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"assigning everything new, keeping current assignment","input":"heartbeat[0{23.34 0} 3{-2.-1 0} 5{-2.-1 0} 2{-2.-1 0} 6{-2.-1 0} 1{-2.-1 0} 7{-2.-1 0} 4{-2.-1 0}]","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-03T00:39:19Z","why":"newly fetched offsets for group local-queue-dispatcher"}

qd_kafka_outbox_test_dev | {"@service":"benthos","content":{"heartbeat":6},"error":null,"label":"message_in_transit","level":"info","meta":{"EventId":"c5ab084c-c4c4-4fcc-abf7-fcc384881ad3","EventName":"heartbeat_v1","GeneratedAt":"2022-07-19T22:04:23.559063Z","OrderingGroupId":"3a7fe5d5-85f1-4d32-8ddb-bbe407b5adcf","Origin":"abff48392c1684bec55a659e007ee39b","consumer_group":"local-queue-dispatcher","id":"c5ab084c-c4c4-4fcc-abf7-fcc384881ad3","input_topic":"heartbeat","kafka_key":"3a7fe5d5-85f1-4d32-8ddb-bbe407b5adcf\n\n","kafka_offset":"23","kafka_partition":"0","kafka_timestamp_unix":"1659487672","kafka_topic":"heartbeat","old_root":"{\"heartbeat\":6}","team":"infra"},"msg":"","path":"root.pipeline.processors.2.switch.4.processors.0","time":"2022-08-03T00:47:52Z"}

Logs when I change the auth method to just tls:

qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Running main config from specified file","path":"/queue-dispatcher/config.yaml","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning autocommit loop","path":"root.input","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"Receiving messages from Kafka topics: [heartbeat]","path":"root.input","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"metadata update triggered","path":"root.input","time":"2022-08-03T00:48:35Z","why":"client initialization"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_deadletter_producer_avro","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"DEADLETTER_TOPIC_SUFFIX\").or(\"-deadletter\") }","path":"root.output.switch.0.output.switch.0.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.0.output.switch.1.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_invalid_producer","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"INVALID_TOPIC_SUFFIX\").or(\"-invalid\") }","path":"root.output.switch.1.output.switch.0.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.1.output.switch.1.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_deadletter_producer_http","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"DEADLETTER_TOPIC_SUFFIX\").or(\"-deadletter\") }","path":"root.output.switch.2.output.fallback.1.switch.0.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.2.output.fallback.1.switch.1.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Launching a benthos instance, use CTRL+C to close","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Listening for HTTP requests at: http://0.0.0.0:4195","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"http_call","level":"info","msg":"Sending messages via HTTP requests to: http://api-gateway:8080/get_events/${! meta(\"EventName\") }","path":"root.output.switch.2.output.fallback.0","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning to manage the group lifecycle","path":"root.input","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"joining group","path":"root.input","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","member_id":"kgo-cd7b956d-a823-4de6-aee4-c51f13656424","msg":"join returned MemberIDRequired, rejoining with response's MemberID","path":"root.input","time":"2022-08-03T00:48:36Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","balance_protocol":"cooperative-sticky","generation":"27","group":"local-queue-dispatcher","instance_id":"\u003cnil\u003e","label":"kafka_topic_consumer","leader":"true","level":"info","member_id":"kgo-cd7b956d-a823-4de6-aee4-c51f13656424","msg":"joined, balancing group","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"balancing group as leader","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","id":"kgo-cd7b956d-a823-4de6-aee4-c51f13656424","interests":"interested topics: [heartbeat], previously owned: ","label":"kafka_topic_consumer","level":"info","msg":"balance group member","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"balanced","path":"root.input","plan":"kgo-cd7b956d-a823-4de6-aee4-c51f13656424{heartbeat[0 1 2 3 4 5 6 7]}","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"syncing","path":"root.input","protocol":"cooperative-sticky","protocol_type":"consumer","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","assigned":"heartbeat[0 1 2 3 4 5 6 7]","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"synced","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","added":"heartbeat[0 1 2 3 4 5 6 7]","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","lost":"","msg":"new group session begun","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning heartbeat loop","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"assigning everything new, keeping current assignment","input":"heartbeat[6{-2.-1 0} 1{-2.-1 0} 7{-2.-1 0} 4{-2.-1 0} 0{23.34 0} 3{-2.-1 0} 5{-2.-1 0} 2{-2.-1 0}]","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-03T00:48:47Z","why":"newly fetched offsets for group local-queue-dispatcher"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"metadata update triggered","path":"root.input","time":"2022-08-03T00:48:47Z","why":"loading offsets in new session from assign"}

qd_kafka_outbox_test_dev | {"@service":"benthos","content":{"heartbeat":6},"error":null,"label":"message_in_transit","level":"info","meta":{"EventId":"c5ab084c-c4c4-4fcc-abf7-fcc384881ad3","EventName":"heartbeat_v1","GeneratedAt":"2022-07-19T22:04:23.559063Z","OrderingGroupId":"3a7fe5d5-85f1-4d32-8ddb-bbe407b5adcf","Origin":"abff48392c1684bec55a659e007ee39b","consumer_group":"local-queue-dispatcher","id":"c5ab084c-c4c4-4fcc-abf7-fcc384881ad3","input_topic":"heartbeat","kafka_key":"3a7fe5d5-85f1-4d32-8ddb-bbe407b5adcf\n\n","kafka_offset":"23","kafka_partition":"0","kafka_timestamp_unix":"1659487672","kafka_topic":"heartbeat","old_root":"{\"heartbeat\":6}","team":"infra"},"msg":"","path":"root.pipeline.processors.2.switch.4.processors.0","time":"2022-08-03T00:48:53Z"}

Logs that stuck out:

qd_kafka_outbox_test_dev | {"@service":"benthos","how":"assigning everything new, keeping current assignment","input":"heartbeat[0{23.34 0} 3{-2.-1 0} 5{-2.-1 0} 2{-2.-1 0} 6{-2.-1 0} 1{-2.-1 0} 7{-2.-1 0} 4{-2.-1 0}]","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-03T00:39:19Z","why":"newly fetched offsets for group local-queue-dispatcher"}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"assigning everything new, keeping current assignment","input":"heartbeat[6{-2.-1 0} 1{-2.-1 0} 7{-2.-1 0} 4{-2.-1 0} 0{23.34 0} 3{-2.-1 0} 5{-2.-1 0} 2{-2.-1 0}]","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-03T00:48:47Z","why":"newly fetched offsets for group local-queue-dispatcher"}

Not sure if anything sticks out in the logs that makes you think the franz-go client is misconfigured, or if there is a bug in franz-go itself...
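
For readers following the offsets in these logs: by Kafka convention, a group's committed offset names the next record to read, not the last one processed. Both "assigning partitions" lines appear to resume partition 0 at 23, and the record with kafka_offset 23 shows up in both sessions. A minimal sketch of that arithmetic follows; the helper names are hypothetical and nothing here is franz-go API.

```go
package main

import "fmt"

// nextCommit returns the offset to commit once the record at
// lastProcessed has been fully handled: Kafka stores the position of
// the next record to read, so it is lastProcessed + 1.
func nextCommit(lastProcessed int64) int64 {
	return lastProcessed + 1
}

// redelivered reports whether the record at recordOffset is read again
// when a consumer resumes from the committed offset.
func redelivered(recordOffset, committed int64) bool {
	return recordOffset >= committed
}

func main() {
	const record = 23 // kafka_offset of the repeated message in the logs

	// Resuming from a committed offset of 23 reads record 23 again.
	fmt.Println(redelivered(record, 23))

	// Once the commit for record 23 has landed, the group resumes at 24.
	fmt.Println(redelivered(record, nextCommit(record)))
}
```

Under this reading, a repeat of exactly one message is what a resume looks like when the commit for the last processed record is not the one the group ends up using.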

twmb commented 2 years ago

How easy is this to reproduce?

Also, the benthos version pin for franz-go is pretty out of date, is it possible for you to update it locally to v1.6.0 and see if this is reproduced?

ekeric13 commented 2 years ago

I want to try and reproduce it outside of benthos for sure. And yeah will see if i can bump up the version. Was able to reproduce it like 15% of the time. It's possible that the auth method had nothing to do with it and it would have happened even if i restarted it, but wasn't able to ever reproduce it when I DIDN'T change the auth method, so it felt like that was at fault.

ekeric13 commented 2 years ago

Trying to bump the version and I have hit some errors. Is there a migration guide?

One error revolved around how PollFetches now returns an error and I am confident that I have correctly handled that.

Another "error" I am getting is heartbeat errored, but it seems to have a logLevel of info. Is this not really an "error"?

The error that is really tripping me up is this though:

Kafka replied to our OffsetCommitRequest incorrectly! Num topics in request: 1, in reply: 0, we cannot handle this!

It is coming from this section of the code: https://github.com/benthosdev/benthos/blob/main/internal/impl/kafka/input_kafka_franz.go#L297-L322

Specifically I believe CommitOffsetsSync does not like:

uncommitted map[string]map[int32]EpochOffset,

Was there a behavior change that I am missing? It doesn't seem like uncommitted has changed but in the new version the number of topics in the request > num topics in the response... which is also a bit confusing in itself.

twmb commented 2 years ago

What's the error you are receiving from PollFetches? The main behavior change is in 1.5.1 which injects the context.Canceled error if the context is canceled.

w.r.t. heartbeat error, is there any more information to that error? I think this is just a signal that rebalancing is occurring, most of the time.

What broker implementation are you talking to? This code issued an OffsetCommitRequest and the broker did not reply to the topics in that request. That's odd. Would it be possible to spew.Dump / log the full Request and Response in that callback? Sorry for not being able to help much with this one without more information. Also, the comment above is no longer true -- BlockRebalanceOnPoll exists to block polling / more directly control rebalancing.

ekeric13 commented 2 years ago

What's the error you are receiving from PollFetches?

Yeah the fix was to handle the context.DeadlineExceeded error, which occurred when there were no records to be fetched. So it sounds related to the behavior change in 1.5.1.
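
A rough sketch of that kind of handling, using only the standard library. Here pollOnce is a hypothetical stand-in for a blocking poll and is not the franz-go API; the point is only that a deadline expiring with no records can be treated as "idle" rather than as a failure, while cancellation is treated as a shutdown signal.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollOnce is a hypothetical stand-in for a blocking poll: it returns a
// record if one arrives, or the context's error if the context ends first.
func pollOnce(ctx context.Context, records <-chan string) (string, error) {
	select {
	case r := <-records:
		return r, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// classify maps a poll error to what the caller should do next.
func classify(err error) string {
	switch {
	case err == nil:
		return "process"
	case errors.Is(err, context.DeadlineExceeded):
		return "idle" // nothing arrived before the timeout; poll again
	case errors.Is(err, context.Canceled):
		return "shutdown" // the caller asked us to stop
	default:
		return "fail"
	}
}

// demo polls an empty source twice: once with a short timeout and once
// with an already-canceled context.
func demo() (timedOut, canceled string) {
	empty := make(chan string)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	_, err := pollOnce(ctx, empty)
	timedOut = classify(err)

	ctx2, cancel2 := context.WithCancel(context.Background())
	cancel2()
	_, err = pollOnce(ctx2, empty)
	canceled = classify(err)
	return timedOut, canceled
}

func main() {
	fmt.Println(demo())
}
```

Using errors.Is rather than == keeps the check working if the error has been wrapped on its way up.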

heartbeat error

It occurs multiple times. I can show you some logs around it the first two times it appears for more context:

qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"trace","msg":"heartbeating","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:22Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","broker":"1","bytes_written":"88","err":"\u003cnil\u003e","label":"kafka_topic_consumer","level":"trace","msg":"wrote Heartbeat v4","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:22Z","time_to_write":"194µs","write_wait":"346.1µs"}
qd_kafka_outbox_test_dev | {"@service":"benthos","broker":"1","bytes_read":"16","err":"\u003cnil\u003e","label":"kafka_topic_consumer","level":"trace","msg":"read Heartbeat v4","path":"root.input.broker.inputs.0","read_wait":"362.9µs","time":"2022-08-10T00:05:22Z","time_to_read":"1.0555ms"}
qd_kafka_outbox_test_dev | {"@service":"benthos","err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"trace","msg":"heartbeat complete","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:22Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"heartbeat errored","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:22Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"cooperative consumer calling onRevoke at the end of a session even though no partitions were lost","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:22Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"trace","msg":"entering OnPartitionsRevoked","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:22Z","with":"map[]"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"joining group","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:22Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","broker":"1","bytes_written":"278","err":"\u003cnil\u003e","label":"kafka_topic_consumer","level":"trace","msg":"wrote JoinGroup v7","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:22Z","time_to_write":"161.9µs","write_wait":"200.9µs"}
qd_kafka_outbox_test_dev | {"@service":"benthos","broker":"1","bytes_read":"131","err":"\u003cnil\u003e","label":"kafka_topic_consumer","level":"trace","msg":"read JoinGroup v7","path":"root.input.broker.inputs.1","read_wait":"294.8µs","time":"2022-08-10T00:05:22Z","time_to_read":"492.6336ms"}
qd_kafka_outbox_test_dev | {"@service":"benthos","addr":"broker:29092","broker":"1","label":"kafka_topic_consumer","level":"trace","msg":"connection initialized successfully","path":"root.input.broker.inputs.1","time":"2022-08-10T00:05:22Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","broker":"1","bytes_written":"130","err":"\u003cnil\u003e","label":"kafka_topic_consumer","level":"trace","msg":"wrote Fetch v12","path":"root.input.broker.inputs.1","time":"2022-08-10T00:05:22Z","time_to_write":"70.7µs","write_wait":"2.4775ms"}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"unassigning everything","input":"","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:23Z","why":"invalidating all assignments in LeaveGroup"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"trace","msg":"autocommitting","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:23Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","err":"context canceled","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"heartbeat errored","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:23Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","broker":"1","bytes_read":"0","err":"context canceled","label":"kafka_topic_consumer","level":"trace","msg":"read Fetch v12","path":"root.input.broker.inputs.0","read_wait":"382.2µs","time":"2022-08-10T00:05:23Z","time_to_read":"973.5218ms"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"trace","msg":"entering OnPartitionsRevoked","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:23Z","with":"map[heartbeat-localdev:[0 1 2 3 4 5 6 7]]"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"trace","msg":"in CommitOffsetsSync","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:23Z","with":"map[heartbeat-localdev:map[]]"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"trace","msg":"issuing commit","path":"root.input.broker.inputs.0","time":"2022-08-10T00:05:23Z","uncommitted":"map[heartbeat-localdev:map[]]"}

What broker implementation are you talking to?

I am talking to a docker image of cp-kafka:7.1.1 as I am just doing local testing. Notably I am only working with 1 broker to make things simple. So likewise all my topics have a replication factor of 1 and etc. My broker logs are:

[2022-08-10 02:03:30,664] INFO [GroupCoordinator 1]: Preparing to rebalance group local-queue-dispatcher in state PreparingRebalance with old generation 504 (__consumer_offsets-39) (reason: Adding new member kgo-bd9480cc-38ac-45f0-b476-ff0009c0f62e with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,152] INFO [GroupCoordinator 1]: Stabilized group local-queue-dispatcher generation 505 (__consumer_offsets-39) with 2 members (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,158] INFO [GroupCoordinator 1]: Assignment received from leader kgo-a38f01a0-9a3b-415a-ba0c-78c2b0bc03eb for group local-queue-dispatcher for generation 505. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,626] INFO [GroupCoordinator 1]: Preparing to rebalance group local-queue-dispatcher in state PreparingRebalance with old generation 505 (__consumer_offsets-39) (reason: Removing member kgo-a38f01a0-9a3b-415a-ba0c-78c2b0bc03eb on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,626] INFO [GroupCoordinator 1]: Member MemberMetadata(memberId=kgo-a38f01a0-9a3b-415a-ba0c-78c2b0bc03eb, groupInstanceId=None, clientId=kgo, clientHost=/169.69.0.16, sessionTimeoutMs=45000, rebalanceTimeoutMs=60000, supportedProtocols=List(cooperative-sticky)) has left group local-queue-dispatcher through explicit `LeaveGroup` request (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,645] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group local-queue-dispatcher in PreparingRebalance state. Created a new member id kgo-186767fe-7b66-4291-9f55-2c1ca84e21e7 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,651] INFO [GroupCoordinator 1]: Stabilized group local-queue-dispatcher generation 506 (__consumer_offsets-39) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,651] INFO [GroupCoordinator 1]: Member MemberMetadata(memberId=kgo-bd9480cc-38ac-45f0-b476-ff0009c0f62e, groupInstanceId=None, clientId=kgo, clientHost=/169.69.0.16, sessionTimeoutMs=45000, rebalanceTimeoutMs=60000, supportedProtocols=List(cooperative-sticky)) has left group local-queue-dispatcher through explicit `LeaveGroup` request (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,654] INFO [GroupCoordinator 1]: Assignment received from leader kgo-186767fe-7b66-4291-9f55-2c1ca84e21e7 for group local-queue-dispatcher for generation 506. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,672] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group local-queue-dispatcher in Stable state. Created a new member id kgo-b5d5878d-be36-435d-a51c-7d823856b5ba and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,674] INFO [GroupCoordinator 1]: Preparing to rebalance group local-queue-dispatcher in state PreparingRebalance with old generation 506 (__consumer_offsets-39) (reason: Adding new member kgo-b5d5878d-be36-435d-a51c-7d823856b5ba with group instance id None) (kafka.coordinator.group.GroupCoordinator)

And then here is the request and response printed:

qd_kafka_outbox_test_dev | req: &kmsg.OffsetCommitRequest{Version:8, Group:"local-queue-dispatcher", Generation:542, MemberID:"kgo-dffcca3d-0925-4015-b368-841933178c81", InstanceID:(*string)(nil), RetentionTimeMillis:-1, Topics:[]kmsg.OffsetCommitRequestTopic{kmsg.OffsetCommitRequestTopic{Topic:"consumer.admin_ops.outbox_heartbeats_kafka_connect", Partitions:[]kmsg.OffsetCommitRequestTopicPartition(nil), UnknownTags:kmsg.Tags{keyvals:map[uint32][]uint8(nil)}}}, UnknownTags:kmsg.Tags{keyvals:map[uint32][]uint8(nil)}}
qd_kafka_outbox_test_dev | res: &kmsg.OffsetCommitResponse{Version:8, ThrottleMillis:0, Topics:[]kmsg.OffsetCommitResponseTopic(nil), UnknownTags:kmsg.Tags{keyvals:map[uint32][]uint8(nil)}}

twmb commented 2 years ago

re: v1.5.1 context.DeadlineExceeded, this was the result of a long discussion... see the commit message for more details.


w.r.t. heartbeating, does it stabilize? I wonder if this is due to a restart. How often are the rejoins?


w.r.t. commit, no partitions are being committed. What happens when a partition is committed? I wonder if Kafka doesn't reply with a topic if that topic has no partitions -- I can look into this and change franz-go accordingly. I also can just change the commit to not include topics with no partitions.

twmb commented 2 years ago

Looking at the Kafka source, I'm pretty sure that if no partitions are in a topic, then the response will not include the topic. Under the hood, the data is passed around as a Map<TopicPartition, Whatever>, and if there is no partition, then there will be no TopicPartition. This answers the third problem, but I guess my question here is: is it intentional for Benthos to be sending empty commits? I think the code on line 312 in this block should actually only add offsets to the finalOffsets map if offsets is non-empty. FWIW, I need to fix this in franz-go as well.
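
To make the mechanism concrete, here is a small self-contained sketch. The types are local stand-ins (EpochOffset mirrors the shape in the signature quoted earlier, topicPartition is a hypothetical flat key), and the functions are illustrations, not the Benthos or franz-go code. It shows why a topic with no partitions disappears once offsets are keyed by topic and partition, and one way to drop such topics before committing.

```go
package main

import "fmt"

// EpochOffset is a local stand-in shaped like the EpochOffset in the
// `uncommitted map[string]map[int32]EpochOffset` signature quoted earlier.
type EpochOffset struct {
	Epoch  int32
	Offset int64
}

// topicPartition is a hypothetical flat key, standing in for the
// broker-side TopicPartition described in the comment.
type topicPartition struct {
	topic     string
	partition int32
}

// flatten re-keys offsets by (topic, partition). A topic with no
// partitions contributes no keys, so it vanishes from the result.
func flatten(offsets map[string]map[int32]EpochOffset) map[topicPartition]EpochOffset {
	flat := make(map[topicPartition]EpochOffset)
	for topic, parts := range offsets {
		for p, eo := range parts {
			flat[topicPartition{topic, p}] = eo
		}
	}
	return flat
}

// distinctTopics counts the topics that survive flattening.
func distinctTopics(flat map[topicPartition]EpochOffset) int {
	seen := make(map[string]struct{})
	for tp := range flat {
		seen[tp.topic] = struct{}{}
	}
	return len(seen)
}

// pruneEmptyTopics returns a copy of offsets without topics that have
// no partitions to commit.
func pruneEmptyTopics(offsets map[string]map[int32]EpochOffset) map[string]map[int32]EpochOffset {
	out := make(map[string]map[int32]EpochOffset, len(offsets))
	for topic, parts := range offsets {
		if len(parts) > 0 {
			out[topic] = parts
		}
	}
	return out
}

func main() {
	// Same shape as the trace log: map[heartbeat-localdev:map[]]
	uncommitted := map[string]map[int32]EpochOffset{"heartbeat-localdev": {}}

	fmt.Println("topics in request:", len(uncommitted))
	fmt.Println("topics after flattening:", distinctTopics(flatten(uncommitted)))

	if pruned := pruneEmptyTopics(uncommitted); len(pruned) == 0 {
		fmt.Println("nothing to commit, skipping")
	}
}
```

With the empty topic pruned, the request carries nothing the broker could leave out, so the request and reply topic counts cannot disagree for this reason.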

twmb commented 2 years ago

ping on this^, I think the one unknown problem is whether rebalances stabilize or not. They should, and I've tested this so much against Kafka that I'd be pretty surprised if they don't.
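
One way to put a number on how often the group rebalances is to measure the gaps between "Preparing to rebalance" lines in the broker log. The sketch below is only an illustration: the regular expression is an assumption fitted to the GroupCoordinator lines posted earlier in this thread, and the sample input is three of those lines copied as-is.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
	"time"
)

// rebalanceLine captures the timestamp (seconds and millis) and the old
// generation from a "Preparing to rebalance" GroupCoordinator line.
var rebalanceLine = regexp.MustCompile(
	`^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),(\d{3})\].*Preparing to rebalance group \S+ in state \S+ with old generation (\d+)`)

type rebalance struct {
	at            time.Time
	oldGeneration string
}

// parseRebalances extracts one entry per matching line; other lines are skipped.
func parseRebalances(logs string) []rebalance {
	var out []rebalance
	for _, line := range strings.Split(logs, "\n") {
		m := rebalanceLine.FindStringSubmatch(strings.TrimSpace(line))
		if m == nil {
			continue
		}
		t, err := time.Parse("2006-01-02 15:04:05.000", m[1]+"."+m[2])
		if err != nil {
			continue
		}
		out = append(out, rebalance{at: t, oldGeneration: m[3]})
	}
	return out
}

// gaps returns the time between consecutive rebalances.
func gaps(rs []rebalance) []time.Duration {
	var out []time.Duration
	for i := 1; i < len(rs); i++ {
		out = append(out, rs[i].at.Sub(rs[i-1].at))
	}
	return out
}

// sampleLogs holds three broker lines copied from earlier in the thread.
const sampleLogs = `
[2022-08-10 02:03:30,664] INFO [GroupCoordinator 1]: Preparing to rebalance group local-queue-dispatcher in state PreparingRebalance with old generation 504 (__consumer_offsets-39) (reason: Adding new member kgo-bd9480cc-38ac-45f0-b476-ff0009c0f62e with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,626] INFO [GroupCoordinator 1]: Preparing to rebalance group local-queue-dispatcher in state PreparingRebalance with old generation 505 (__consumer_offsets-39) (reason: Removing member kgo-a38f01a0-9a3b-415a-ba0c-78c2b0bc03eb on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2022-08-10 02:03:31,674] INFO [GroupCoordinator 1]: Preparing to rebalance group local-queue-dispatcher in state PreparingRebalance with old generation 506 (__consumer_offsets-39) (reason: Adding new member kgo-b5d5878d-be36-435d-a51c-7d823856b5ba with group instance id None) (kafka.coordinator.group.GroupCoordinator)
`

func main() {
	rs := parseRebalances(sampleLogs)
	for i, g := range gaps(rs) {
		fmt.Printf("generation %s -> %s: %v\n", rs[i].oldGeneration, rs[i+1].oldGeneration, g)
	}
}
```

If the gaps grow and the lines eventually stop, the group has settled; a steady stream of sub-second gaps would suggest members are repeatedly joining and leaving.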

twmb commented 2 years ago

The empty commit bugfix has been released in v1.7.0

I think that's the only concrete action from the problems described above.

Please let me know if you try it out, how things work, and if you encounter any other problems.

I'm not quite sure the heartbeating thing is something to change / fix for now. Need more logs.

ekeric13 commented 2 years ago

Hey sorry it's been a bit, got busy at work.

Anyway, so I updated to v1.7.0 and applied your suggestion of not adding empty offsets to the final offsets map,

and my logs look like this:

qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"joining group","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:50Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","member_id":"kgo-e215d3d7-30cc-4614-a025-37b9cf2a71aa","msg":"join returned MemberIDRequired, rejoining with response's MemberID","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:50Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"heartbeat errored","path":"root.input.broker.inputs.0","time":"2022-08-25T19:40:51Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"cooperative consumer calling onRevoke at the end of a session even though no partitions were lost","path":"root.input.broker.inputs.0","time":"2022-08-25T19:40:51Z"}
qd_kafka_outbox_test_dev | req: &kmsg.OffsetCommitRequest{Version:0, Group:"", Generation:-1, MemberID:"", InstanceID:(*string)(nil), RetentionTimeMillis:-1, Topics:[]kmsg.OffsetCommitRequestTopic(nil), UnknownTags:kmsg.Tags{keyvals:map[uint32][]uint8(nil)}}
qd_kafka_outbox_test_dev | res: &kmsg.OffsetCommitResponse{Version:0, ThrottleMillis:0, Topics:[]kmsg.OffsetCommitResponseTopic(nil), UnknownTags:kmsg.Tags{keyvals:map[uint32][]uint8(nil)}}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"joining group","path":"root.input.broker.inputs.0","time":"2022-08-25T19:40:51Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","generation":"977","group":"local-queue-dispatcher","instance_id":"\u003cnil\u003e","label":"kafka_topic_consumer","leader":"false","level":"info","member_id":"kgo-e215d3d7-30cc-4614-a025-37b9cf2a71aa","msg":"joined","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:51Z"}

....

qd_kafka_outbox_test_dev | {"@service":"benthos","how":"unassigning everything","input":"","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:51Z","why":"invalidating all assignments in LeaveGroup"}
qd_kafka_outbox_test_dev | {"@service":"benthos","err":"context canceled","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"heartbeat errored","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:51Z"}
qd_kafka_outbox_test_dev | req: &kmsg.OffsetCommitRequest{Version:0, Group:"", Generation:-1, MemberID:"", InstanceID:(*string)(nil), RetentionTimeMillis:-1, Topics:[]kmsg.OffsetCommitRequestTopic(nil), UnknownTags:kmsg.Tags{keyvals:map[uint32][]uint8(nil)}}
qd_kafka_outbox_test_dev | res: &kmsg.OffsetCommitResponse{Version:0, ThrottleMillis:0, Topics:[]kmsg.OffsetCommitResponseTopic(nil), UnknownTags:kmsg.Tags{keyvals:map[uint32][]uint8(nil)}}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"unassigning everything","input":"","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:51Z","why":"clearing assignment at end of group management session"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","member_id":"kgo-e215d3d7-30cc-4614-a025-37b9cf2a71aa","msg":"leaving group","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:51Z"}

....

qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning to manage the group lifecycle","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:51Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"joining group","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:51Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","member_id":"kgo-2bee6fb5-2058-4018-81a1-25c7ac50d56a","msg":"join returned MemberIDRequired, rejoining with response's MemberID","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"heartbeat errored","path":"root.input.broker.inputs.0","time":"2022-08-25T19:40:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"cooperative consumer calling onRevoke at the end of a session even though no partitions were lost","path":"root.input.broker.inputs.0","time":"2022-08-25T19:40:52Z"}
qd_kafka_outbox_test_dev | req: &kmsg.OffsetCommitRequest{Version:0, Group:"", Generation:-1, MemberID:"", InstanceID:(*string)(nil), RetentionTimeMillis:-1, Topics:[]kmsg.OffsetCommitRequestTopic(nil), UnknownTags:kmsg.Tags{keyvals:map[uint32][]uint8(nil)}}
qd_kafka_outbox_test_dev | res: &kmsg.OffsetCommitResponse{Version:0, ThrottleMillis:0, Topics:[]kmsg.OffsetCommitResponseTopic(nil), UnknownTags:kmsg.Tags{keyvals:map[uint32][]uint8(nil)}}

so it seems that the rebalancing continues? The log of

qd_kafka_outbox_test_dev | {"@service":"benthos","err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"heartbeat errored","path":"root.input.broker.inputs.0","time":"2022-08-25T19:40:51Z"}

never seems to go away.

I am going to play with that OnPartitionsRevoked callback some more and see what is really happening. I think you are saying that we shouldn't be committing offsets that are empty, and that could be causing a rebalance?

twmb commented 2 years ago

The empty commit shouldn't be causing a rebalance, especially not anymore -- empty commits are not issued. This commit: https://github.com/twmb/franz-go/commit/11e327740bf44079a203ed22a919e4f9202a4b68 strips all empty topics from the commit, and then this code: https://github.com/twmb/franz-go/blob/master/pkg/kgo/consumer_group.go#L2493-L2496 avoids committing because there are no topics. You can see that a bit in the req/resp: the versions are 0, and every field is its default value.
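As a rough illustration of that behavior (this is not the actual franz-go code; the `offsets` type and both function names are hypothetical stand-ins), stripping empty topics and then skipping the commit when nothing is left could look like this:

```go
package main

import "fmt"

// offsets maps topic -> partition -> offset to commit.
// Hypothetical stand-in type for illustration, not a franz-go type.
type offsets map[string]map[int32]int64

// stripEmptyTopics returns a copy of in without topics that have no partitions.
func stripEmptyTopics(in offsets) offsets {
	out := make(offsets, len(in))
	for topic, parts := range in {
		if len(parts) == 0 {
			continue // nothing to commit for this topic
		}
		out[topic] = parts
	}
	return out
}

// shouldCommit reports whether any topic is left to send after stripping.
func shouldCommit(in offsets) bool {
	return len(stripEmptyTopics(in)) > 0
}

func main() {
	revoked := offsets{"heartbeat": {}} // topic present, but no partitions
	fmt.Println(shouldCommit(revoked))  // false: skip the request entirely

	revoked["heartbeat"][0] = 42
	fmt.Println(shouldCommit(revoked)) // true: there is an offset to commit
}
```

When the commit is skipped, the request and response are never filled in, which matches the zero-valued `req`/`res` structs in the logs.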

This series of logs is really weird:

qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"joining group","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:50Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","member_id":"kgo-e215d3d7-30cc-4614-a025-37b9cf2a71aa","msg":"join returned MemberIDRequired, rejoining with response's MemberID","path":"root.input.broker.inputs.1","time":"2022-08-25T19:40:50Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"heartbeat errored","path":"root.input.broker.inputs.0","time":"2022-08-25T19:40:51Z"}

the heartbeat loop is not started until after the group is joined.

The client first logs that it will rejoin: https://github.com/twmb/franz-go/blob/master/pkg/kgo/consumer_group.go#L1097 Then it rejoins, and one of these logs should be printed (it is not in your logs): https://github.com/twmb/franz-go/blob/master/pkg/kgo/consumer_group.go#L1121-L1140 and then a sync happens, and THEN the client starts the heartbeat loop (in setupAssignedAndHeartbeat): https://github.com/twmb/franz-go/blob/master/pkg/kgo/consumer_group.go#L310

Do you have two clients running / are random clients constantly being recreated, causing repeated rebalances?

ekeric13 commented 2 years ago

I do have two clients running in parallel. They use the same consumer group name but consume different topics. The logs with just one client running do have this section continuously repeated:

qd_kafka_outbox_test_dev | {"@service":"benthos","err":"context canceled","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"heartbeat errored","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | req: &kmsg.OffsetCommitRequest{Version:0, Group:"", Generation:-1, MemberID:"", InstanceID:(*string)(nil), RetentionTimeMillis:-1, Topics:[]kmsg.OffsetCommitRequestTopic(nil), UnknownTags:kmsg.Tags{keyvals:map[uint32][]uint8(nil)}}
qd_kafka_outbox_test_dev | res: &kmsg.OffsetCommitResponse{Version:0, ThrottleMillis:0, Topics:[]kmsg.OffsetCommitResponseTopic(nil), UnknownTags:kmsg.Tags{keyvals:map[uint32][]uint8(nil)}}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"unassigning everything","input":"","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-25T20:31:52Z","why":"clearing assignment at end of group management session"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","member_id":"kgo-5f316e7a-568e-4504-a024-8ed1f497c5db","msg":"leaving group","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"unassigning everything","input":"","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-25T20:31:52Z","why":"invalidating all assignments in LeaveGroup"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning autocommit loop","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"Receiving messages from Kafka topics: [ethos.admin_ops.outbox_heartbeats_kafka_connect]","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"immediate metadata update triggered","path":"root.input","time":"2022-08-25T20:31:52Z","why":"client initialization"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning to manage the group lifecycle","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"joining group","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","member_id":"kgo-3a9c3b67-271b-4aab-869b-e0c481650a91","msg":"join returned MemberIDRequired, rejoining with response's MemberID","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","balance_protocol":"cooperative-sticky","generation":"1095","group":"local-queue-dispatcher","instance_id":"\u003cnil\u003e","label":"kafka_topic_consumer","leader":"true","level":"info","member_id":"kgo-3a9c3b67-271b-4aab-869b-e0c481650a91","msg":"joined, balancing group","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"balancing group as leader","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","id":"kgo-3a9c3b67-271b-4aab-869b-e0c481650a91","interests":"interested topics: [ethos.admin_ops.outbox_heartbeats_kafka_connect], previously owned: ","label":"kafka_topic_consumer","level":"info","msg":"balance group member","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"balanced","path":"root.input","plan":"kgo-3a9c3b67-271b-4aab-869b-e0c481650a91{ethos.admin_ops.outbox_heartbeats_kafka_connect[0 1 2 3]}","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"syncing","path":"root.input","protocol":"cooperative-sticky","protocol_type":"consumer","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","assigned":"ethos.admin_ops.outbox_heartbeats_kafka_connect[0 1 2 3]","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"synced","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","added":"ethos.admin_ops.outbox_heartbeats_kafka_connect[0 1 2 3]","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","lost":"","msg":"new group session begun","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning heartbeat loop","path":"root.input","time":"2022-08-25T20:31:52Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"assigning everything new, keeping current assignment","input":"ethos.admin_ops.outbox_heartbeats_kafka_connect[0{1 e0 ce0} 1{-2 e-1 ce0} 2{-2 e-1 ce0} 3{-2 e-1 ce0}]","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-25T20:31:52Z","why":"newly fetched offsets for group local-queue-dispatcher"}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"unassigning everything","input":"","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-25T20:31:53Z","why":"invalidating all assignments in LeaveGroup"}
qd_kafka_outbox_test_dev | {"@service":"benthos","err":"context canceled","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"heartbeat errored","path":"root.input","time":"2022-08-25T20:31:53Z"}

So I still have some sort of heartbeat error that continuously causes the group to be created and then killed. But no rebalancing-in-progress stuff.

logs from kafka:

[2022-08-25 20:41:02,222] INFO [GroupCoordinator 1]: Stabilized group local-queue-dispatcher generation 39 (__consumer_offsets-39) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2022-08-25 20:41:02,226] INFO [GroupCoordinator 1]: Assignment received from leader kgo-6805234a-e43c-426a-8c3a-dbda682386ee for group local-queue-dispatcher for generation 39. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-08-25 20:41:03,212] INFO [GroupCoordinator 1]: Preparing to rebalance group local-queue-dispatcher in state PreparingRebalance with old generation 39 (__consumer_offsets-39) (reason: Removing member kgo-6805234a-e43c-426a-8c3a-dbda682386ee on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2022-08-25 20:41:03,213] INFO [GroupCoordinator 1]: Group local-queue-dispatcher with generation 40 is now empty (__consumer_offsets-39) (kafka.coordinator.group.GroupCoordinator)
[2022-08-25 20:41:03,213] INFO [GroupCoordinator 1]: Member MemberMetadata(memberId=kgo-6805234a-e43c-426a-8c3a-dbda682386ee, groupInstanceId=None, clientId=kgo, clientHost=/169.69.0.16, sessionTimeoutMs=45000, rebalanceTimeoutMs=60000, supportedProtocols=List(cooperative-sticky)) has left group local-queue-dispatcher through explicit `LeaveGroup` request (kafka.coordinator.group.GroupCoordinator)
[2022-08-25 20:41:03,224] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group local-queue-dispatcher in Empty state. Created a new member id kgo-acc97e32-68e6-4957-becf-42202581baff and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-08-25 20:41:03,225] INFO [GroupCoordinator 1]: Preparing to rebalance group local-queue-dispatcher in state PreparingRebalance with old generation 40 (__consumer_offsets-39) (reason: Adding new member kgo-acc97e32-68e6-4957-becf-42202581baff with group instance id None) (kafka.coordinator.group.GroupCoordinator)

twmb commented 2 years ago

Is line 379 closing the client on DeadlineExceeded and then continuing, and then (my guess) quitting because the next poll returns ErrClientClosed?

ekeric13 commented 2 years ago

Yeah, not sure why the client is getting closed there. I refactored the error handling to assume we will often get harmless context.DeadlineExceeded errors, and I got it working. This also solved the rebalancing issues when running multiple clients in parallel.

You can see the changes I made here: https://github.com/benthosdev/benthos/compare/main...ekeric13:benthos:eric-bump-franz-go-version-2
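This is not the linked diff, only a minimal sketch of the idea under stated assumptions: `errClientClosed` is a hypothetical stand-in for the client's ErrClientClosed, and `decide` and `demo` are illustrative names. A poll timeout is treated as harmless and retried, and only a closed client ends the loop:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errClientClosed is a hypothetical stand-in for the client's "closed" error.
var errClientClosed = errors.New("client closed")

// decide maps a poll error to what the read loop should do next.
func decide(err error) string {
	switch {
	case err == nil:
		return "process" // records were fetched; handle them
	case errors.Is(err, errClientClosed):
		return "stop" // the client is gone; leave the loop
	case errors.Is(err, context.DeadlineExceeded):
		return "retry" // the poll timed out; harmless, do NOT close the client
	default:
		return "report" // surface anything else to the caller or logger
	}
}

// demo runs decide over a scripted sequence of poll results.
func demo() []string {
	polls := []error{
		fmt.Errorf("poll: %w", context.DeadlineExceeded),
		nil,
		errors.New("broker unreachable"),
		errClientClosed,
	}
	var got []string
	for _, err := range polls {
		d := decide(err)
		got = append(got, d)
		if d == "stop" {
			break
		}
	}
	return got
}

func main() {
	fmt.Println(demo()) // prints [retry process report stop]
}
```

The point of the design is that a timed-out poll never leads to closing the client, so the group is not left and rejoined on every timeout.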

Also, I can no longer reproduce the SASL to TLS problem that this issue was originally about, now that I am on v1.7 and the code has been updated to work with the breaking changes. So I think this issue can be closed!

twmb commented 2 years ago

ok, good, that's great. Sorry about the breaking(ish) changes. The API didn't change, but I do concede that you need to handle a new error now if you previously canceled contexts.