kafkaex / kafka_ex

Kafka client library for Elixir
MIT License
598 stars 162 forks source link

GenConsumer returning timeout #310

Closed jruhland closed 6 years ago

jruhland commented 6 years ago

We're having an issue with our GenConsumer intermittely reporting timeouts and I'm here seeking some assistance in root cause diagnosis. Current theory is consumer is aggressively timing out the requests and were not handling the returned timeout, but were mostly concerned with why its timing out in the first place.

Running elixir 1.5 so the stacktraces arent particularly helpful to us.

elixir stacktrace (appears intermittently)

GenServer #PID<0.23548.2> terminating
** (MatchError) no match of right hand side value: :timeout
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:612: KafkaEx.GenConsumer.consume/1
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:566: KafkaEx.GenConsumer.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: :timeout

corresponding kafka log (this appears only once)

[2018-09-19 14:28:46,063] WARN Attempting to send response via channel for which there is no open connection, connection id 10.0.152.162:9092-10.0.164.54:48803-408409 (kafka.network.Processor)

this is all the stacktrace were getting, and our kafka brokers themselves are not reporting this issue 1 to 1, more like 25 to 1, or less. It happens only after long lulls in message throughput. (like in the middle of the night when were not actively using our development environment).

Appreciate any feedback, and happy to submit patches if we can find root cause.

joshuawscott commented 6 years ago

Are you on the latest 0.8.3 release @jruhland ?

jruhland commented 6 years ago

yessir

joshuawscott commented 6 years ago

I rarely see this issue now that I have a heartbeat of 1 second. I am not entirely sure if that was the fix or not. e.g. with a child spec:

alias KafkaEx.ConsumerGroup
%{
  id: ConsumerGroup,
  start: {
    ConsumerGroup,
    :start_link,
    [MyGenConsumer, consumer_group_name, topic_names, [heartbeat_interval: 1_000]]
  },
  type: :supervisor
}

(basically passing a keyword list with heartbeat_interval to the ConsumerGroup.start_link/4 function)

If you want to have a more live discussion, feel free to ping me on the Elixir slack https://elixir-slackin.herokuapp.com/ in the #kafkaex channel

jruhland commented 6 years ago

👍 ill try it out. going to close and chat there if needed

aportnov commented 5 years ago

I am having the same problem:

GenServer #PID<0.576.0> terminating (stop) exited in: GenServer.call(#PID<0.577.0>, {:fetch, %KafkaEx.Protocol.Fetch.Request{auto_commit: false, client_id: nil, correlation_id: nil, max_bytes: 1000000, min_bytes: 1, offset: 1002, partition: 1, topic: "elixir.echo.updates", wait_time: 10}}, 5000) (EXIT) time out (elixir) lib/gen_server.ex:989: GenServer.call/3 (kafka_ex) lib/kafka_ex/gen_consumer.ex:628: KafkaEx.GenConsumer.consume/1 (kafka_ex) lib/kafka_ex/gen_consumer.ex:579: KafkaEx.GenConsumer.handle_info/2 (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4 (stdlib) gen_server.erl:711: :gen_server.handle_msg/6 (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3 Last message: :timeout State: %KafkaEx.GenConsumer.State{acked_offset: 1002, auto_offset_reset: :none, commit_interval: 1000, commit_threshold: 100, committed_offset: 1002, consumer_module: Common.Kafka.Consumers.Echo, consumer_state: nil, current_offset: 1002, fetch_options: [auto_commit: false, worker_name: #PID<0.577.0>], group: "elixir_chat_group", last_commit: -576457665677, partition: 1, topic: "elixir.echo.updates", worker_name: #PID<0.577.0>}

"kafka_ex": {:hex, :kafka_ex, "0.9.0", "d653b38f2584627965c71a118d9be69bf6bd3174d5f3dca29e071c928af2f53a", [:mix], [], "hexpm"},