dashbitco / broadway_kafka

A Broadway connector for Kafka

No rejoin after "payload connection down {:shutdown, :tcp_closed}": deadlock on race between assignments_revoked call and DOWN message handling #112

Closed anoskov closed 1 year ago

anoskov commented 2 years ago

Hello! We have an issue with brod and broadway_kafka in Kubernetes. When the service starts, all consumers join the group and work fine. But after some time we get:

2022-07-24 16:05:04.207 pid=<0.7117.24> [info] Group member (admin,coor=#PID<0.7117.24>,cb=#PID<0.20534.22>,generation=172):
re-joining group, reason:{:connection_down, {:shutdown, :tcp_closed}}
2022-07-24 16:05:04.207 pid=<0.7119.24> [info] client Client: payload connection down kafka-host:9092
reason:{:shutdown, :tcp_closed}

After this, brod doesn't reconnect, and kafka-consumer-groups.sh says that the consumer group has no active members.

We have been using raw brod for quite a long time, and it usually reconnected after network issues.

What could be wrong? Maybe broadway_kafka doesn't rejoin after the brod client goes down?

josevalim commented 2 years ago

Hi @anoskov! I recommend reaching out to ElixirForum for general help/questions, as it is more likely someone with a similar issue as yours can see your report.

anoskov commented 2 years ago

Thank you for the answer. I have asked the question there.

I noticed that the disconnect occurs 10 minutes after the connection is established. This value is similar to connections.max.idle.ms, so it looks like Kafka disconnects idle connections. I know that some Kafka clients support reconnecting after an idle disconnect. Do brod/broadway_kafka support it?

UPD: the brod_client process is alive, BroadwayKafka.BrodClient.connected?/1 says that it is connected, and :brod.fetch/4 also returns messages. But in its state we see dead_since:

iex(admin@)32> :sys.get_state(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_0.Client))
{:state, Admin.Kafka.Consumer.Broadway.Producer_0.Client, [{"kafka", 9092}],
 #PID<0.31736.27>,
 [
   {:conn, {"kafka-0", 9092},
    {:dead_since, {1658, 257147, 463027}, {:shutdown, :tcp_closed}}}
 ], #PID<0.5428.4>, #PID<0.5430.4>, [connect_timeout: 10000],
 Admin.Kafka.Consumer.Broadway.Producer_0.Client}
iex(admin@7)33> BroadwayKafka.BrodClient.connected?(Admin.Kafka.Consumer.Broadway.Producer_0.Client)
true

It looks like the brod client reconnects but the consumer doesn't rejoin.

UPD2:

I checked the Producer state and got a timeout on :sys.get_state because the producer is stuck in its handle_info callback, in BrodClient.stop_group_coordinator -> :brod_group_coordinator.stop:

iex(admin@)13> :erlang.process_info(:erlang.whereis(Admin.Kafka.Consumer.Broadway.Producer_1))
[
  registered_name: Admin.Kafka.Consumer.Broadway.Producer_1,
  current_function: {:brod_group_coordinator, :stop, 1},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 4,
  links: [#PID<0.3880.0>],
  dictionary: [
    {63, []},
    {62, []},
    {61, []},
    {60, []},
    {59, []},
    {58, []},
    {57, []},
    {56, []},
    {55, []},
    {54, []},
    {53, []},
    {52, []},
    {51, []},
    {50, []},
    {49, []},
    {:"$initial_call", {GenStage, :init, 1}},
    {48, []},
    {:"$ancestors",
     [Admin.Kafka.Consumer.Broadway.ProducerSupervisor,
      Admin.Kafka.Consumer.Broadway.Supervisor, Admin.Kafka.Consumer,
      Admin.Supervisor, #PID<0.3569.0>]},
    {47, []},
    {46, []},
    {45, []},
    {44, []},
    {43, []},
    {42, []},
    {41, []},
    {40, []},
    {39, []},
    {38, []},
    {37, []},
    {36, []},
    {35, []},
    {34, []},
    {33, []},
    {32, []},
    {31, []},
    {30, []},
    {29, []},
    {28, []},
    {27, []},
    {26, []},
    {25, []},
    {24, ...},
    {...},
    ...
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.3568.0>,
  total_heap_size: 20338,
  heap_size: 2586,
  stack_size: 29,
  reductions: 20656517,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 65535,
    minor_gcs: 2858
  ],
  suspending: []
]
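The kind of inspection used in this comment can be tried on a toy process. The following is a minimal sketch, not taken from broadway_kafka: `StuckServer` and `StuckInspect` are hypothetical names, and the blocking `receive` only imitates a callback that never returns. It shows that `:sys.get_state/2` times out for such a process, while `Process.info/2` still reports where it is waiting.

```elixir
# Hypothetical GenServer whose handle_info/2 blocks forever on purpose.
defmodule StuckServer do
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:ok, %{}}

  # Waits for a message that nobody sends, so the callback never returns
  # and no further messages (including system messages) are processed.
  @impl true
  def handle_info(:block, state) do
    receive do
      :never_sent -> {:noreply, state}
    end
  end
end

# Hypothetical helpers for looking at a process from the outside.
defmodule StuckInspect do
  # :sys.get_state/2 exits the caller if no answer arrives in time;
  # the exit is caught here and reported as :timeout.
  def safe_get_state(pid, timeout \\ 500) do
    try do
      {:ok, :sys.get_state(pid, timeout)}
    catch
      :exit, _reason -> :timeout
    end
  end

  # Process.info/2 is answered by the runtime, not by the process itself,
  # so it works even while the process is blocked inside a callback.
  def snapshot(pid) do
    Process.info(pid, [:current_function, :status, :message_queue_len, :current_stacktrace])
  end
end

{:ok, pid} = StuckServer.start_link()
send(pid, :block)
Process.sleep(100)

IO.inspect(StuckInspect.safe_get_state(pid))
IO.inspect(StuckInspect.snapshot(pid))
```

A growing `message_queue_len` together with a `current_function` that never changes is the same pattern visible in the dumps in this thread.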
anoskov commented 1 year ago

@josevalim @slashmili Hello! This still occurs on the 'main' branch. If the consumer doesn't receive messages for some time, Kafka disconnects the idle connections. After this, the Broadway producer and the brod coordinator deadlock each other.

consumer group info

I have no name!@kafka-0:/opt/bitnami/kafka$ ./bin/kafka-consumer-groups.sh --describe --group messaging --bootstrap-server localhost:9092
Consumer group 'messaging' has no active members.

brod client state

{:state, Messaging.Events.Kafka.Consumer.Broadway.Producer_0.Client,
 [{"kafka", 9092}], :undefined,
 [
   {:conn, {"kafka-0.cluster.local", 9092},
    {:dead_since, {1675, 174014, 110349}, {:shutdown, :tcp_closed}}}
 ], #PID<0.12743.1>, #PID<0.12744.1>,
 [connect_timeout: 10000, request_timeout: 240000],
 Messaging.Events.Kafka.Consumer.Broadway.Producer_0.Client}

trying to get the producer state

** (exit) exited in: :sys.get_state(#PID<0.12240.1>)
    ** (EXIT) time out

but it is stuck in :brod_group_coordinator.stop

[
  registered_name: Messaging.Events.Kafka.Consumer.Broadway.Producer_0,
  current_function: {:brod_group_coordinator, :stop, 1},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 3,
  links: [#PID<0.5133.0>],
  dictionary: [
    {63, []},
    {62, []},
    {61, []},
    {60, []},
    {59, []},
    {58, []},
    {57, []},
    {56, []},
    {55, []},
    {54, []},
    {53, []},
    {52, []},
    {51, []},
    {50, []},
    {49, []},
    {48, []},
    {47, []},
    {46, []},
    {45, []},
    {44, []},
    {43, []},
    {42, []},
    {41, []},
    {40, []},
    {:"$initial_call", {GenStage, :init, 1}},
    {39, []},
    {38, []},
    {37, []},
    {:"$ancestors",
     [Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
      Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
      Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4877.0>]},
    {36, []},
    {35, []},
    {34, []},
    {33, []},
    {32, []},
    {31, []},
    {30, []},
    {29, []},
    {28, []},
    {27, []},
    {26, []},
    {25, []},
    {24, ...},
    {...},
    ...
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4876.0>,
  total_heap_size: 32885,
  heap_size: 4185,
  stack_size: 29,
  reductions: 3299758,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 7
  ],
  suspending: []
]

the coordinator is stuck in the assignments_revoked call

[
  current_function: {:gen, :do_call, 4},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 3,
  links: [],
  dictionary: [
    "$initial_call": {:brod_group_coordinator, :init, 1},
    "$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_1,
     Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
     Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
     Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4877.0>]
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4876.0>,
  total_heap_size: 17734,
  heap_size: 6772,
  stack_size: 48,
  reductions: 19501,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 3
  ],
  suspending: []
]

coordinator stacktrace

{:current_stacktrace,
 [
   {:gen, :do_call, 4, [file: 'gen.erl', line: 237]},
   {GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1035]},
   {BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 532]},
   {:telemetry, :span, 3,
    [file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 320]},
   {:brod_group_coordinator, :stabilize, 3,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 502
    ]},
   {:brod_group_coordinator, :handle_info, 2,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 372
    ]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]}
 ]}

producer stacktrace

{:current_stacktrace,
 [
   {:brod_group_coordinator, :stop, 1,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 311
    ]},
   {BroadwayKafka.Producer, :terminate, 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 540]},
   {:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 1158]},
   {:gen_server, :terminate, 10, [file: 'gen_server.erl', line: 1348]},
   {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}
 ]}
josevalim commented 1 year ago

@anoskov can you please try the main branch again?

anoskov commented 1 year ago

> @anoskov can you please try the main branch again?

Thank you! I'll check

anoskov commented 1 year ago

@josevalim Hello! We tried the fix for a week and unfortunately it didn't help. It looks like it is stuck on Process.exit: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L430

** (exit) exited in: :sys.get_state(Messaging.Events.Kafka.Consumer.Broadway.Producer_1)
    ** (EXIT) time out
    (stdlib 4.0.1) sys.erl:338: :sys.send_system_msg/2
    (stdlib 4.0.1) sys.erl:139: :sys.get_state/1
    iex:4: (file)

producer stacktrace

{:current_stacktrace,
 [
   {BroadwayKafka.Producer, :handle_info, 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 430]},
   {Broadway.Topology.ProducerStage, :handle_info, 2,
    [file: 'lib/broadway/topology/producer_stage.ex', line: 229]},
   {GenStage, :noreply_callback, 3, [file: 'lib/gen_stage.ex', line: 2117]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]},
   {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}
 ]}

coordinator stacktrace

{:current_stacktrace,
 [
   {:gen, :do_call, 4, [file: 'gen.erl', line: 237]},
   {GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1035]},
   {BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 539]},
   {:telemetry, :span, 3,
    [file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 321]},
   {:brod_group_coordinator, :stabilize, 3,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 502
    ]},
   {:brod_group_coordinator, :handle_info, 2,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 372
    ]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 1120]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 1197]}
 ]}

additional info

[
  current_function: {:gen, :do_call, 4},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 2,
  links: [],
  dictionary: [
    "$initial_call": {:brod_group_coordinator, :init, 1},
    "$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_8,
     Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
     Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
     Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4769.0>]
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4768.0>,
  total_heap_size: 10961,
  heap_size: 4185,
  stack_size: 48,
  reductions: 19899,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 7
  ],
  suspending: []
]
[
  registered_name: Messaging.Events.Kafka.Consumer.Broadway.Producer_1,
  current_function: {BroadwayKafka.Producer, :handle_info, 2},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 7,
  links: [#PID<0.5024.0>],
  dictionary: [
    {:"$initial_call", {GenStage, :init, 1}},
    {:"$ancestors",
     [Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
      Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
      Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4769.0>]},
    {63, []},
    {62, []},
    {61, []},
    {60, []},
    {59, []},
    {58, []},
    {57, []},
    {56, []},
    {55, []},
    {54, []},
    {53, []},
    {52, []},
    {51, []},
    {50, []},
    {49, []},
    {48, []},
    {47, []},
    {46, []},
    {45, []},
    {44, []},
    {43, []},
    {42, []},
    {41, []},
    {40, []},
    {39, []},
    {38, []},
    {37, []},
    {36, []},
    {35, []},
    {34, []},
    {33, []},
    {32, []},
    {31, []},
    {30, []},
    {29, []},
    {28, []},
    {27, []},
    {26, []},
    {25, []},
    {24, ...},
    {...},
    ...
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4768.0>,
  total_heap_size: 35914,
  heap_size: 6772,
  stack_size: 28,
  reductions: 6413879,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 8
  ],
  suspending: []
]
josevalim commented 1 year ago

This is very unexpected because Process.exit is, afaik, asynchronous.

anoskov commented 1 year ago

@josevalim Maybe the stacktrace is not accurate and it is stuck on https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L432 ? Although I don't understand how that could be.

josevalim commented 1 year ago

Can you consistently reproduce it now, or does it only happen from time to time?

anoskov commented 1 year ago

I can't reproduce this manually, but it happens a few times a week when some of our servers are idle and Kafka closes the connection.

I tried to emulate this with two GenServers, where the second calls the first while the first is processing a message in handle_info and calls Process.exit on the second. It worked fine: the first server got the message in its receive block after Process.exit.

If you need any additional information I can collect it next time

anoskov commented 1 year ago

@josevalim I seem to have found the problem. brod_group_coordinator sets trap_exit: true on start: https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L320

So, quoting the Process.exit(pid, reason) docs:

If pid is trapping exits, the exit signal is transformed into a message {:EXIT, from, reason} and delivered to the message queue of pid
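The trapping behaviour described here can be checked in isolation. This is a minimal sketch using only plain processes; `TrapExitDemo` is a hypothetical name and nothing in it comes from brod or broadway_kafka. The spawned process sets `trap_exit`, so `Process.exit/2` with reason `:shutdown` does not kill it and instead shows up as an ordinary message that the process has to read itself.

```elixir
defmodule TrapExitDemo do
  # Returns {:trapped, from, reason} if the exit signal was converted
  # into a message, or :no_message if nothing arrived within a second.
  def run do
    parent = self()

    pid =
      spawn(fn ->
        Process.flag(:trap_exit, true)
        send(parent, :ready)

        # The process stays alive until it reads the {:EXIT, ...} message.
        receive do
          {:EXIT, from, reason} -> send(parent, {:trapped, from, reason})
        end
      end)

    # Make sure trap_exit is set before sending the exit signal.
    receive do
      :ready -> :ok
    end

    Process.exit(pid, :shutdown)

    receive do
      {:trapped, from, reason} -> {:trapped, from, reason}
    after
      1_000 -> :no_message
    end
  end
end

IO.inspect(TrapExitDemo.run())
```

The consequence for this issue is that a trapping process only reacts to the signal when it gets back to reading its mailbox; while it is blocked elsewhere, the `{:EXIT, ...}` message just waits in the queue.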

The coordinator should handle it here https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L374-L386 but it is stuck in the call to the Producer, which is itself waiting for {:DOWN, _, _, ^coord, _} from the coordinator.
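The circular wait can be reproduced with two toy processes. This is a simplified sketch under stated assumptions, not the real code: `FakeProducer`, `FakeCoordinator`, and `DeadlockDemo` are hypothetical, the `{:stop_coordinator, pid}` trigger is invented to force the ordering, and the producer's wait has a timeout (`wait_ms`) so the demo terminates, whereas the wait described in this thread has none.

```elixir
defmodule FakeProducer do
  use GenServer

  def start(notify, wait_ms), do: GenServer.start(__MODULE__, {notify, wait_ms})

  @impl true
  def init({notify, wait_ms}), do: {:ok, %{notify: notify, wait_ms: wait_ms}}

  @impl true
  def handle_call(:assignments_revoked, _from, state), do: {:reply, :ok, state}

  @impl true
  def handle_info({:stop_coordinator, coord}, state) do
    ref = Process.monitor(coord)
    Process.exit(coord, :shutdown)

    # Blocking wait inside the callback: while we sit here, the pending
    # :assignments_revoked call cannot be answered.
    outcome =
      receive do
        {:DOWN, ^ref, :process, ^coord, _reason} -> :coordinator_down
      after
        state.wait_ms -> :timed_out
      end

    send(state.notify, {:outcome, outcome})
    {:noreply, state}
  end

  # Ignore anything else, such as a late :DOWN after the timeout.
  def handle_info(_other, state), do: {:noreply, state}
end

defmodule FakeCoordinator do
  def start(producer) do
    spawn(fn ->
      Process.flag(:trap_exit, true)
      # Both messages come from this process, so the producer sees the
      # stop request first and the call request second.
      send(producer, {:stop_coordinator, self()})
      :ok = GenServer.call(producer, :assignments_revoked, :infinity)

      # Only reached after the call returns.
      receive do
        {:EXIT, _from, reason} -> exit(reason)
      end
    end)
  end
end

defmodule DeadlockDemo do
  def run(wait_ms \\ 300) do
    {:ok, producer} = FakeProducer.start(self(), wait_ms)
    coord = FakeCoordinator.start(producer)
    ref = Process.monitor(coord)

    outcome =
      receive do
        {:outcome, value} -> value
      after
        wait_ms + 1_000 -> :no_outcome
      end

    coordinator_exit =
      receive do
        {:DOWN, ^ref, :process, ^coord, reason} -> reason
      after
        1_000 -> :still_alive
      end

    GenServer.stop(producer)
    {outcome, coordinator_exit}
  end
end

IO.inspect(DeadlockDemo.run())
```

In this sketch the producer always hits the timeout, and the coordinator exits only after the producer gives up waiting and answers the call. With an unbounded `receive`, neither side would ever make progress.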

anoskov commented 1 year ago

@josevalim Hello. Can we just remove the receive block in the producer's handle_info, or should it wait for the exit to complete?

josevalim commented 1 year ago

@anoskov I think we can remove it. I have just pushed a commit that does so, please give it a try.
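For illustration only, here is one way a callback can avoid blocking on the coordinator's exit. This is a sketch under assumptions, not the commit mentioned in this comment: `NonBlockingProducer` and `NonBlockingDemo` are hypothetical, and the `{:stop_coordinator, pid}` trigger is invented for the demo. The callback sends the exit signal and returns immediately; the `:DOWN` message is then handled like any other message.

```elixir
defmodule NonBlockingProducer do
  use GenServer

  def start(notify), do: GenServer.start(__MODULE__, notify)

  @impl true
  def init(notify), do: {:ok, %{notify: notify, ref: nil}}

  @impl true
  def handle_call(:assignments_revoked, _from, state), do: {:reply, :ok, state}

  @impl true
  def handle_info({:stop_coordinator, coord}, state) do
    ref = Process.monitor(coord)
    Process.exit(coord, :shutdown)
    # Return right away so a pending :assignments_revoked call is served.
    {:noreply, %{state | ref: ref}}
  end

  # The coordinator's exit arrives later as a regular message.
  def handle_info({:DOWN, ref, :process, _pid, reason}, %{ref: ref} = state) do
    send(state.notify, {:coordinator_down, reason})
    {:noreply, %{state | ref: nil}}
  end

  def handle_info(_other, state), do: {:noreply, state}
end

defmodule NonBlockingDemo do
  def run do
    {:ok, producer} = NonBlockingProducer.start(self())

    # Stand-in for a coordinator: traps exits and calls back into the producer.
    spawn(fn ->
      Process.flag(:trap_exit, true)
      send(producer, {:stop_coordinator, self()})
      :ok = GenServer.call(producer, :assignments_revoked, :infinity)

      receive do
        {:EXIT, _from, reason} -> exit(reason)
      end
    end)

    result =
      receive do
        {:coordinator_down, reason} -> {:coordinator_down, reason}
      after
        1_000 -> :timed_out
      end

    GenServer.stop(producer)
    result
  end
end

IO.inspect(NonBlockingDemo.run())
```

The trade-off is that the callback no longer knows the coordinator is gone by the time it returns, so any cleanup that depends on that has to move into the `:DOWN` clause.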

anoskov commented 1 year ago

@josevalim Hm. I still see it in the main branch: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L428 The commit doesn't include this change: https://github.com/dashbitco/broadway_kafka/commit/6ef6f41fab0fa5bcf8b322ac9129042c8ce45ceb

josevalim commented 1 year ago

Apologies, I clearly pushed the wrong commit. It is there now.

anoskov commented 1 year ago

Looks like the fix helped. Thank you!