kafkaex / kafka_ex

Kafka client library for Elixir
MIT License

badmatch in NetworkClient #449

Open TC-aLi opened 2 years ago

TC-aLi commented 2 years ago

We're using kafka_ex as the driver for a 6-broker Kafka cluster. It's integrated into an Erlang app, and we also introduced worker_pool. When reviewing the logs, I found this:

=ERROR REPORT==== ** Generic server 'wpool_pool-kafka_ex_pool-9' terminating ** Last message in was update_consumer_metadata ** When Server state == #{'__struct__' => 'Elixir.KafkaEx.Server.State', ...,...},...],...},...]},...} registered_name: 'wpool_pool-kafka_ex_pool-9' ancestors: ['wpool_pool-kafka_ex_pool-process-sup',kafka_ex_pool,<0.2763.0>] Supervisor: {local,'wpool_pool-kafka_ex_pool-process-sup'} Offender: [{pid,<0.3007.0>},{id,'wpool_pool-kafka_ex_pool-9'},{mfargs,{wpool_process,start_link,['wpool_pool-kafka_ex_pool-9','Elixir.KafkaEx.Server0P10AndLater',[[{uris,[{url,9094}]},{consumer_group,<<"kafka_ex">>},{use_ssl,true},{ssl_options,[{certfile,"cert.pem"},{keyfile,"key.pem"}]}]]}},{overrun_warning,1000},{overrun_handler,{'Elixir.KafkaEx.WorkerPool',producer_overrun_handler}},{overrun_warning,infinity},{max_overrun_warnings,infinity},{workers,N},{worker_opt,[]},{queue_type,fifo}]]}},{restart_type,permanent},{shutdown,5000},{child_type,worker}]

Supervisor 'wpool_pool-kafka_ex_pool-process-sup' had child 'wpool_pool-kafka_ex_pool-9' started with wpool_process:start_link('wpool_pool-kafka_ex_pool-9', 'Elixir.KafkaEx.Server0P10AndLater', [...], [{queue_manager,'wpool_pool-kafka_ex_pool-queue-manager'},{time_checker,'wpool_pool-kafka_ex_pool-time-checker'},...]) at <0.2832.0> exit with reason no case clause matching {badmatch,{error,closed}} in wpool_process:handle_info/2 line 126 in context child_terminated

Shutting down worker #PID<0.2832.0>, reason: {{:case_clause, {:badmatch, {:error, :closed}}}, [{:wpool_process, :handle_info, 2, [file: './lib/worker_pool/src/wpool_process.erl', line: 126]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}

The error messages point to this code in worker_pool, because of the try/catch there: https://github.com/inaka/worker_pool/blob/4.0.1/src/wpool_process.erl#L126 https://github.com/inaka/worker_pool/blob/4.0.1/src/wpool_utils.erl#L58

After reviewing the kafka_ex code, I think the only place that could cause {:badmatch, {:error, :closed}} is the following: https://github.com/kafkaex/kafka_ex/blob/master/lib/kafka_ex/network_client.ex#L65-L73

dantswain commented 2 years ago

Hi @TC-aLi ! Hmm I think the code you linked would handle the {:error, :closed} pattern? Either here https://github.com/kafkaex/kafka_ex/blob/master/lib/kafka_ex/network_client.ex#L77 or here https://github.com/kafkaex/kafka_ex/blob/master/lib/kafka_ex/network_client.ex#L90

If you're getting this value then it seems like the broker closed the connection and it may be prudent for the network client to crash and reconnect? Is there any way you can get a full stack trace?

TC-aLi commented 2 years ago

Hi Dan, thank you for your reply. I made a fix on L65, and the crash disappeared. https://github.com/kafkaex/kafka_ex/blob/master/lib/kafka_ex/network_client.ex#L65

I think you're right that the broker closed the connection. I was seeing this issue relatively frequently because of our setup:

  1. KAFKA_CONNECTIONS_MAX_IDLE_MS is set to 10 mins.
  2. https://github.com/kafkaex/kafka_ex/blob/master/lib/kafka_ex/server.ex#L991 uses Enum.find_value/2. The more brokers there are in a cluster, the more likely it is that one of the connections has been closed on the broker side.
  3. we're using a worker pool.

But still, I think L65 should be handled just like L77 and L90 when the connection is closed remotely.
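
To illustrate the difference being discussed, here is a minimal sketch, not the actual kafka_ex code. The module name and the `socket_call` argument are hypothetical stand-ins for any socket operation that returns `:ok` or `{:error, reason}`. It contrasts an asserting match, which crashes with a badmatch when the broker has closed the connection, with a `case` that hands the error tuple back to the caller.

```elixir
defmodule ClosedSocketSketch do
  # `socket_call` is a hypothetical zero-arity function standing in for a
  # socket operation that returns :ok or {:error, reason}.

  # Asserting match: anything other than :ok raises MatchError, which shows
  # up in Erlang-side logs as {badmatch, {error, closed}}.
  def strict(socket_call) do
    :ok = socket_call.()
    :sent
  end

  # Explicit handling: the error tuple is returned to the caller instead of
  # crashing the process.
  def tolerant(socket_call) do
    case socket_call.() do
      :ok -> :sent
      {:error, reason} -> {:error, reason}
    end
  end

  # Runs strict/1 and reports the term that failed to match.
  def demo_strict(socket_call) do
    strict(socket_call)
  rescue
    e in MatchError -> {:crashed, e.term}
  end
end

# Simulate a connection that the broker has already closed.
closed = fn -> {:error, :closed} end
IO.inspect(ClosedSocketSketch.tolerant(closed))
IO.inspect(ClosedSocketSketch.demo_strict(closed))
```

Whether the caller should then reconnect or let the worker crash and restart is a separate design choice; the sketch only shows where the badmatch comes from.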