dashbitco / broadway_kafka

A Broadway connector for Kafka

Add "offset_reset_policy" option #11

Closed · sweco-semtne closed this issue 4 years ago

sweco-semtne commented 4 years ago

Hi! Is it possible to set the "offset_reset_policy" option so the producer starts from the "latest" offset and not from 0?

msaraiva commented 4 years ago

Hi @sweco-semtne!

The default behaviour is already to start from the latest committed offset. This value is sent to the group member (the Broadway producer) by the group coordinator as soon as the topic/partition is assigned to it. There are only two situations where it should be 0:

  1. The Broadway producer is fetching records from a new stream
  2. The client crashed before sending the commit offset request, so the newly assigned group member will start from 0 again.

Can you confirm you're not falling into one of those situations? If you're not, can you send us details of your pipeline configuration?

Thanks a lot for reporting this issue.
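
For reference, a minimal sketch of such a pipeline configuration, assuming the options documented for BroadwayKafka.Producer (hosts, group_id, topics); the module name, host, group, and topic are placeholders:

# Minimal sketch of a Broadway pipeline using the Kafka connector.
# All names here (MyApp.MyBroadway, localhost:9092, "my_group", "my_topic")
# are placeholders, not values from this issue.
defmodule MyApp.MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: [localhost: 9092],
             group_id: "my_group",
             topics: ["my_topic"]
           ]},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 10]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Just inspect the payload; a real pipeline would do its processing here.
    IO.inspect(message.data, label: "message")
    message
  end
end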

msaraiva commented 4 years ago

@sweco-semtne we're adding an offset_reset_policy option 👍

@dtykocki thanks for the detailed report. It was very helpful.

msaraiva commented 4 years ago

Closed in #14.

Please let me know if you run into any issues.
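
For anyone finding this later, the new option sits alongside the other producer options. A minimal sketch, reusing the placeholder names from the configuration sketched earlier; :latest is the value asked for in this issue, so double-check the accepted values in the BroadwayKafka.Producer docs:

# Producer options for {BroadwayKafka.Producer, producer_options}.
# Everything except offset_reset_policy is a placeholder carried over
# from the sketch above.
producer_options = [
  hosts: [localhost: 9092],
  group_id: "my_group",
  topics: ["my_topic"],
  # Start from the latest available offset when there is no committed offset.
  offset_reset_policy: :latest
]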

dtykocki commented 4 years ago

Thanks @msaraiva. I took this one for a test drive and I'm seeing crashes while it attempts to determine the offset:

06:22:32.057 [error] GenServer ExampleBroadwayKafka.MyBroadway.Broadway.Producer_9 terminating
** (RuntimeError) cannot resolve begin offset (hosts=["host-1": 9096, "host-2": 9096, "host-3": 9096] topic=whitelist partition=1).
 Reason: [{{:"host-1", 9096},
            {{{:kpro_req, #Reference<0.662181211.1724383238.252780>, :api_versions, 0, false, ""}, :closed},
              [{:kpro_lib, :send_and_recv_raw, 4, [file: 'src/kpro_lib.erl', line: 71]}, {:kpro_lib, :send_and_recv, 5, [file: 'src/kpro_lib.erl', line: 82]}, {:kpro_connection, :query_api_versions, 4, [file: 'src/kpro_connection.erl', line: 246]}, {:kpro_connection, :init_connection, 2, [file: 'src/kpro_connection.erl', line: 233]}, {:kpro_connection, :init, 4, [file: 'src/kpro_connection.erl', line: 170]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}},
          {{:"host-2", 9096},
            {{{:kpro_req, #Reference<0.662181211.1724383238.252795>, :api_versions, 0, false, ""}, :closed},
              [{:kpro_lib, :send_and_recv_raw, 4, [file: 'src/kpro_lib.erl', line: 71]}, {:kpro_lib, :send_and_recv, 5, [file: 'src/kpro_lib.erl', line: 82]}, {:kpro_connection, :query_api_versions, 4, [file: 'src/kpro_connection.erl', line: 246]}, {:kpro_connection, :init_connection, 2, [file: 'src/kpro_connection.erl', line: 233]}, {:kpro_connection, :init, 4, [file: 'src/kpro_connection.erl', line: 170]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}},
          {{:"host-3", 9096},
            {{{:kpro_req, #Reference<0.662181211.1724383238.252812>, :api_versions, 0, false, ""}, :closed},
              [{:kpro_lib, :send_and_recv_raw, 4, [file: 'src/kpro_lib.erl', line: 71]}, {:kpro_lib, :send_and_recv, 5, [file: 'src/kpro_lib.erl', line: 82]}, {:kpro_connection, :query_api_versions, 4, [file: 'src/kpro_connection.erl', line: 246]}, {:kpro_connection, :init_connection, 2, [file: 'src/kpro_connection.erl', line: 233]}, {:kpro_connection, :init, 4, [file: 'src/kpro_connection.erl', line: 170]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}}]
(broadway_kafka) lib/brod_client.ex:137: BroadwayKafka.BrodClient.resolve_offset/5
(broadway_kafka) lib/producer.ex:252: anonymous fn/3 in BroadwayKafka.Producer.handle_info/2
(elixir) lib/enum.ex:1336: Enum."-map/2-lists^map/1-0-"/2
(broadway_kafka) lib/producer.ex:241: BroadwayKafka.Producer.handle_info/2
(broadway) lib/broadway/producer.ex:258: Broadway.Producer.handle_info/2
(gen_stage) lib/gen_stage.ex:2086: GenStage.noreply_callback/3
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6

This could very well be related to running Kafka on Heroku. I'll poke around a bit to see if something more subtle is going on.
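
One hedged guess: Heroku's managed Kafka brokers generally require TLS with client certificates, and a plaintext connection is typically closed during the initial api_versions query, which would match the :closed errors above. Assuming the producer forwards a :client_config list to the underlying :brod client (worth confirming against the connector's docs), the connection settings would look roughly like the sketch below, with placeholder certificate paths. Note that the follow-up below shows the actual fix for this report landed in the connector itself.

# Sketch only: TLS settings forwarded to :brod. The :client_config key is an
# assumption about the connector's options, the ssl option shapes follow
# Erlang's :ssl module, and the certificate paths are placeholders.
producer_options = [
  hosts: ["host-1": 9096, "host-2": 9096, "host-3": 9096],
  group_id: "my_group",
  topics: ["whitelist"],
  client_config: [
    ssl: [
      certfile: '/path/to/client_cert.pem',
      keyfile: '/path/to/client_key.pem',
      cacertfile: '/path/to/trusted_cert.pem'
    ]
  ]
]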

msaraiva commented 4 years ago

@dtykocki thanks to @gabrielgiordan we have a fix for this, which has just been merged. Please let me know if any of you are still facing any issues; otherwise, I'll release a new version with both PRs merged.

Cheers.

dtykocki commented 4 years ago

🎉 Looking good now. Thanks @msaraiva and @gabrielgiordan!