confluentinc / librdkafka

The Apache Kafka C/C++ library

Question: non-queue full error handling in produce() #2648

Closed sourcedelica closed 4 years ago

sourcedelica commented 4 years ago


Question

I am a new librdkafka user and I'm building an Idempotent Producer. Here is an excerpt from produce(), mostly stolen from the examples:

        RdKafka::ErrorCode err = producer->produce(...);

        if (err != RdKafka::ERR_NO_ERROR) {
            if (err == RdKafka::ERR__QUEUE_FULL) {
                producer->poll(1000);
                continue;
            } else {
                Logger::logErrorL("{} Failed to produce {} to topic {}: {}", logPrefix(), keyStr, topic, RdKafka::err2str(err));

                std::string fatalErr;
                if (producer->fatal_error(fatalErr)) {
                    Logger::logErrorL("{} Fatal error: {}", logPrefix(), fatalErr);
                    throw KafkaFatalException(fatalErr);
                }

                // TODO: what if not fatal? is it possible? ask edenhill
            }
        }

In the case where the error is not "queue full" and is not fatal, what kind of errors can happen here, if any?

From the Error Handling wiki page it sounds like most errors are handled in the event and delivery callbacks.

Thanks,

edenhill commented 4 years ago

The errors that may be returned from produce() are specified here: https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L3684 (ignore the errnos, look at the RD_KAFKA_RESP_ERR_.. codes)

You're correct that most errors will be reported through the delivery report. As with the delivery report errors, you should consider the message permanently failed if produce() fails with a non-queue-full error; that is, it is unlikely to be successfully delivered if you were to retry producing it.

The fatal errors, on the other hand, indicate a fatal state synchronization problem between the idempotent producer and the broker. These are highly unlikely to occur (typically only when the cluster-side state is corrupted, or on producer or broker bugs), but you still need to handle them since the producer will shut down and become inoperable. For this case you will need to decide what to do with the queued messages; since the idempotent delivery guarantees (in order, exactly once) can no longer be upheld, it is up to your business logic to decide whether creating a new producer and producing the messages again is a viable option, or whether you need to take other steps (dead letter queue, manual intervention, etc).

Also see https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#producer-message-delivery-failure
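
For illustration, filling in the TODO from your excerpt could look roughly like the sketch below. handlePermanentFailure() is a hypothetical application hook (e.g. a dead-letter write or metric), not a librdkafka call; the other names come from the excerpt above.

    RdKafka::ErrorCode err = producer->produce(...);

    if (err == RdKafka::ERR__QUEUE_FULL) {
        // Local queue is full: serve delivery reports, then retry the produce().
        producer->poll(1000);
        continue;
    } else if (err != RdKafka::ERR_NO_ERROR) {
        std::string fatalErr;
        if (producer->fatal_error(fatalErr)) {
            // Producer is now inoperable; queued messages will fail.
            // Recreate the producer or escalate, per business logic.
            throw KafkaFatalException(fatalErr);
        }
        // Non-fatal, non-queue-full: treat the message as permanently failed.
        // Retrying produce() is unlikely to succeed, so log it and/or dead-letter it.
        handlePermanentFailure(keyStr, topic, err);  // hypothetical application hook
    }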

sourcedelica commented 4 years ago

Thanks!

sourcedelica commented 4 years ago

I have a follow-up question:

Below is a sketch of my delivery callback:

Am I making the right assumptions about which errors we could see in the delivery callback and how to handle them?

void KafkaClient::DeliveryReportCb::dr_cb(RdKafka::Message &message) {
    if (message.err()) {
        // Retryable errors - librdkafka will retry:
        //    ERR__TIMED_OUT_QUEUE, ERR__TIMED_OUT, ERR__TRANSPORT
        //    ERR_REQUEST_TIMED_OUT, ERR_NOT_ENOUGH_REPLICAS, ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
        //    ERR_LEADER_NOT_AVAILABLE, ERR_NOT_LEADER_FOR_PARTITION
        // See https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#producer-message-delivery-failure
        if (isRetryable(message.err())) {  // application helper checking against the codes listed above
            Logger::logDebugL("{} Message {} on {} retryable error: {}",
                              logPrefix(), message.key() ? *message.key() : std::string(),
                              message.topic_name(), message.errstr());
            return;
        }

        Logger::logErrorL("{} Message {} delivery failed to topic {}: {}",
                          logPrefix(), message.key() ? *message.key() : std::string(),
                          message.topic_name(), message.errstr());

        // TODO: Handle: ERR__MSG_TIMED_OUT
        //   https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#error-local-time-out
        // TODO: Handle message.status() == RD_KAFKA_MSG_STATUS_{NOT, POSSIBLY}_PERSISTED (Idempotent Producer)
        //   https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#message-persistence-status
        // TODO: All other errors are permanent
        //   https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#error-permanent-errors

    } else if (client.isDebug) {
        Logger::logDebugL("{} Message {} delivered to topic {} [{}] at offset {}",
                          logPrefix(), message.key() ? *message.key() : std::string(),
                          message.topic_name(), message.partition(), message.offset());
    }
}
edenhill commented 4 years ago

You should consider any error you get in the delivery report as permanent; the producer will already have retried any retryable errors under the constraints of message.timeout.ms and retries (keep it high, there is no point in limiting the number of retries). You should thus not perform any retries for these failed messages.
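
For reference, a minimal configuration sketch along those lines; the values shown are illustrative assumptions, not recommendations from this thread:

    std::string errstr;
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    // Idempotent producer: librdkafka enforces the acks/retries/in-flight settings it needs.
    conf->set("enable.idempotence", "true", errstr);

    // Total time a message may spend in the queue and in flight, including retries.
    // The delivery report fires with ERR__MSG_TIMED_OUT once this is exceeded.
    conf->set("message.timeout.ms", "300000", errstr);

    // Leave retries at its (high) default rather than capping it;
    // the effective bound on retrying is message.timeout.ms.

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);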

sourcedelica commented 4 years ago

Does that mean that these errors will not be seen in the delivery callback and I don't need to ignore them?

  ERR__TIMED_OUT_QUEUE, ERR__TIMED_OUT, ERR__TRANSPORT
  ERR_REQUEST_TIMED_OUT, ERR_NOT_ENOUGH_REPLICAS, 
  ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
  ERR_LEADER_NOT_AVAILABLE, ERR_NOT_LEADER_FOR_PARTITION
edenhill commented 4 years ago

You may see most of those in the delivery report, with the exception of the LEADER errors and TRANSPORT.

Since the producer has done all in its power to produce the messages, the error code itself is not that meaningful since there isn't much an application can do at this point. The error code is mostly for informational use.
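
One way to act on that in the delivery callback is to key the log severity off message.status() rather than the error code. A rough sketch, assuming a Logger::logWarnL helper exists alongside the logging calls shown earlier:

    void KafkaClient::DeliveryReportCb::dr_cb(RdKafka::Message &message) {
        if (!message.err())
            return;

        switch (message.status()) {
        case RdKafka::Message::MSG_STATUS_NOT_PERSISTED:
            // Definitely not written to the log: safe to re-produce or dead-letter.
            Logger::logErrorL("{} Not persisted: {}", logPrefix(), message.errstr());
            break;
        case RdKafka::Message::MSG_STATUS_POSSIBLY_PERSISTED:
            // Outcome unknown: re-producing may create a duplicate.
            Logger::logWarnL("{} Possibly persisted: {}", logPrefix(), message.errstr());
            break;
        case RdKafka::Message::MSG_STATUS_PERSISTED:
            // Written to the log despite the error code: informational only.
            Logger::logDebugL("{} Persisted: {}", logPrefix(), message.errstr());
            break;
        }
    }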

sourcedelica commented 4 years ago

Ok - thanks. I wanted to know how to log and/or report to the application - as an error, warning or not at all.