karafka / waterdrop

Standalone Karafka library for producing Kafka messages
https://karafka.io
Other
254 stars 39 forks source link

Transactional sync producer raises a 'Purged in queue' exception when producing messages via the same producer that previously failed to produce messages #501

Closed Aerdayne closed 3 months ago

Aerdayne commented 3 months ago

It seems that when a transactional producer fails to produce messages (in this case when the message passes the local WaterDrop payload size validation due to a misconfiguration, and is then blocked by rdkafka's message size validation), the subsequent usage of such producer needs to be wrapped in a retry, as it will always raise a #<Rdkafka::RdkafkaError: Local: Purged in queue (purge_queue)> exception.

Is this the intended behaviour? If so, then even if there are multiple transactional producers wrapped in a connection pool, as the documentation recommends, would this mean that all message producing should be wrapped in a retry? Or should the producers be discarded instead?

Here's a reproduction script that should be ran against the current main branch.

OS: Darwin Kernel Version 22.6.0

librdkafka version: 2.1.1_1

Kafka was ran via WaterDrop's compose file locally.

#!/usr/bin/env ruby

require 'bundler/inline'

gemfile do
  source 'https://rubygems.org'

  gem 'waterdrop', path: '.'
  gem 'karafka' # Admin API
end

require 'prettyprint'
require 'securerandom'

topic = SecureRandom.hex(10)

max_message_bytes = 128
max_payload_size = 256

Karafka::App.setup do |karafka_config|
  karafka_config.kafka = {
    'bootstrap.servers': '0.0.0.0:9092'
  }
  karafka_config.max_wait_time = 500
  karafka_config.client_id = SecureRandom.alphanumeric(10)
end

Karafka::Admin.create_topic(topic, 1, 1, { 'max.message.bytes': max_message_bytes })

at_exit do
  Karafka::Admin.delete_topic(topic)
end

producer = ::WaterDrop::Producer.new do |wd_config|
  wd_config.id = SecureRandom.hex(6)
  wd_config.deliver = true

  wd_config.kafka[:'bootstrap.servers'] = '0.0.0.0:9092'
  wd_config.kafka[:'transactional.id'] = SecureRandom.hex(6)
  wd_config.kafka[:'max.in.flight'] = 5
  wd_config.kafka[:'enable.idempotence'] = true

  wd_config.max_wait_timeout = 10_000
  wd_config.max_payload_size = max_payload_size
end

producer.monitor.subscribe(WaterDrop::Instrumentation::LoggerListener.new(Logger.new(STDOUT)))

payloads =
  [
    { topic:, payload: 'a' },
    { topic:, payload: 'b' },
    { topic:, payload: 'c' },
    # This message will fail to get dispatched as it will definitely exceed max.message.bytes
    { topic:, payload: max_message_bytes.times.map { 'd' }.join },
    { topic:, payload: 'e' },
    { topic:, payload: 'f' }
  ]

begin
  # Wrapped in a transaction for explicitness
  producer.transaction do
    producer.produce_many_sync(payloads)
  end
rescue StandardError => e
  puts "\n#{'=' * 80}\nRaised by the first call:\n#{e.inspect}\n#{'=' * 80}\n\n"
end

retry_count = 0

# The subsequent production of messages has to be wrapped in a retry,
# as in this case the producer will raise a
# #<Rdkafka::RdkafkaError: Local: Purged in queue (purge_queue)>
# exception on the first attempt.
begin
  # Wrapped in a transaction for explicitness
  producer.transaction do
    producer.produce_many_sync([{ topic:, payload: 'g', headers: { 'a' => '1' } }])
  end
rescue StandardError => e
  raise unless retry_count < 3

  puts "\n#{'=' * 80}\nRaised by the second call:\n#{e.inspect}\n#{'=' * 80}\n\n"

  retry_count += 1
  retry
end

consumer_config = {
  'bootstrap.servers': '0.0.0.0:9092',
  'group.id': SecureRandom.alphanumeric(10),
  'auto.offset.reset': 'earliest',
  'enable.auto.offset.store': 'false',
  'partition.assignment.strategy': 'range,roundrobin',
  'allow.auto.create.topics': 'true'
}

consumer = Rdkafka::Config.new(
  Karafka::Setup::AttributesMap.consumer(consumer_config)
).consumer

sleep(5)

consumer_thread = Thread.new do
  consumer.subscribe(topic)

  loop do
    data = consumer.poll(500)
    pp data unless data.nil?
  end

  consumer.close
end

at_exit do
  consumer_thread.kill
end

sleep

Output (consumer output omitted, the final message will obviously be printed out):

I, [2024-06-06T12:39:41.122920 #74403]  INFO -- : [58752a731890] Starting transaction took 29.76900005340576 ms
E, [2024-06-06T12:39:41.164938 #74403] ERROR -- : [58752a731890] Error occurred: Broker: Message size too large (msg_size_too_large) - librdkafka.dispatch_error
E, [2024-06-06T12:39:41.165002 #74403] ERROR -- : [58752a731890] Error occurred: Broker: Message size too large (msg_size_too_large) - librdkafka.dispatch_error
E, [2024-06-06T12:39:41.165016 #74403] ERROR -- : [58752a731890] Error occurred: Broker: Message size too large (msg_size_too_large) - librdkafka.dispatch_error
E, [2024-06-06T12:39:41.165027 #74403] ERROR -- : [58752a731890] Error occurred: Broker: Message size too large (msg_size_too_large) - librdkafka.dispatch_error
E, [2024-06-06T12:39:41.165037 #74403] ERROR -- : [58752a731890] Error occurred: Broker: Message size too large (msg_size_too_large) - librdkafka.dispatch_error
E, [2024-06-06T12:39:41.165047 #74403] ERROR -- : [58752a731890] Error occurred: Broker: Message size too large (msg_size_too_large) - librdkafka.dispatch_error
E, [2024-06-06T12:39:41.165302 #74403] ERROR -- : rdkafka: [thrd:127.0.0.1:9092/1]: Current transaction failed in state InTransaction: ProduceRequest for 9a2a904926509e9dcc52 [0] with 6 message(s) failed: Broker: Message size too large (broker 1 PID{Id:2001,Epoch:0}, base seq 0): current transaction must be aborted (MSG_SIZE_TOO_LARGE)
E, [2024-06-06T12:39:41.165648 #74403] ERROR -- : [58752a731890] Error occurred: #<Rdkafka::RdkafkaError: Broker: Message size too large (msg_size_too_large)> - messages.produce_many_sync
I, [2024-06-06T12:39:41.178251 #74403]  INFO -- : [58752a731890] Aborting transaction took 12.416000008583069 ms

================================================================================
Raised:
#<WaterDrop::Errors::ProduceManyError: #<Rdkafka::RdkafkaError: Broker: Message size too large (msg_size_too_large)>>
================================================================================

I, [2024-06-06T12:39:41.179337 #74403]  INFO -- : [58752a731890] Starting transaction took 0.9839999675750732 ms
E, [2024-06-06T12:39:41.215598 #74403] ERROR -- : rdkafka: [thrd:127.0.0.1:9092/1]: Current transaction failed in state InTransaction: skipped sequence numbers (OUT_OF_ORDER_SEQUENCE_NUMBER, requires epoch bump)
E, [2024-06-06T12:39:41.215669 #74403] ERROR -- : [58752a731890] Error occurred: #<Rdkafka::RdkafkaError: Local: Purged in queue (purge_queue)> - messages.produce_many_sync
I, [2024-06-06T12:39:41.224242 #74403]  INFO -- : [58752a731890] Aborting transaction took 8.38100004196167 ms

================================================================================
Raised:
#<WaterDrop::Errors::ProduceManyError: #<Rdkafka::RdkafkaError: Local: Purged in queue (purge_queue)>>
================================================================================

I, [2024-06-06T12:39:41.224387 #74403]  INFO -- : [58752a731890] Starting transaction took 0.05800008773803711 ms
I, [2024-06-06T12:39:41.235355 #74403]  INFO -- : [58752a731890] Sync producing of 1 messages to 1 topics took 10.894999980926514 ms
D, [2024-06-06T12:39:41.235427 #74403] DEBUG -- : [58752a731890] [{:topic=>"9a2a904926509e9dcc52", :payload=>"g", :headers=>{"a"=>"1"}}]
I, [2024-06-06T12:39:41.238042 #74403]  INFO -- : [58752a731890] Committing transaction took 2.5579999685287476 ms
I, [2024-06-06T12:39:41.238098 #74403]  INFO -- : [58752a731890] Processing transaction took 13.77999997138977 ms
mensfeld commented 3 months ago

I can confirm that sub-sequent post raise error dispatch indeed causes an issue on next dispatch. Though I am getting a different error: out_of_order_sequence_number. Nonetheless the state is indeed leaking out of failed operation.

Aerdayne commented 3 months ago

FWIW, on my end the listener detects E, [2024-06-06T12:39:41.215598 #74403] ERROR -- : rdkafka: [thrd:127.0.0.1:9092/1]: Current transaction failed in state InTransaction: skipped sequence numbers (OUT_OF_ORDER_SEQUENCE_NUMBER, requires epoch bump), although it does not raise that.

mensfeld commented 3 months ago

Yes I can repro now both things. This is indeed not raised because it is intermediate and goes via events listener. I also get the error you mentioned. Let me reply to you on that:

in this case when the message passes the local WaterDrop payload size validation due to a misconfiguration

Well it is on you ;) There were reasons for me to introduce this check

Or should the producers be discarded instead?

Since it is working I can assume it would continue to work, though I do not like a solution that retries on operations of this nature always. I think that what needs to happen here is a retry on conditional raise on failure of last transaction, since transactional state does not have race conditions. For now granular and limited to the queue purge that needs to happen on broker errors.

mensfeld commented 3 months ago

Ok. I know what is happening. The PID is updated because of the broker error and on next request it needs to be bumped. inflight requests need to be drained for it to happen, hence the purge_queue. In theory after the bump things should work however the general recommendation from librdkafka code is to treat those errors as fatals and reload the producer. What that means is, that any broker error that bubbles up should cause a client reload.

mensfeld commented 3 months ago

@Aerdayne here's my proposition how to fix this: https://github.com/karafka/waterdrop/pull/503

It works well. Can you confirm? (I'll add relevant specs soon)

Aerdayne commented 3 months ago

Confirming that the fix works. Thanks!