dashbitco / broadway_kafka

A Broadway connector for Kafka

Offset reset policy isn't applied for offsets out of range. #71

Closed: jamespeterschinner closed this issue 2 years ago

jamespeterschinner commented 2 years ago

When the BroadwayKafka.Producer receives new assignments from the :brod_group_coordinator, the offset reset policy is applied only to offsets that are :undefined, not to offsets that are out of range. See: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/brod_client.ex#L130.

I expected the :offset_reset_policy to behave as described in the :brod_consumer documentation, where the reset policy is applied when a fetch fails with an offset-out-of-range error.

It seems the :offset_reset_policy should also be handled in the fetch function: catch the :offset_out_of_range error, reset the offset according to the policy, and try again?
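The catch-and-retry idea could look roughly like the following. This is a minimal, hypothetical sketch, not BroadwayKafka's code: the module `OffsetResetSketch`, the function `fetch_with_reset/4`, and the injected `fetch_fun` and `resolve_fun` callbacks are illustrative names, standing in for a real Kafka fetch and an earliest/latest offset lookup. It assumes the policy values are `:earliest` and `:latest`.

```elixir
defmodule OffsetResetSketch do
  # Hypothetical sketch, not BroadwayKafka's implementation.
  # fetch_fun:   offset -> {:ok, records} | {:error, reason}
  # resolve_fun: :earliest | :latest -> offset (e.g. a partition offset lookup)
  def fetch_with_reset(offset, policy, fetch_fun, resolve_fun)
      when policy in [:earliest, :latest] do
    case fetch_fun.(offset) do
      {:ok, records} ->
        {:ok, offset, records}

      {:error, :offset_out_of_range} ->
        # The offset no longer exists on the broker (e.g. removed by retention):
        # reset it according to the policy and retry exactly once.
        new_offset = resolve_fun.(policy)

        with {:ok, records} <- fetch_fun.(new_offset) do
          {:ok, new_offset, records}
        end

      {:error, _reason} = error ->
        # Any other error is returned unchanged.
        error
    end
  end
end

# Simulated partition whose earliest retained offset is 40_000 (hypothetical).
fetch = fn
  offset when offset < 40_000 -> {:error, :offset_out_of_range}
  offset -> {:ok, ["record@#{offset}"]}
end

resolve = fn
  :earliest -> 40_000
  :latest -> 40_010
end

OffsetResetSketch.fetch_with_reset(33_841, :earliest, fetch, resolve)
# => {:ok, 40000, ["record@40000"]}
```

Returning the offset actually used lets the caller update its stored offset after a reset, and retrying only once keeps a partition that keeps failing from looping forever.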

v0idpwn commented 2 years ago

Can this error actually happen in BroadwayKafka, considering it never takes the offset from external input and relies only on information given by Kafka?

jamespeterschinner commented 2 years ago

Yeah, it seems it can, although I'm not sure of the exact mechanism that caused it:

00:11:53.607 [error] GenServer MyApp.BroadwayPipeline.Broadway.Producer_0 terminating
** (RuntimeError) cannot fetch records from Kafka (topic=stg partition=0 offset=33841). Reason: :offset_out_of_range
    (broadway_kafka 0.1.4) lib/producer.ex:475: BroadwayKafka.Producer.fetch_messages_from_kafka/3
    (broadway_kafka 0.1.4) lib/producer.ex:247: BroadwayKafka.Producer.handle_info/2
    (broadway 0.6.2) lib/broadway/topology/producer_stage.ex:228: Broadway.Topology.ProducerStage.handle_info/2
    (gen_stage 1.1.0) lib/gen_stage.ex:2108: GenStage.noreply_callback/3
    (stdlib 3.17) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.17) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.17) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

I think this offset may have been deleted due to the retention period. You'd think that would stop the :brod_group_coordinator from handing out that offset, but for whatever reason it still does, and the producer crashes.

josevalim commented 2 years ago

Does it happen consistently, or was it perhaps a race condition?

jamespeterschinner commented 2 years ago

Manually restarting the (Elixir) application didn't resolve the issue. I thought there might be some state somewhere that wasn't cleared when the producer crashed, but that isn't the case. I checked the broker settings, and retention.ms is configured to 7 days.

josevalim commented 2 years ago

Oh, so we are permanently requesting an offset that is no longer available? If so, yeah, we should consider resetting it in those cases too. A PR is welcome!

jamespeterschinner commented 2 years ago

Regarding the mechanism, my current thinking is that this application wasn't able to connect for 7 days (maybe a certificate issue due to rolling certs), during which the events were deleted due to the retention period; then, upon reconnecting, we got this error. This seems to be the most plausible scenario.
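Under that scenario, the committed offset 33841 would sit below the first offset the broker still retains, which is the case that the assignment-time check (applied only to :undefined offsets) skips. A hypothetical alternative to catching the error is to validate the committed offset against the partition's current bounds when assignments arrive. The sketch below assumes the caller already knows the earliest retained offset and the latest offset for the partition; `OffsetBoundsSketch.resolve/4` is an illustrative name, not part of BroadwayKafka or :brod, and the bounds in the usage lines are made up.

```elixir
defmodule OffsetBoundsSketch do
  # Hypothetical helper, not part of BroadwayKafka or :brod.
  # `earliest` is the first offset still retained on the broker and `latest`
  # is the offset the next record will get; fetching at `latest` is valid.

  # No committed offset: fall back to the policy.
  def resolve(:undefined, earliest, latest, policy),
    do: by_policy(policy, earliest, latest)

  # Committed offset still retained: keep it.
  def resolve(offset, earliest, latest, _policy)
      when is_integer(offset) and offset >= earliest and offset <= latest,
      do: offset

  # Committed offset outside the retained range (e.g. deleted by retention).
  def resolve(offset, earliest, latest, policy) when is_integer(offset),
    do: by_policy(policy, earliest, latest)

  defp by_policy(:earliest, earliest, _latest), do: earliest
  defp by_policy(:latest, _earliest, latest), do: latest
end

# Hypothetical bounds: earliest retained offset 40_000, latest offset 40_010.
OffsetBoundsSketch.resolve(33_841, 40_000, 40_010, :earliest)
# => 40000
OffsetBoundsSketch.resolve(40_005, 40_000, 40_010, :latest)
# => 40005
OffsetBoundsSketch.resolve(:undefined, 40_000, 40_010, :latest)
# => 40010
```

Checking up front avoids a failed fetch, but the bounds can still move between the check and the fetch, so a fetch-time reset would remain useful as a fallback.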

v0idpwn commented 2 years ago

I think this should be closed per #72 :)