dashbitco / broadway_kafka

A Broadway connector for Kafka
Deadlock on race between assignments_revoked call and handling of DOWN message #116

Closed anoskov closed 1 year ago

anoskov commented 2 years ago

Hello! Analysis of the previous issue (https://github.com/dashbitco/broadway_kafka/issues/112) showed that it was caused by a deadlock. The deadlock arises from a race between a) the assignments_revoked callback call inside :brod_group_coordinator and b) the handling of the 'DOWN' message inside the BroadwayKafka producer.

In detail:

a) In its stabilize function, :brod_group_coordinator invokes the assignments_revoked callback (implemented by the BroadwayKafka producer) https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L502, which makes a GenStage.call to the producer with an infinite timeout https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L525

b) A moment earlier, the producer received a 'DOWN' message and called :brod_group_coordinator.stop https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L422. That call is stuck waiting for the coordinator, which is itself waiting for the result of a).
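
The cycle described in a) and b) can be reproduced in isolation. The following is only a minimal sketch, not the brod or BroadwayKafka code: `DeadlockSketch`, `Peer`, the `:ping` call, and the `:ready`/`:go` barrier are hypothetical names invented for illustration. Two GenServers stand in for the coordinator and the producer, and each makes an `:infinity` call to the other while it is still inside one of its own callbacks, so neither can ever reply.

```elixir
defmodule DeadlockSketch do
  # Hypothetical stand-in for both the group coordinator and the producer.
  defmodule Peer do
    use GenServer

    # Started unlinked so the owner can kill the stuck processes afterwards.
    def start(owner), do: GenServer.start(__MODULE__, owner)

    @impl true
    def init(owner), do: {:ok, owner}

    @impl true
    def handle_info({:call_peer, peer}, owner) do
      # Barrier: report that we are inside a callback, then wait for :go,
      # so both peers are guaranteed to be busy before either one calls.
      send(owner, {:ready, self()})

      receive do
        :go -> :ok
      end

      # Never returns if `peer` is itself blocked in a call to this process.
      reply = GenServer.call(peer, :ping, :infinity)
      send(owner, {:done, self(), reply})
      {:noreply, owner}
    end

    @impl true
    def handle_call(:ping, _from, owner), do: {:reply, :pong, owner}
  end

  # Returns :deadlocked if neither call completes within `window_ms`.
  def run(window_ms \\ 300) do
    {:ok, coordinator} = Peer.start(self())
    {:ok, producer} = Peer.start(self())
    peers = [coordinator, producer]

    send(coordinator, {:call_peer, producer})
    send(producer, {:call_peer, coordinator})

    # Wait until both peers are inside handle_info/2, then release them.
    for pid <- peers do
      receive do
        {:ready, ^pid} -> :ok
      end
    end

    Enum.each(peers, &send(&1, :go))

    outcome =
      receive do
        {:done, _pid, _reply} -> :completed
      after
        window_ms -> :deadlocked
      end

    # Clean up: the blocked processes would otherwise wait forever.
    Enum.each(peers, &Process.exit(&1, :kill))
    outcome
  end
end
```

Calling `DeadlockSketch.run()` should report `:deadlocked`, because each peer is parked in `GenServer.call/3` with the other's request sitting unread in its mailbox. In the real system the race window is much narrower, which would be consistent with the problem being hard to reproduce on demand.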

group coordinator process info:

[
  current_function: {:gen, :do_call, 4},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 2,
  links: [],
  dictionary: [
    rand_seed: {%{
       bits: 58,
       jump: #Function<3.92093067/1 in :rand."-fun.exsplus_jump/1-">,
       next: #Function<0.92093067/1 in :rand."-fun.exsss_next/1-">,
       type: :exsss,
       uniform: #Function<1.92093067/1 in :rand."-fun.exsss_uniform/1-">,
       uniform_n: #Function<2.92093067/2 in :rand."-fun.exsss_uniform/2-">
     }, [23511365307712030 | 235316257979211424]},
    "$initial_call": {:brod_group_coordinator, :init, 1},
    "$ancestors": [Messaging.Events.Kafka.Consumer.Broadway.Producer_4,
     Messaging.Events.Kafka.Consumer.Broadway.ProducerSupervisor,
     Messaging.Events.Kafka.Consumer.Broadway.Supervisor,
     Messaging.Events.Kafka.Consumer, Messaging.Supervisor, #PID<0.4609.0>]
  ],
  trap_exit: true,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.4608.0>,
  total_heap_size: 3577,
  heap_size: 2586,
  stack_size: 48,
  reductions: 521044,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 20,
    minor_gcs: 17
  ],
  suspending: []
]

group coordinator stacktrace:

{:current_stacktrace,
 [
   {:gen, :do_call, 4, [file: 'gen.erl', line: 214]},
   {GenServer, :call, 3, [file: 'lib/gen_server.ex', line: 1027]},
   {BroadwayKafka.Producer, :"-assignments_revoked/1-fun-1-", 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 525]},
   {:telemetry, :span, 3,
    [file: '/builds/ccs/messaging/deps/telemetry/src/telemetry.erl', line: 320]},
   {:brod_group_coordinator, :stabilize, 3,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 502
    ]},
   {:brod_group_coordinator, :handle_info, 2,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 376
    ]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 695]},
   {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 771]}
 ]}

producer stacktrace:

{:current_stacktrace,
 [
   {:brod_group_coordinator, :stop, 1,
    [
      file: '/builds/ccs/messaging/deps/brod/src/brod_group_coordinator.erl',
      line: 311
    ]},
   {BroadwayKafka.Producer, :terminate, 2,
    [file: 'lib/broadway_kafka/producer.ex', line: 533]},
   {:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 733]},
   {:gen_server, :terminate, 10, [file: 'gen_server.erl', line: 918]},
   {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
 ]}
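
For anyone who wants to collect the same diagnostics from a stuck node, dumps like the ones above can be gathered with `Process.info/2`. This is a small sketch; the `ProcSnapshot` module name and the selection of items are my own choices for illustration, not part of either library.

```elixir
defmodule ProcSnapshot do
  # Items that were most useful for spotting the cycle: where the process
  # is blocked, whether it is waiting, and how many messages are queued.
  @items [:current_function, :status, :message_queue_len, :current_stacktrace]

  # Returns a map of the selected items, or :not_alive for a dead process.
  # Only works for pids on the local node.
  def take(pid) when is_pid(pid) do
    case Process.info(pid, @items) do
      nil -> :not_alive
      info -> Map.new(info)
    end
  end
end
```

Running `ProcSnapshot.take(pid)` in a remote shell against both the coordinator pid and the producer pid shows each one waiting inside a call to the other.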

zavndw commented 1 year ago

Join this issue

danmarcab commented 1 year ago

We are also experiencing this issue. We are trying to reproduce it in dev so we can attempt a fix, but no luck so far.

slashmili commented 1 year ago

This should be fixed in #121. Please give the main branch a try.