Closed Arkatufus closed 2 years ago
What happens in these scenarios, per Kafka documentation - vs. what happens in our library?
Documentation buried in the Kafka client documentation:
The transactional producer operates on top of the idempotent producer, and provides full exactly-once semantics (EOS) for Apache Kafka when used with the transaction aware consumer (isolation.level=read_committed).
A producer instance is configured for transactions by setting the transactional.id to an identifier unique for the application. This id will be used to fence stale transactions from previous instances of the application, typically following an outage or crash.
After creating the transactional producer instance using rd_kafka_new() the transactional state must be initialized by calling rd_kafka_init_transactions(). This is a blocking call that will acquire a runtime producer id from the transaction coordinator broker as well as abort any stale transactions and fence any still running producer instances with the same transactional.id.
Once transactions are initialized the application may begin a new transaction by calling rd_kafka_begin_transaction(). A producer instance may only have one single on-going transaction.
Any messages produced after the transaction has been started will belong to the ongoing transaction and will be committed or aborted atomically. It is not permitted to produce messages outside a transaction boundary, e.g., before rd_kafka_begin_transaction() or after rd_kafka_commit_transaction(), rd_kafka_abort_transaction(), or after the current transaction has failed.
If consumed messages are used as input to the transaction, the consumer instance must be configured with enable.auto.commit set to false. To commit the consumed offsets along with the transaction pass the list of consumed partitions and the last offset processed + 1 to rd_kafka_send_offsets_to_transaction() prior to committing the transaction. This allows an aborted transaction to be restarted using the previously committed offsets.
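The "last offset processed + 1" rule can be sketched as below. This is a minimal, self-contained model and not librdkafka code: struct partition_progress, offset_to_commit() and offsets_to_commit() are hypothetical names introduced only for illustration. In a real application the resulting values would be written into the rd_kafka_topic_partition_list_t that is passed to rd_kafka_send_offsets_to_transaction().

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical bookkeeping record, one per consumed partition. */
struct partition_progress {
    int32_t partition;      /* partition id */
    int64_t last_processed; /* offset of the last fully processed message, -1 if none */
};

/* The offset to commit is the next message to read: last processed + 1.
 * Returns -1 when nothing was processed, meaning there is nothing to commit. */
static int64_t offset_to_commit(const struct partition_progress *p) {
    return p->last_processed < 0 ? -1 : p->last_processed + 1;
}

/* Fills out[i] with the commit offset for in[i].
 * Returns how many partitions actually have an offset to commit. */
static size_t offsets_to_commit(const struct partition_progress *in, size_t n,
                                int64_t *out) {
    size_t committable = 0;
    for (size_t i = 0; i < n; i++) {
        out[i] = offset_to_commit(&in[i]);
        if (out[i] >= 0)
            committable++;
    }
    return committable;
}
```

Committing "last processed" instead of "last processed + 1" would cause the last message to be consumed again after a restart, which is why the + 1 matters.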
To commit the produced messages, and any consumed offsets, to the current transaction, call rd_kafka_commit_transaction(). This call will block until the transaction has been fully committed or failed (typically due to fencing by a newer producer instance).
Alternatively, if processing fails, or an abortable transaction error is raised, the transaction needs to be aborted by calling rd_kafka_abort_transaction() which marks any produced messages and offset commits as aborted.
After the current transaction has been committed or aborted a new transaction may be started by calling rd_kafka_begin_transaction() again.
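The call ordering described above (initialize once, one ongoing transaction at a time, produce only inside a transaction) can be modelled as a small state machine. This is only a sketch for reasoning about what our library must enforce: the txn_model type and the model_* functions are hypothetical and do not exist in librdkafka. As simplifying assumptions, the "current transaction has failed" state is left out and initialization is allowed exactly once.

```c
#include <stdbool.h>

/* Hypothetical model of the producer's transactional state. */
enum txn_state { TXN_UNINITIALIZED, TXN_READY, TXN_IN_TRANSACTION };

struct txn_model {
    enum txn_state state;
};

/* Models rd_kafka_init_transactions(): must happen before anything else. */
static bool model_init(struct txn_model *m) {
    if (m->state != TXN_UNINITIALIZED)
        return false;
    m->state = TXN_READY;
    return true;
}

/* Models rd_kafka_begin_transaction(): only one transaction may be ongoing. */
static bool model_begin(struct txn_model *m) {
    if (m->state != TXN_READY)
        return false;
    m->state = TXN_IN_TRANSACTION;
    return true;
}

/* Producing is only permitted inside a transaction boundary. */
static bool model_can_produce(const struct txn_model *m) {
    return m->state == TXN_IN_TRANSACTION;
}

/* Models commit or abort: either one ends the current transaction. */
static bool model_end(struct txn_model *m) {
    if (m->state != TXN_IN_TRANSACTION)
        return false;
    m->state = TXN_READY;
    return true;
}
```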
Retriable errors
Some error cases allow the attempted operation to be retried; this is indicated by the error object having the retriable flag set, which can be detected by calling rd_kafka_error_is_retriable(). When this flag is set the application may retry the operation immediately or, preferably, after a short grace period (to avoid busy-looping). Retriable errors include timeouts, broker transport failures, etc.
Abortable errors
An ongoing transaction may fail permanently due to various errors, such as transaction coordinator becoming unavailable, write failures to the Apache Kafka log, under-replicated partitions, etc. At this point the producer application must abort the current transaction using rd_kafka_abort_transaction() and optionally start a new transaction by calling rd_kafka_begin_transaction(). Whether an error is abortable or not is detected by calling rd_kafka_error_txn_requires_abort() on the returned error object.
Fatal errors
While the underlying idempotent producer will typically only raise fatal errors for unrecoverable cluster errors where the idempotency guarantees can't be maintained, most of these are treated as abortable by the transactional producer since transactions may be aborted and retried in their entirety. The transactional producer on the other hand introduces a set of additional fatal errors which the application needs to handle by shutting down the producer and terminating. There is no way for a producer instance to recover from fatal errors. Whether an error is fatal or not is detected by calling rd_kafka_error_is_fatal() on the returned error object or by checking the global rd_kafka_fatal_error() code. Fatal errors are raised by triggering the error_cb (see the Fatal error chapter in INTRODUCTION.md for more information), and any subsequent transactional API calls will return RD_KAFKA_RESP_ERR__FATAL or have the fatal flag set (see rd_kafka_error_is_fatal()). The originating fatal error code can be retrieved by calling rd_kafka_fatal_error().
Handling of other errors
For errors that have none of the retriable, abortable, or fatal flags set, it is not always obvious how to handle them. While some of these errors may be indicative of bugs in the application code, such as when an invalid parameter is passed to a method, other errors might originate from the broker and be passed through as-is to the application. The general recommendation is to treat these errors, which have neither the retriable nor the abortable flag set, as fatal.
Error handling example
    rd_kafka_error_t *error;

retry:
    error = rd_kafka_commit_transaction(producer, 10*1000);
    if (!error)
        return success;
    else if (rd_kafka_error_txn_requires_abort(error)) {
        do_abort_transaction_and_reset_inputs();
    } else if (rd_kafka_error_is_retriable(error)) {
        rd_kafka_error_destroy(error);
        goto retry;
    } else { // treat all other errors as fatal errors
        fatal_error(rd_kafka_error_string(error));
    }
    rd_kafka_error_destroy(error);
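The decision order in the example can be separated from the librdkafka calls, which may help when porting the same policy to our library. The sketch below is self-contained and hypothetical: struct error_flags stands in for the results of rd_kafka_error_is_retriable(), rd_kafka_error_txn_requires_abort() and rd_kafka_error_is_fatal(), and the action enum and classify() are names made up for this illustration. Checking the fatal flag first is an assumption; the example above reaches the same result through its final else branch.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical snapshot of the three flags carried by an error object. */
struct error_flags {
    bool retriable;
    bool requires_abort;
    bool fatal;
};

enum action { ACTION_NONE, ACTION_RETRY, ACTION_ABORT, ACTION_FATAL };

/* Maps an error (NULL meaning success) to the action the caller should take. */
static enum action classify(const struct error_flags *e) {
    if (e == NULL)
        return ACTION_NONE;   /* operation succeeded */
    if (e->fatal)
        return ACTION_FATAL;  /* producer cannot recover; shut down */
    if (e->requires_abort)
        return ACTION_ABORT;  /* abort the transaction, optionally begin a new one */
    if (e->retriable)
        return ACTION_RETRY;  /* retry, preferably after a short grace period */
    return ACTION_FATAL;      /* no flag set: treat as fatal, per the recommendation */
}
```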
NOTE: Some of these transaction error codes might already be handled by the C# client. We need to figure out what it does under the hood and adapt to it.
From this documentation in the .NET Kafka client: https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/ProducerBuilder.cs#L210-L222
/// <summary>
/// Set the handler to call on error events e.g. connection failures or all
/// brokers down. Note that the client will try to automatically recover from
/// errors that are not marked as fatal. Non-fatal errors should be interpreted
/// as informational rather than catastrophic.
/// </summary>
/// <remarks>
/// Executes on the poll thread (by default, a background thread managed by
/// the producer).
///
/// Exceptions: Any exception thrown by your error handler will be silently
/// ignored.
/// </remarks>
I guess it is a bad idea to base our failure decision on the error handler, unless the error is fatal. The documentation claims that the client will try to recover from anything other than a fatal error, though I don't know if we'll ever get a callback if that recovery fails.
Based on this issue, partition EOF does not mean that the partition is empty; it is just a "suggestion" that it might be. We will need to ignore it.
Consumer<TKey, TValue>.Consume() can throw exceptions:
ConsumeException: this exception can be recovered from; the original message is embedded inside the exception as a ConsumeResult<byte[], byte[]>. Thrown when:
Any exception that was thrown in any handler (statistics, rebalance, and commit). When this is encountered (probably caused by async calls), the current message is immediately destroyed and the handler's exception is re-thrown instead.
OperationCanceledException: thrown when Consume is called with a CancellationToken and the token is cancelled.
Need to improve how the producer code handles Kafka error reports by wrapping IProducer<K, V>.Produce in a try...catch and failing the stage only on an exception thrown from there. An Error object coming from the delivery report should not cause a failed stage.