akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka-kafka/current/

Default PartitionAssignmentHandler behaviour on revoke+assign in transactional #1062

Open szymonm opened 4 years ago

szymonm commented 4 years ago

Short description

SingleSourceLogic clears the buffer of accumulated records for revoked partitions only after new partitions are assigned. I think this causes the blocking onRevoke callback to wait for all records in the buffer to be drained. While this approach may work in certain cases, it is not universal, and I propose adding a configuration option that allows dropping records immediately after onRevoke is received.

Details

Looking into the following code from SingleSourceLogic:

override protected def addToPartitionAssignmentHandler(
      handler: PartitionAssignmentHandler
  ): PartitionAssignmentHandler = {
    val flushMessagesOfRevokedPartitions: PartitionAssignmentHandler = new PartitionAssignmentHandler {
      private var lastRevoked = Set.empty[TopicPartition]

      // On revoke, only remember which partitions were taken away;
      // the buffer is not touched yet.
      override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
        lastRevoked = revokedTps

      // Only once the new assignment arrives are buffered records of the
      // partitions that were revoked and not re-assigned filtered out.
      override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
        filterRevokedPartitionsCB.invoke(lastRevoked -- assignedTps)
    }
    new PartitionAssignmentHelpers.Chain(handler, flushMessagesOfRevokedPartitions)
  }

I can see that we call filterRevokedPartitionsCB only after the new partitions are assigned. I would like to understand the pros and cons of this approach versus dropping the buffered messages immediately after receiving onRevoke from the consumer.
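
For concreteness, the alternative I have in mind would look roughly like this (a hypothetical variant for illustration, assuming filterRevokedPartitionsCB is safe to invoke from the rebalance callback; other callbacks omitted as in the snippet above):

val dropMessagesOfRevokedPartitions: PartitionAssignmentHandler = new PartitionAssignmentHandler {
  // Drop buffered records of revoked partitions as soon as onRevoke fires,
  // instead of waiting for the subsequent onAssign.
  override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
    filterRevokedPartitionsCB.invoke(revokedTps)

  // Nothing left to filter here; the buffer was already cleared on revoke.
  override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
    ()
}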

While it seems reasonable not to drop records that have already been read from Kafka, this approach leads to the following inefficiency:

  1. onRevoke is called on the thread that calls consumer.poll (see the sketch after this list). After this callback, we start draining the transactional stream, which is done by blocking the KafkaConsumerActor.
  2. We have no control over stream processing, so the stream keeps working as before, except that it no longer receives new messages from the KafkaConsumerActor.
  3. The source keeps track of the offsets of emitted records and will unblock the KafkaConsumerActor only after all emitted messages are committed. It is possible that we will wait for all messages accumulated in the buffer to be committed.
  4. The onRevoke callback blocks all consumers in the consumer group for the given topic. Eventually, all consumers for this consumer group and topic are waiting for the last stream's buffer to be drained.
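
The timing in step 1 follows from the Kafka consumer contract: rebalance callbacks run on the thread that calls poll(), so blocking inside onPartitionsRevoked stalls the rebalance for the whole group. A minimal sketch with the plain Kafka client to illustrate (the drainUntilCommitted helper and the configuration values are hypothetical):

import java.time.Duration
import java.util.{Collection => JCollection, Properties}
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object RebalanceBlockingSketch {
  // Hypothetical drain step: wait until every emitted offset of the revoked
  // partitions has been committed.
  def drainUntilCommitted(tps: Set[TopicPartition]): Unit = ()

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(List("topic").asJava, new ConsumerRebalanceListener {
      // Runs on the thread that called poll(); while this blocks, no member
      // of the consumer group can complete the rebalance.
      override def onPartitionsRevoked(revoked: JCollection[TopicPartition]): Unit =
        drainUntilCommitted(revoked.asScala.toSet)

      override def onPartitionsAssigned(assigned: JCollection[TopicPartition]): Unit = ()
    })

    while (true) {
      consumer.poll(Duration.ofMillis(100)) // the callbacks above fire inside poll()
      // ... process records ...
    }
  }
}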

In our setting, where we have hundreds of Kafka topic-partitions (KTPs), this means that all consumers are blocked waiting for the last one to finish draining its whole buffer. This slows down processing for the whole system. I think a strategy where we drop buffered records immediately after receiving onRevoke is no worse.

I would like to prove this with a benchmark, and I'm happy to contribute the change, but I need the following:

  1. Confirmation that my understanding of the code is correct.
  2. Some guidance about how to start benchmarking this rebalancing scenario.
  3. Acceptance of adding the configuration option.

seglo commented 4 years ago

@szymonm Thanks for raising this issue.

SingleSourceLogic clears the buffer of accumulated records for revoked partitions only after new partitions are assigned. I think this causes the blocking onRevoke callback to wait for all records in the buffer to be drained.

AFAICT (it's been a while since I've looked at TxSources), the SingleSourceLogic code you highlighted isn't actually run in transactional workloads. If you look at TransactionalSourceLogic, it implements its own addToPartitionAssignmentHandler, which does nothing on partition assignment but blocks on revoke while it waits for in-flight messages to be drained.
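
Roughly, the transactional handler behaves like this (a paraphrased sketch, not the actual TransactionalSourceLogic source; waitForDraining stands in for the real in-flight drain):

val transactionalHandler: PartitionAssignmentHandler = new PartitionAssignmentHandler {
  // Blocks the polling thread until in-flight messages of the revoked
  // partitions have been committed.
  override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
    waitForDraining(revokedTps)

  // No buffer filtering happens on assignment in the transactional case.
  override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
    ()
}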

Before I address your other observations I want to make sure we're on the same page here, or if I've misunderstood.

szymonm commented 4 years ago

You are right, @seglo. TransactionalSource does not use the default handler defined in SingleSourceLogic. This makes the symptoms I described even worse.

So should we include the call to filterRevokedPartitionsCB.invoke(revokedTps) in TransactionalSource too? If so, I suggest adding it to the onRevoke callback.
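
In sketch form (illustrative only; waitForDraining again stands in for the existing in-flight drain):

override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = {
  waitForDraining(revokedTps)                  // keep the existing in-flight drain
  filterRevokedPartitionsCB.invoke(revokedTps) // proposed: drop buffered, not-yet-emitted records now
}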

seglo commented 4 years ago

@szymonm This sounds reasonable to me. To rephrase for my own understanding:

You would like to filter out buffered messages that have not yet been emitted from the Transactional Source, but keep the existing in-flight message draining in place.

The in-flight message draining is crucial to making sure duplicates don't show up after a rebalance with transactions. It's tough to say whether the proposed change will make a substantial difference in your use case, but I think it's worth benchmarking.

Before merging any changes I'll want to make certain our transactional integration tests pass. We've had trouble ensuring transactional guarantees in various rebalance use cases. I'll follow up in the PR about what we can do to feel more comfortable that the changes don't introduce regressions.

szymonm commented 4 years ago

@seglo Your understanding is correct.

I got your point about testing it rigorously. I created a PR with just the change; no tests or benchmarks so far. The tests are passing so far (except Scala 2.13 on JDK 8, for reasons I believe are unrelated to this change).

We also have a bunch of tests on our side that have discovered issues with transactions before. I'm pushing the version from my PR to some of our testing environments to check whether there are benefits under real load.

seglo commented 4 years ago

We also have a bunch of tests on our side that have discovered issues with transactions before.

It would be great to get an idea of the issues you've experienced and how the tests are set up. We could use more automated testing here.