Expected Behavior
When pass-through messages created via ProducerMessage.passThrough are used in combination with Producer.committableSink, the incoming messages that are passed through should be committed correctly, in the same way as if a "real" ProducerMessage had been sent to Producer.committableSink.
Actual Behavior
The internal outstanding commit count in CommittingProducerSinkStage is not maintained correctly when pass-through messages (without a ProducerRecord) are handled, so the accounting of how many commits are outstanding breaks. As a result, the commit-handling logic fails and the stream never completes; it is left in a limbo state. If an exception occurs within the stream, the error is not reported correctly (or at all), and the stream still does not complete.
The underlying cause appears to be that the awaitingCommitResult field of CommittingProducerSinkStageLogic is not incremented for pass-through messages that are processed, but is decremented in the commit callback for all processed messages (i.e. including pass-through messages). This leaves awaitingCommitResult with a negative value. As all stream-ending logic waits for awaitingCommitResult to reach 0, and awaitingCommitResult can never reach 0 once it is negative, the whole stream state is corrupted and the stream fails to complete.
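To make the accounting problem concrete, here is a minimal, dependency-free sketch of the counter behaviour described above. It is a toy model, not the actual Alpakka Kafka source: the names Envelope, Record, PassThrough, CommitCounter and the countPassThrough flag are hypothetical, and it assumes one commit callback per processed envelope (the real stage may batch commits). Setting countPassThrough = true stands in for one possible fix, which is an assumption here.

```scala
// Toy model of the outstanding-commit counter; NOT the real CommittingProducerSinkStageLogic.
// All type and method names below are hypothetical and only illustrate the accounting.
sealed trait Envelope
final case class Record(offset: Long) extends Envelope      // stands in for a message with a ProducerRecord
final case class PassThrough(offset: Long) extends Envelope // stands in for a pass-through message

final class CommitCounter(countPassThrough: Boolean) {
  var awaitingCommitResult: Int = 0
  private var upstreamFinished = false

  // Called for every incoming envelope.
  def onPush(envelope: Envelope): Unit = envelope match {
    case _: Record      => awaitingCommitResult += 1
    // Reported bug: pass-through envelopes are not counted (countPassThrough = false).
    case _: PassThrough => if (countPassThrough) awaitingCommitResult += 1
  }

  // Assumption: called once per processed envelope, pass-through or not.
  def onCommitResult(): Unit = awaitingCommitResult -= 1

  def onUpstreamFinish(): Unit = upstreamFinished = true

  // Completion is only allowed once no commits are outstanding.
  def canComplete: Boolean = upstreamFinished && awaitingCommitResult == 0
}

object CommitCounterDemo {
  def run(countPassThrough: Boolean): CommitCounter = {
    val counter   = new CommitCounter(countPassThrough)
    val envelopes = List(Record(1), PassThrough(2), Record(3))
    envelopes.foreach(counter.onPush)
    counter.onUpstreamFinish()
    envelopes.foreach(_ => counter.onCommitResult())
    counter
  }

  def main(args: Array[String]): Unit = {
    val buggy = run(countPassThrough = false)
    // prints "buggy: awaitingCommitResult=-1 canComplete=false"
    println(s"buggy: awaitingCommitResult=${buggy.awaitingCommitResult} canComplete=${buggy.canComplete}")

    val fixed = run(countPassThrough = true)
    // prints "fixed: awaitingCommitResult=0 canComplete=true"
    println(s"fixed: awaitingCommitResult=${fixed.awaitingCommitResult} canComplete=${fixed.canComplete}")
  }
}
```

In the model, a single uncounted pass-through envelope is enough to push the counter to -1 after all commit callbacks have fired, so the completion condition is never met.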
Reproducible Test Case
I'll add a PR that demonstrates the issue, as well as a simple fix.
Versions used
alpakka-kafka 2.0.0