kafka4beam / brod

Apache Kafka client library for Erlang/Elixir
Apache License 2.0
666 stars 202 forks source link

Process abnormal exit during termination when custom partitioning strategy is used #607

Open Tasyp opened 1 week ago

Tasyp commented 1 week ago

I have implemented a consumer using brod_group_subscriber_v2 and with a custom partitioning strategy. The setup includes multiple consumers on different nodes. The setup works well until you try to shut down the application node by node.

The process crashes with the following error:

(ErlangError) Erlang error: {:noproc, {:gen_server, :call, [#PID<0.4314.0>, {:assign_partitions, ...}]}}
  File "gen_server.erl", line 385, in :gen_server.call/3
  File "/app/deps/brod/src/brod_group_coordinator.erl", line 837, in :brod_group_coordinator.assign_partitions/1
  File "/app/deps/brod/src/brod_group_coordinator.erl", line 669, in :brod_group_coordinator.sync_group/1
  File "/app/deps/brod/src/brod_group_coordinator.erl", line 571, in :brod_group_coordinator.do_stabilize/3
  File "/app/deps/brod/src/brod_group_coordinator.erl", line 572, in :brod_group_coordinator.do_stabilize/3
  File "/app/deps/brod/src/brod_group_coordinator.erl", line 416, in :brod_group_coordinator.handle_info/2
  File "gen_server.erl", line 1123, in :gen_server.try_dispatch/4
  File "gen_server.erl", line 1200, in :gen_server.handle_msg/6

I wanted to ask whether this is an expected behavior? It seems as if the coordinator is still up but the brod_group_subscriber_v2 process has already exited so it cannot respond.

I am not quite sure how to fix it because the coordinator seems to be linked to the group subscriber. So I would assume this shouldn't happen at all?

If you have any suggestions, on how to avoid this crash, I could help with implementing it and opening a PR.

fmcgeough commented 1 week ago

Are you saying that you set partition_assignment_strategy to callback_implemented when you start brod_group_subscriber_v2?

Tasyp commented 1 week ago

Are you saying that you set partition_assignment_strategy to callback_implemented when you start brod_group_subscriber_v2?

Correct, yes. I've implemented the callback as well.

zmstone commented 1 week ago

Hi @Tasyp If brod_group_subscriber_v2 shuts down, brod_group_coordinator should receive an EXIT message and terminate itself. https://github.com/kafka4beam/brod/blob/5172dbe5565bf1f234b8e5eaa7dc8924c1d3c05a/src/brod_group_coordinator.erl#L378-L384

The noproc exception when making a call to the MemberPid seems to be a race condition, Maybe you can try to see if there is a {'EXIT', Pid, shutdown} message in the coordinator process mailbox written to the log (which seems to be truncated when reporting this issue)?

Anyways, the fix is for brod_group_coordinator to check if MemberPid is alive or catch noproc excaption when evaluating below callbacks:

  1. MemberModule:assignments_revoked
  2. MemberModule:assign_partitions
  3. MemberModule:assignments_received

If MemberPid is not alive, it should terminate itself (which will trigger a leave-group request in the gen_server terminate callback.