Expected Behavior
When an externally provided Producer is used for a sink (i.e. a producer that is not instantiated by the Sink itself), the Producer should never be automatically closed by the Sink (e.g. due to stream failure).
For example, something like
val mySharedProducer = ...
val sink = Producer.plainSink(producersettings.withProducer(mySharedProducer))
should never close mySharedProducer on its own, regardless of what happens within the stream, as this could break other in-progress streams that are making use of the same shared Producer.
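To make the scenario concrete, here is a minimal sketch of two streams sharing one Producer. It assumes Akka 2.6+ (so the implicit ActorSystem supplies the materializer) and a broker reachable at `localhost:9092`; the topic names `topic-a` and `topic-b` and the object name are hypothetical, and this is an illustration of the setup rather than the test from the PR.

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Await
import scala.concurrent.duration._

object SharedProducerExample extends App {
  implicit val system: ActorSystem = ActorSystem("shared-producer-example")

  // Assumption: a local broker; adjust the address for your environment.
  val baseSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  // The Producer is created outside of any stream, so the caller owns its lifecycle.
  val mySharedProducer = baseSettings.createKafkaProducer()
  val sharedSettings = baseSettings.withProducer(mySharedProducer)

  // Two independent streams writing through the same Producer instance.
  val streamA = Source(1 to 10)
    .map(i => new ProducerRecord[String, String]("topic-a", i.toString))
    .runWith(Producer.plainSink(sharedSettings))

  val streamB = Source(1 to 10)
    .map(i => new ProducerRecord[String, String]("topic-b", i.toString))
    .runWith(Producer.plainSink(sharedSettings))

  try {
    // Await.ready waits for completion without rethrowing a stream failure.
    Await.ready(streamA.zip(streamB), 30.seconds)
  } finally {
    // Only the owner closes the Producer, after every stream using it has finished.
    mySharedProducer.close()
    system.terminate()
  }
}
```

With this setup, a send failure in `streamA` is expected to fail only `streamA`; `streamB` should still be able to use `mySharedProducer` until the owner closes it.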
Actual Behavior
On a Kafka send failure, the value of ProducerSettings.closeProducerOnStop is ignored in DeferredProducer.closeProducerImmediately, meaning that a Producer send failure in one stream will immediately close a shared Producer and break all other in-progress streams.
I'll add a PR with a test that demonstrates the issue and the fix.
Versions used
alpakka-kafka 2.0.1