karafka / karafka

Ruby and Rails efficient Kafka processing framework
https://karafka.io

Exposed Consumer Heartbeat does not seem to work as expected #543

Closed OPhamster closed 5 years ago

OPhamster commented 5 years ago

Background

We use Karafka to process messages in consumers that interact with two separate databases. In certain cases, because of the way we handle the operations defined in the messages, the consumer is stuck waiting for the database(s) to complete an expensive update. Ultimately the problem is that a message does not translate to exactly one operation, but changing this would be extremely time consuming. It's difficult to estimate the cost of a single message, as this changes over time. So as an alternative we decided to use the exposed consumer heartbeat to send heartbeats periodically, signalling that the consumer is alive even though it is stuck processing the same batch.

Expected behavior

We expected the consumers to send heartbeats periodically, even when a consumer has not finished processing the params_batch within the session_timeout, and not to be rebalanced every time one or more expensive messages come along.
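
For manual heartbeats to prevent a rebalance, the interval between them has to be well below the session timeout. The following is only a sketch with hypothetical values (the constants and the helper name `heartbeat_interval_safe?` are not from the application). It uses the rule of thumb from the Kafka consumer configuration docs that the heartbeat interval should be no higher than one third of the session timeout.

```ruby
# Hypothetical values; the real ones come from the application's settings
# (for example the session_timeout and Settings.kafka_heartbeat_in_seconds).
SESSION_TIMEOUT_SECONDS = 30
HEARTBEAT_INTERVAL_SECONDS = 10

# Returns true when at least `min_beats` heartbeats fit into one
# session timeout window.
def heartbeat_interval_safe?(interval, session_timeout, min_beats: 3)
  interval.positive? && interval * min_beats <= session_timeout
end

puts heartbeat_interval_safe?(HEARTBEAT_INTERVAL_SECONDS, SESSION_TIMEOUT_SECONDS)
```

An interval that fails this check does not prove heartbeats are missing, but it is a cheap thing to rule out before looking at the threading code.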

Actual behavior

The consumer doesn't seem to send heartbeats when we trigger them manually (they do not show up in the logs). As a result, the consumers respond late: they commit an offset and send a heartbeat only after Kafka has decided that the consumer is dead and triggered a rebalance. This leaves the cluster stuck in a barrage of rebalance operations for some time.

Steps to reproduce the problem

The code given below is the asynchronous heartbeat controller that we've implemented to send the heartbeats from a separate timed thread, regardless of the state of the consumer. Only one instance of this controller is started when the karafka server starts up.

# frozen_string_literal: true

require 'singleton'
require 'concurrent'

module Kafka
  module Utils
    class AsyncHeartbeatController
      include Singleton
      attr_reader :task
      HEARTBEAT_CALL = :trigger_heartbeat!

      class << self
        def logger
          @logger ||= if ENV['KARAFKA_ENV'].present?
                        Karafka::App.logger
                      else
                        Rails.logger
                      end
        end
      end

      # @note the consumer_map is a map of <consumer_object_id:consumer_object>
      #       which is used to keep track of all the consumers that are
      #       currently running in the server.
      def initialize
        @consumer_map = Concurrent::Map.new
        periodic_heartbeat
      end

      # @note registers the consumer - attempts this every time consume is
      #       called.
      # @param consumer [BaseConsumer]
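      def register_consumer(consumer)
        @consumer_map.compute_if_absent(consumer.object_id) do
          self.class.logger.debug 'Registered consumer'
          # compute_if_absent stores the block's return value, so the consumer
          # (not the result of the logger call) must be the last expression.
          consumer
        end
      end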

      # @note drops the consumer from the heartbeat list
      # @param consumer [BaseConsumer]
      def deregister_consumer(consumer)
        self.class.logger.debug 'De-Registering consumer'
        @consumer_map.delete(consumer.object_id)
      end

      # @note performs the periodic heartbeat of all the live consumers
      def periodic_heartbeat
        @task = Concurrent::TimerTask.new(
          execution_interval: Settings.kafka_heartbeat_in_seconds
        ) do
          trigger_heartbeat
        end
        @task.execute
      end

      # trigger_heartbeat of all consumers
      def trigger_heartbeat
        self.class.logger.debug 'Triggering Heartbeats Manually'
        @consumer_map.values.each do |consumer|
          consumer.send HEARTBEAT_CALL
        end
      end

      def stop
        task&.shutdown
      end
    end
  end
end
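
One detail worth checking in registration code like `register_consumer`: `Concurrent::Map#compute_if_absent` stores whatever the block returns, so the order of statements inside the block decides what ends up in the map. The sketch below illustrates this with a stdlib-only stand-in (`TinyRegistry` is hypothetical and is not `Concurrent::Map`; it only mimics the "store the block's return value" contract). Whether this is the cause of the missing heartbeats here is not confirmed in this thread.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for a compute-if-absent map (NOT Concurrent::Map).
# Like Concurrent::Map#compute_if_absent, it stores what the block returns.
class TinyRegistry
  def initialize
    @store = {}
    @lock = Mutex.new
  end

  def compute_if_absent(key)
    @lock.synchronize do
      @store.fetch(key) { @store[key] = yield }
    end
  end

  def values
    @lock.synchronize { @store.values }
  end
end

logger = Logger.new(StringIO.new)
consumer = Object.new # placeholder for a consumer instance

# Logging last: the block returns the logger call's result, not the consumer.
wrong = TinyRegistry.new
wrong.compute_if_absent(consumer.object_id) do
  consumer
  logger.debug('Registered consumer')
end

# Logging first: the consumer is the last expression and gets stored.
right = TinyRegistry.new
right.compute_if_absent(consumer.object_id) do
  logger.debug('Registered consumer')
  consumer
end

puts wrong.values.first.equal?(consumer) # false
puts right.values.first.equal?(consumer) # true
```

If a non-consumer value ends up in the map, `send` on it raises `NoMethodError` inside the timer task. As far as I know, concurrent-ruby's `TimerTask` reports task errors to its observers instead of raising them, so without an observer such a failure could go unnoticed in the logs.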

Your setup details

Karafka framework version: 1.2.13
Ruby-kafka version: ruby-kafka (0.7.10)
Ruby version: 2.6.3
Application client id: production_ruby
Backend: inline
Batch fetching: true
Batch consuming: true
Boot file: /app/karafka.rb
Environment: production
Kafka seed brokers: ["kafka://kafka-1:9092",
                     "kafka://kafka-2:9092",
                     "kafka://kafka-3:9092",
                     "kafka://kafka-4:9092"]

mensfeld commented 5 years ago

@OPhamster wow that is a high quality bug report! Kudos!

First things first: can you try 1.3.0?

Let me reproduce that myself today.

OPhamster commented 5 years ago

Ah sure thing - will try it out and get back to you on this.

mensfeld commented 5 years ago

@OPhamster I created a simple POC based on the example app. Here are the app setup (with the logger) and the consumer. Please be aware that the heartbeat info is logged only in debug mode.

class App < Karafka::App
  setup do |config|
    # Karafka will auto-discover kafka_hosts based on Zookeeper but we need it set manually
    # to run tests without running kafka and zookeeper
    config.kafka.seed_brokers = [ENV['KAFKA_HOST'] || 'kafka://127.0.0.1:9092']
    config.client_id = 'example_app'

    logger = ::Karafka::Instrumentation::Logger.new
    logger.level = 0

    config.logger = logger
  end

  monitor.subscribe('app.initialized') do
    WaterDrop.setup { |config| config.deliver = !Karafka.env.test? }
  end
end
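
On the logger setup above: `logger.level = 0` is the same as `Logger::DEBUG`. A minimal stdlib sketch of why the level matters, assuming `Karafka::Instrumentation::Logger` filters by level the way Ruby's standard `Logger` does (the helper `debug_output_at` is hypothetical):

```ruby
require 'logger'
require 'stringio'

# Returns what a logger at the given level writes for one debug line.
def debug_output_at(level)
  io = StringIO.new
  logger = Logger.new(io)
  logger.level = level
  logger.debug('Sending heartbeat...')
  io.string
end

puts Logger::DEBUG # 0
puts debug_output_at(Logger::INFO).empty?
puts debug_output_at(0).include?('Sending heartbeat...')
```

With the level at `Logger::INFO` or higher, heartbeat lines are dropped even when heartbeats are being sent, so their absence from the log is only meaningful in debug mode.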

# frozen_string_literal: true

# Namespace for everything related to our small ping-pong game
module Pong
  # Catches the ping and uses PingResponder to respond on a pong topic
  class PingConsumer < ApplicationConsumer
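    # Simulates slow processing (60 seconds) while a background thread
    # triggers a heartbeat every second, then increases the pings counter
    # and responds
    def consume
      # Keep a reference to the client for the heartbeat thread
      cclient = client
      # Start the heartbeat thread only once per consumer instance
      @a ||= Thread.new do
        loop do
          begin
            cclient.trigger_heartbeat!
          rescue StandardError => e
            p e
          end
          p 'aaaaaaaaaaaaaa'
          sleep(1)
        end
      end
      # Simulate processing that takes a long time
      sleep(60)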
      counter = params_batch.last.payload['counter'] + 1
      # The initial ping needs to be triggered via the rake task
      respond_with(counter: counter)
    end
  end
end
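
The POC thread above keeps running after `consume` returns. A variant that beats only while work is in progress could be sketched as below. This is an illustration under assumptions, not part of Karafka: `with_heartbeat` is a hypothetical helper, and `beat` is any callable, for example a lambda wrapping the `trigger_heartbeat!` call from the POC.

```ruby
# Runs the given block while calling `beat` roughly every `interval` seconds
# from a background thread, and stops that thread when the block finishes.
def with_heartbeat(interval:, beat:)
  lock = Mutex.new
  stopped = ConditionVariable.new
  done = false

  worker = Thread.new do
    lock.synchronize do
      until done
        begin
          beat.call
        rescue StandardError => e
          # Report failures instead of letting the thread die silently
          warn "heartbeat failed: #{e.class}: #{e.message}"
        end
        # Sleep up to `interval` seconds, waking early on the stop signal
        stopped.wait(lock, interval)
      end
    end
  end

  yield
ensure
  lock.synchronize do
    done = true
    stopped.signal
  end
  worker&.join
end

beats = []
result = with_heartbeat(interval: 0.01, beat: -> { beats << Time.now }) do
  sleep(0.1) # stands in for slow message processing
  :processed
end
puts result.inspect
puts beats.size >= 1
```

Inside a consumer this might be used as `with_heartbeat(interval: 1, beat: -> { cclient.trigger_heartbeat! }) { ... }`. Whether the client is safe to call from a second thread is not established here beyond what the POC shows.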

Example output from the PingConsumer POC, where you can see a heartbeat being sent every second:

D, [2019-09-12T13:41:59.862854 #32669] DEBUG -- : [heartbeat] Received response 149 from 127.0.0.1:9092
"aaaaaaaaaaaaaa"
D, [2019-09-12T13:41:59.909285 #32669] DEBUG -- : [[example_app_batched_group] {xml_data: 0; inline_batch_data: 0; callbacked_data: 0}:] [fetch] Received response 144 from 127.0.0.1:9092
D, [2019-09-12T13:41:59.909785 #32669] DEBUG -- : [[example_app_batched_group] {xml_data: 0; inline_batch_data: 0; callbacked_data: 0}:] Fetching batches
D, [2019-09-12T13:41:59.910410 #32669] DEBUG -- : [[example_app_batched_group] {xml_data: 0; inline_batch_data: 0; callbacked_data: 0}:] [fetch] Sending fetch API request 145 to 127.0.0.1:9092
D, [2019-09-12T13:41:59.911038 #32669] DEBUG -- : [[example_app_batched_group] {xml_data: 0; inline_batch_data: 0; callbacked_data: 0}:] [fetch] Waiting for response 145 from 127.0.0.1:9092
^CI, [2019-09-12T13:42:00.484529 #32669]  INFO -- : Received SIGINT system signal
I, [2019-09-12T13:42:00.484676 #32669]  INFO -- : Stopping Karafka server 32669
W, [2019-09-12T13:42:00.708872 #32669]  WARN -- : [[example_app_async_pong] {ping: 0; pong: 0}:] Reached max fetcher queue size (10), sleeping 1s
D, [2019-09-12T13:42:00.863116 #32669] DEBUG -- : Sending heartbeat...
D, [2019-09-12T13:42:00.863201 #32669] DEBUG -- : [heartbeat] Sending heartbeat API request 150 to 127.0.0.1:9092
D, [2019-09-12T13:42:00.863290 #32669] DEBUG -- : [heartbeat] Waiting for response 150 from 127.0.0.1:9092
D, [2019-09-12T13:42:00.863755 #32669] DEBUG -- : [heartbeat] Received response 150 from 127.0.0.1:9092
"aaaaaaaaaaaaaa"
D, [2019-09-12T13:42:00.914092 #32669] DEBUG -- : [[example_app_batched_group] {xml_data: 0; inline_batch_data: 0; callbacked_data: 0}:] [fetch] Received response 145 from 127.0.0.1:9092
D, [2019-09-12T13:42:00.914570 #32669] DEBUG -- : [[example_app_batched_group] {xml_data: 0; inline_batch_data: 0; callbacked_data: 0}:] Handling fetcher command: stop
I, [2019-09-12T13:42:00.914700 #32669]  INFO -- : [example_app_batched_group] {xml_data: 0; inline_batch_data: 0; callbacked_data: 0}: Fetcher thread exited.
I, [2019-09-12T13:42:00.914898 #32669]  INFO -- : [[example_app_batched_group] {}:] Disconnecting broker 1001
D, [2019-09-12T13:42:00.914990 #32669] DEBUG -- : [[example_app_batched_group] {}:] Closing socket to 127.0.0.1:9092
I, [2019-09-12T13:42:00.915126 #32669]  INFO -- : [[example_app_batched_group] {}:] Leaving group `example_app_batched_group`
D, [2019-09-12T13:42:00.915207 #32669] DEBUG -- : [[example_app_batched_group] {}:] [leave_group] Opening connection to 127.0.0.1:9092 with client id example_app...
D, [2019-09-12T13:42:00.915356 #32669] DEBUG -- : [[example_app_batched_group] {}:] [leave_group] Sending leave_group API request 1 to 127.0.0.1:9092
D, [2019-09-12T13:42:00.915456 #32669] DEBUG -- : [[example_app_batched_group] {}:] [leave_group] Waiting for response 1 from 127.0.0.1:9092
D, [2019-09-12T13:42:00.916666 #32669] DEBUG -- : [[example_app_batched_group] {}:] [leave_group] Received response 1 from 127.0.0.1:9092
W, [2019-09-12T13:42:01.709037 #32669]  WARN -- : [[example_app_async_pong] {ping: 0; pong: 0}:] Reached max fetcher queue size (10), sleeping 1s

OPhamster commented 5 years ago

Ah, probably something wrong on my end. To be clear: this is using 1.3.0, right? I don't recognize payload as a callable method. If so, I'll update the code to use this version and get back to you on it. Thanks :+1:

mensfeld commented 5 years ago

@OPhamster Yes, the example app uses 1.3.0. The upgrade is fairly simple. Here are the changelog and upgrade notes: https://mensfeld.pl/2019/09/karafka-framework-1-3-0-release-notes-ruby-kafka/

Please close the issue once it works. If the problem persists, ping me and I will do my best to help you out!

mensfeld commented 5 years ago

Closing due to lack of activity (and it seems to work).