kafkaex / kafka_ex

Kafka client library for Elixir
MIT License

Consumer timeouts #425

Open ulutomaz opened 3 years ago

ulutomaz commented 3 years ago

Versions:

OS: Ubuntu 16.04 LTS
Erlang/OTP: 22.2.2
Elixir: 1.9.4
kafka_ex: 0.11.0
Kafka cluster (AWS MSK): 2.2.1
Kafka topic (usual setup): 1 topic with 9 partitions

Overview:

We run our setup on AWS (managed Kafka, MSK), and our codebase runs on Linux-based EC2 instances. The setup is distributed: each of 3 nodes consumes 3 partitions. We have been running kafka_ex for a few years, but lately we have been hitting a strange problem where we lose messages. We debugged this with the AWS support team, and on the managed Kafka side (brokers, interconnection, etc.) nothing seems to be wrong: no connection timeouts or anything similar appear in the logs.

On our side, though, the logs occasionally show errors like the one below. Off the top of my head, the last error like this was about two weeks ago.

We have already put some effort into debugging, and everything points to this error, because the missing messages correlate with the times at which it occurs.

Our codebase can handle this "interrupt" and keeps running, but as said, we experience message loss.

Error:

2020-11-17 10:06:27.594 [error] Receiving data from broker "<HERE IS OUR KAFKA ADDRESS>":9092 failed with :timeout
2020-11-17 10:06:28.810 [error] GenServer #PID<0.4413.0> terminating
** (MatchError) no match of right hand side value: {:error, :timeout}
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:847: KafkaEx.GenConsumer.commit/1
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:643: KafkaEx.GenConsumer.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :timeout
State: %KafkaEx.GenConsumer.State{acked_offset: 197475917, api_versions: %{fetch: 0, offset_commit: 0, offset_fetch: 0}, auto_offset_reset: :earliest, commit_interval: 1000, commit_threshold: 100, committed_offset: 197475917, consumer_module: OurKafka.Consumer.Consumer, consumer_state: %{partition: 1, topic: "aws-connect"}, current_offset: 197475917, fetch_options: [auto_commit: false, worker_name: #PID<0.4414.0>], generation_id: 1, group: "aws-connect_consumer_group", last_commit: -576066016834, member_id: "kafka_ex-e2e8bd1c-25a3-4c7a-be69-6cbc12a6223d", partition: 1, topic: "aws-connect", worker_name: #PID<0.4414.0>}

Other

BlueCollarChris commented 3 years ago

@ulutomaz We are also experiencing the same issue when using MSK with SSL enabled.

joshuawscott commented 3 years ago

We've fixed a number of issues around how the consumers and their supervision works, so it wouldn't surprise me to learn there are more issues.

That said, Kafka should not allow you to lose messages as long as you do not commit an offset until you have fully processed it. I would make sure that is not happening here (a common cause is using something like Tasks or GenServer.cast and allowing the handle_message_set function to return before all the messages are processed).
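To illustrate the point about handle_message_set returning too early, here is a minimal sketch of a consumer that fans work out to Tasks but only returns (and so only allows the commit) once every message in the batch has finished. It assumes kafka_ex is a project dependency; the module name Example.BlockingConsumer, the process_message/1 helper, the concurrency limit, and the 30-second timeout are hypothetical and are not taken from the reporter's code.

```elixir
defmodule Example.BlockingConsumer do
  # Assumes kafka_ex is listed as a dependency of the project.
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  # Hypothetical per-message processing timeout, in milliseconds.
  @process_timeout 30_000

  def handle_message_set(message_set, state) do
    # Task.async_stream/3 runs the work concurrently, and Enum.each/2
    # forces the stream, so this function blocks until every task is done.
    # If a task fails or times out, this process exits before returning,
    # no offset is committed for the batch, and the messages can be
    # fetched again.
    message_set
    |> Task.async_stream(&process_message/1,
      max_concurrency: 4,
      timeout: @process_timeout
    )
    |> Enum.each(fn {:ok, :ok} -> :ok end)

    # Only now is it safe to let the consumer commit the offset.
    {:sync_commit, state}
  end

  # Hypothetical placeholder for the real work (database write, HTTP call, ...).
  defp process_message(%Message{value: _value}) do
    :ok
  end
end
```

By contrast, calling Task.start/1 or GenServer.cast/2 per message and returning immediately would let the offset be committed while work is still in flight, so a crash at that point would look like lost messages.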

I've also seen a number of issues where KafkaEx will detect timeouts where the Kafka server doesn't log them. I'm frankly not sure why this happens, but increasing the sync_timeout setting to something much higher than the default (say 10-15 seconds) avoids this.
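For reference, a sketch of how that setting might look in the application config. The value is in milliseconds; 15_000 is just one choice from the suggested 10-15 second range, and the file location is an assumption about a typical Mix project layout.

```elixir
# config/config.exs (assumed location; adjust to your project layout)
import Config

config :kafka_ex,
  # Timeout, in milliseconds, for synchronous operations such as network calls.
  sync_timeout: 15_000
```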

TC-aLi commented 2 years ago

@ulutomaz If you still have the issue, you can try an OTP upgrade: https://github.com/kafkaex/kafka_ex/issues/389#issuecomment-1062515469. BTW, we're also on AWS MSK (2.3.1).

erapert-simplifi commented 1 year ago

Encountering this issue

01:09:37.043 [error] Receiving data from broker "REDACTED":9092 failed with :timeout
01:09:37.077 [error] GenServer #PID<0.2312.0> terminating
** (MatchError) no match of right hand side value: {:error, :timeout}
    (kafka_ex 0.13.0) lib/kafka_ex/gen_consumer.ex:877: KafkaEx.GenConsumer.commit/1
    (kafka_ex 0.13.0) lib/kafka_ex/gen_consumer.ex:668: KafkaEx.GenConsumer.handle_info/2
    (stdlib 3.17.2) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.17.2) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.17.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :timeout

This also appears to be the culprit for sockets not being cleaned up: the file descriptor count climbs until it eventually reaches the ulimit, which prevents more sockets from being opened, and the program crashes.

This is happening on relatively recent versions of Elixir and Erlang, with version 0.13 of KafkaEx:

Erlang/OTP 24 [erts-12.3.2.1] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1]
Interactive Elixir (1.14.2) - press Ctrl+C to exit (type h() ENTER for help)

Update: using sync_timeout: 15000 reduces the number of timeouts but does not eliminate them in my case.