zendesk / racecar

Racecar: a simple framework for Kafka consumers in Ruby
Apache License 2.0

Add observability for producer errors #336

Closed leonmaia closed 1 year ago

leonmaia commented 1 year ago

Currently, we lack the observability needed to detect errors when using the standalone producer. This pull request improves error handling and instrumentation in the standalone producer, with the primary goal of making producer errors visible. It uses the delivery callback to instrument events for both successful and failed deliveries.
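As a rough illustration of the delivery-callback approach described above, here is a minimal sketch that runs without Kafka. `FakeReport`, `COUNTERS`, and `DELIVERY_CALLBACK` are hypothetical stand-ins (for a delivery report, a StatsD client, and the callback registered on the producer); they are not names taken from this PR or from Racecar. Only the two metric names mirror the stats listed in this description.

```ruby
# Hypothetical stand-in for a delivery report: `error` is nil on success.
FakeReport = Struct.new(:topic, :partition, :error)

# Hypothetical stand-in for a StatsD client: a plain counter hash.
COUNTERS = Hash.new(0)

# Sketch of a delivery callback: invoked once per message, after the
# delivery has either been acknowledged or has finally failed.
DELIVERY_CALLBACK = lambda do |report|
  if report.error
    COUNTERS["racecar.producer.produce.delivery.errors"] += 1
  else
    COUNTERS["racecar.producer.ack.messages"] += 1
  end
end

# One acknowledged delivery and one failed delivery (unknown partition).
DELIVERY_CALLBACK.call(FakeReport.new("messages", 0, nil))
DELIVERY_CALLBACK.call(FakeReport.new("messages", 99, "unknown_partition"))
```

The point of the sketch is that a single callback can cover both outcomes, so successful and failed deliveries are counted in one place regardless of whether the message was produced synchronously or asynchronously.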

Here are the stats and exceptions observed in various scenarios:

  # partition 99 does not exist and is used to trigger error
  # huge msg payload is used to trigger another error
  huge = "HUGE" * 10000

  # Sync: success
  # expected stats
  # [StatsD] racecar.producer.ack.messages 1|c| client:racecar
  # [StatsD] racecar.producer.produce.messages 1|c| client:racecar topic:messages
  #
  # No exception raised
  Racecar.produce_sync(value: "test message #{Time.now}", topic: "messages")

  # Sync: failure, invalid partition
  # expected stats
  # [StatsD] racecar.producer.produce.delivery.errors 1|c| client:racecar
  # [StatsD] racecar.producer.produce.errors 1|c| client:racecar topic:messages
  # [StatsD] racecar.producer.produce.messages 1|c| client:racecar topic:messages
  #
  # Following exception is raised
  # #<Racecar::MessageDeliveryError: Message delivery finally failed:
  # Local: Unknown partition (unknown_partition)
  #
  Racecar.produce_sync(value: "test message #{Time.now}", topic: "messages", partition: 99)

  # Sync: failure, huge message
  # expected stats
  # [StatsD] racecar.producer.produce.errors 1|c| client:racecar topic:messages
  # [StatsD] racecar.producer.produce.messages 1|c| client:racecar topic:messages
  #
  # Following exception is raised
  # #<Racecar::MessageDeliveryError: Message delivery finally failed:
  # Broker: Message size too large (msg_size_too_large)
  #
  Racecar.produce_sync(value: "test message #{huge} #{Time.now}", topic: "messages")

  # Async: failure, huge message
  # expected stats
  # [StatsD] racecar.producer.produce.errors 1|c| client:racecar topic:messages
  # [StatsD] racecar.producer.produce.messages 1|c| client:racecar topic:messages
  #
  # Following exception is raised
  # #<Racecar::MessageDeliveryError: Message delivery finally failed:
  # Broker: Message size too large (msg_size_too_large)
  #
  Racecar.produce_async(value: "test message #{huge} #{Time.now}", topic: "messages")

  # Async: failure, invalid partition
  # expected stats
  # [StatsD] racecar.producer.produce.messages 1|c| client:racecar topic:messages
  # [StatsD] racecar.producer.produce.delivery.errors 1|c| client:racecar
  #
  # No Exception raised
  #
  Racecar.produce_async(value: "test message #{Time.now}", topic: "messages", partition: 99)

  # Async: success
  # expected stats
  # [StatsD] racecar.producer.produce.messages 1|c| client:racecar topic:messages
  # [StatsD] racecar.producer.ack.messages 1|c| client:racecar
  #
  # This should not raise any exception
  Racecar.produce_async(value: "test message #{Time.now}", topic: "messages")

NOTE: In some cases the error is instrumented twice, once via the delivery callback and once via exception handling. One example is the sync producer with an invalid partition: handle.wait raises an error, and the delivery callback also reports it.
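The double reporting can be reproduced in isolation with a small simulation. Everything below is hypothetical (`FakeHandle`, `DeliveryError`, `ERROR_EVENTS`, and this `produce_sync` are illustrative names, not Racecar's implementation). It only assumes, as the note says, that the delivery callback fires for the failed message and that waiting on the handle raises for the same failure.

```ruby
# Hypothetical error class and event log, for illustration only.
class DeliveryError < StandardError; end

ERROR_EVENTS = []

# Stand-in for a delivery handle: the delivery callback reports the
# outcome, then `wait` raises for the same failed message.
class FakeHandle
  def initialize(error, callback)
    @error = error
    @callback = callback
  end

  def wait
    @callback.call(@error)
    raise DeliveryError, @error if @error
  end
end

# First reporting path: the delivery callback records the error.
callback = ->(error) { ERROR_EVENTS << [:delivery_callback, error] if error }

# Second reporting path: the sync producer rescues, records, and re-raises.
def produce_sync(handle)
  handle.wait
rescue DeliveryError => e
  ERROR_EVENTS << [:exception_handler, e.message]
  raise
end

begin
  produce_sync(FakeHandle.new("unknown_partition", callback))
rescue DeliveryError
  # Expected: the caller still sees the exception.
end
```

After this runs, `ERROR_EVENTS` holds two entries for a single failed message, one from each path. The async case with an invalid partition never waits on the handle, which is consistent with it reporting only the delivery error and raising nothing.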