akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html

Transactional Source with Multiple Producer Messages #711

Open mehdimas opened 5 years ago

mehdimas commented 5 years ago

Versions used

Alpakka Kafka version: 1.0-RC1 and 1.0-RC1+19-1fd51211

Akka version: 2.5.19

Kafka version: 2.1.0 and 2.1.1-rc1

Expected Behavior

When a transactional source is connected to a transactional flow, I expect to be able to map one message from the source topic to multiple messages in the destination topic using ProducerMessage.multi(...).

The expected behavior is that each source message is processed exactly once, and the destination topic receives exactly the number of messages specified in ProducerMessage.multi(...).

See the code example below.

import akka.kafka.{ConsumerMessage, ProducerMessage, Subscriptions}
import akka.kafka.scaladsl.Transactional
import akka.stream.scaladsl.Flow
import org.apache.kafka.clients.producer.ProducerRecord

Transactional
  .source(consumerSettings, Subscriptions.topics("sourceTopic"))
  .via(
    Flow[ConsumerMessage.TransactionalMessage[String, DestinationMessage]]
      .map { msg =>
        ProducerMessage.multi(
          List[ProducerRecord[String, DestinationMessage]](
            new ProducerRecord(
              "destinationTopic",
              msg.record.key,
              DestinationMessage("foo")
            )
          ),
          msg.partitionOffset
        )
      }
  )
  .via(Transactional.flow(producerSettings, transactionalId))

Actual Behavior

If the source topic has 100 partitions and I produce 1000 messages (roughly evenly distributed across the partitions), I see approximately 0-30 records of lag in each partition and approximately 300-400 extra records in the destination topic.

If I set up the transactional flow with ProducerMessage.single(...) instead, there is no remaining lag in the partitions.

Used Alpakka Kafka configuration

This should be reproducible with the default settings for Alpakka Kafka. I tried this with a single Kafka broker and a cluster of three Kafka brokers.

I'm also running the latest Kafka broker version: 2.1.0, but I've also tried it out on previous versions.

I've tried this out on a few Alpakka Kafka versions, including the latest snapshot: 1.0-RC1+19-1fd51211.

ennru commented 5 years ago

Thank you for trying this out and reporting this misbehaviour.

cc @seglo

seglo commented 5 years ago

Apologies for the late reply. This could be related to https://github.com/akka/alpakka-kafka/pull/742 . @mehdimas could you please test against master?

mehdimas commented 5 years ago

@seglo We've switched over to a non-transactional setup, but I will set up a test in the next week.

mehdimas commented 5 years ago

I haven't forgotten about this, but we've been busy trying to reduce duplication for a non-transactional, committable, partitioned consumer during rebalancing.

dineyw23 commented 4 years ago

Came across this issue while trying to search for examples to use ProducerMessage.multi(...) with Transactional.source/Transactional.flow.

Not sure if I'm understanding this correctly, but does this mean that, as of Alpakka Kafka version 2.0.2, ProducerMessage.multi(...) would produce duplicates in the destination topics when used with a Transactional.source connected to a Transactional.flow?

seglo commented 4 years ago

does this mean as of Alpakka Kafka Version 2.0.2, ProducerMessage.multi(...) would produce duplicates in the destination topics when used with a Transactional.source connected to a Transactional.flow?

This should not happen. We have a test in place that tests this combination of APIs. https://github.com/akka/alpakka-kafka/blob/master/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala#L245

geekthanos commented 4 years ago

@seglo I don't understand what happens if we send messages to two different topics: which commit does the transaction use in transactional Kafka? What if the write to one topic is committed and the write to the other is not? Will the messages be considered uncommitted?

seglo commented 4 years ago

@kolapkardhaval All messages produced to Kafka within a transaction will be committed as part of the same transaction. If a message is produced but not acknowledged, the transaction won't be committed. It's all or nothing.
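As a hedged sketch of that all-or-nothing behaviour: a single ProducerMessage.multi envelope can carry records for two different topics, and both records belong to the same transaction (the topic names, String key/value types, and the fanOut helper here are illustrative, not from the thread):

```scala
import akka.kafka.{ConsumerMessage, ProducerMessage}
import org.apache.kafka.clients.producer.ProducerRecord

// Both records are produced inside the same Kafka transaction: either both
// become visible to read_committed consumers, or neither does.
def fanOut(key: String, value: String, offset: ConsumerMessage.PartitionOffset) =
  ProducerMessage.multi(
    List(
      new ProducerRecord("topicA", key, value),
      new ProducerRecord("topicB", key, value)
    ),
    offset // the consumed offset committed in the same transaction
  )
```

There is no per-topic commit to reason about: the transaction either commits all records in the envelope together with the consumed offset, or aborts them all.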

You can ensure your consumer side only consumes committed messages by setting the consumer property isolation.level to read_committed.
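A minimal sketch of consumer settings with that property (the bootstrap server, group id, and String deserializers are placeholder assumptions):

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

def readCommittedSettings(system: ActorSystem): ConsumerSettings[String, String] =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092") // assumed broker address
    .withGroupId("destination-consumer")    // assumed group id
    // Skip records from aborted or still-open transactions:
    .withProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
```

These settings would then be passed to whichever consumer source reads the destination topic.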