akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html

Repeated records after rebalance #1038

Open gabrielreid opened 4 years ago

gabrielreid commented 4 years ago

Versions used

2.0.1

Expected Behavior

Assuming external (or Kafka-internal) commits are processed and loaded properly, it is expected that we get exactly-once processing of records based on the committed offsets, even across rebalances.

Actual Behavior

Incoming records are currently buffered within alpakka-kafka, and then distributed to streams based on demand. With automatic Kafka-based partitioning, when a rebalance occurs, the buffered records are not discarded; they are still distributed downstream. The internal consumer then seeks to the committed offset (which will typically be earlier than the records in the buffer) and starts reading, which results in the same records being passed into the stream a second time.

This is particularly problematic if commits are stored in an external system (e.g. using Consumer.plainPartitionedManualOffsetSource), and commits only occur every 100 or 1000 records for a given partition. In this case, the following sequence of events can occur (see the sketch after this list):

  1. Consumer is processing streams for partition 0 and partition 1
  2. The last commits are at offset 1000 for both partitions, but records up to offset 1200 are in the internal alpakka-kafka buffer
  3. A rebalance occurs, with both partitions being revoked, and then partition 0 being assigned to the same consumer again
  4. The committed offset of 1000 is provided externally
  5. Internally, alpakka-kafka seeks to offset 1000 in Kafka
  6. The stream receives all records up to offset 1200 from the buffer
  7. The stream then again starts receiving records starting from offset 1000
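
For context, here is a minimal sketch of how such an externally managed offset source is typically wired up. The topic name, group id, bootstrap servers, and the `loadExternalOffsets` lookup below are placeholders, not taken from this issue:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future

object ManualOffsetExample extends App {
  implicit val system: ActorSystem = ActorSystem("manual-offset-example")

  val settings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")

  // Placeholder for an external offset store lookup (e.g. a database query).
  def loadExternalOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
    Future.successful(partitions.map(tp => tp -> 1000L).toMap)

  // On (re)assignment the source seeks to the offsets returned above; records
  // already sitting in the internal buffer from before the rebalance are the
  // ones this issue reports as being emitted a second time.
  Consumer
    .plainPartitionedManualOffsetSource(settings, Subscriptions.topics("example-topic"), loadExternalOffsets)
    .flatMapMerge(8, { case (_, partitionSource) => partitionSource })
    .runWith(Sink.foreach(record => println(s"${record.partition}:${record.offset}")))
}
```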

There is currently logic to drop buffered records belonging to a revoked partition from the buffer, but no such logic for partitions which have been revoked and then directly assigned again as the result of a rebalance.

I realize that it's probably not feasible to fully resolve this issue, as records are asynchronously passed into the stream (as described in the docs in SourceLogicBuffer.scala), but I do believe that the current situation (where hundreds or thousands of records can be passed into a stream a second time) can be greatly improved.

This can/will also probably be resolved by using the incremental rebalance protocol (as pointed out in #790), although there might also be a less invasive option in the short term.

gabrielreid commented 4 years ago

An idea that I had as a (hopefully) quite quick and easy way to resolve this would be to tag records with a "generation id" of the subscription that they were read based on.

This would involve using an AtomicLong or something similar that would get incremented on each call of onPartitionsAssigned on the internal ConsumerRebalanceListener. Each incoming ConsumerRecord could then be wrapped in a ConsumerRecordWithGenerationId that contains the value of the AtomicLong at the time the record was read.

The logic to filter out records from revoked partitions in SourceLogicBuffer could then be updated to just filter based on the current generation id -- this would work for dropping records from revoked partitions, as well as dropping records from partitions that are still assigned but were read with a now-obsolete generation id.
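
A rough sketch of what this could look like; the wrapper, listener, and buffer-filtering helper below are hypothetical names for the proposal, not existing alpakka-kafka classes:

```scala
import java.util.concurrent.atomic.AtomicLong
import java.util.{Collection => JCollection}

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, ConsumerRecord}
import org.apache.kafka.common.TopicPartition

// Hypothetical wrapper pairing a record with the generation it was fetched under.
final case class ConsumerRecordWithGenerationId[K, V](
    record: ConsumerRecord[K, V],
    generationId: Long)

final class GenerationTrackingRebalanceListener extends ConsumerRebalanceListener {
  private val generation = new AtomicLong(0L)

  // Bump the generation on every assignment, so records fetched before a
  // rebalance can be told apart from records fetched after it.
  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    generation.incrementAndGet()

  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()

  def currentGeneration: Long = generation.get()

  def tag[K, V](record: ConsumerRecord[K, V]): ConsumerRecordWithGenerationId[K, V] =
    ConsumerRecordWithGenerationId(record, currentGeneration)

  // Buffer filtering: keep only records read under the current generation,
  // which also covers partitions that were revoked and immediately reassigned.
  def dropStale[K, V](
      buffer: Vector[ConsumerRecordWithGenerationId[K, V]]
  ): Vector[ConsumerRecordWithGenerationId[K, V]] =
    buffer.filter(_.generationId == currentGeneration)
}
```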

I'd be happy to put together a PR for this, but would first like some feedback on the general approach that I'm proposing here (i.e. does this sound right, and are there any obvious problems with it?)

seglo commented 4 years ago

Hi @gabrielreid. Yes, this is a recurring issue that often comes up for users. As you mentioned, users can introduce an arbitrary number of asynchronous boundaries in their graph, which makes it very difficult to invalidate in-flight messages for revoked partitions without a lot of extra intervention and bookkeeping. This has been partially tackled in our preliminary transactional support, but that's a more specific use case where we can make more assumptions and attach more caveats to how users construct their streams to achieve EoS guarantees.

I think your idea is worth exploring. Just so that I understand: this is only a solution to make invalidation of buffered messages more robust in the Source, and not further downstream?

gabrielreid commented 4 years ago

I think your idea is worth exploring. Just so that I understand: this is only a solution to make invalidation of buffered messages more robust in the Source, and not further downstream?

Yes, correct, I'm just talking about clearing the internally-buffered messages in the Source.

seglo commented 4 years ago

Ok, I'm looking forward to seeing an implementation.

silles79 commented 4 years ago

Hi, is there any known workaround for this, such as disabling the internal buffer? I'm batch-processing big chunks of messages and pushing them to HDFS, and I get loads of duplicates when a rebalance happens.

seglo commented 4 years ago

The issue is that in-flight messages can't be invalidated once they leave Alpakka Kafka stages in the graph. Due to the asynchronous nature of akka streams, it's possible that there are some messages from revoked partitions downstream of the consumer. In transactional streams we expect a consume, transform, produce workflow, so even though messages are processed by non-Alpakka Kafka stages, they eventually reach the producer where messages can be invalidated.
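
For reference, the consume-transform-produce shape mentioned above looks roughly like this with the transactional API; topic names, transactional id, and the settings values are placeholders:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Transactional
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

object TransactionalSketch extends App {
  implicit val system: ActorSystem = ActorSystem("transactional-sketch")

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("transactional-group")

  val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  // Offsets travel with each message to the producer, where the transaction
  // commit fences off work done for partitions that were revoked in between.
  Transactional
    .source(consumerSettings, Subscriptions.topics("source-topic"))
    .map { msg =>
      ProducerMessage.single(
        new ProducerRecord[String, String]("sink-topic", msg.record.key, msg.record.value),
        msg.partitionOffset)
    }
    .runWith(Transactional.sink(producerSettings, "transactional-id-1"))
}
```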

There are ways to tackle this issue, but it would require something like what @gabrielreid proposed initially in this issue, as well as some user cooperation to filter out in-flight messages in downstream stages.

anshchauhan commented 3 years ago

Has anyone found a feasible workaround for this? We have a processing pipeline that aggressively scales along with the bursts of data we receive in Kafka. As a result, a lot of partitions are revoked and reassigned frequently. We are seeing tons of duplicates, and it's causing a lot of problems for us.

I saw there's an emergency commit introduced in this PR but throwing an exception from onRevoke didn't help. I'm still seeing a lot of duplicates. Has anyone had any success with @gabrielreid's approach?

sebastian-alfers commented 6 months ago

This issue was reported to be fixed by using the CooperativeStickyAssignor, which can be enabled as follows in alpakka-kafka:

consumerSettings.withPartitionAssignmentStrategyCooperativeStickyAssignor()

This is also mentioned in our docs: https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#settings
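
If that convenience method is not available in the alpakka-kafka version in use, the same assignor can be set through the standard Kafka consumer property. A sketch, with bootstrap servers and group id as placeholders:

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor}
import org.apache.kafka.common.serialization.StringDeserializer

object CooperativeStickyConfig extends App {
  implicit val system: ActorSystem = ActorSystem("cooperative-sticky")

  // Set partition.assignment.strategy to the cooperative-sticky assignor.
  val consumerSettings: ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")
      .withProperty(
        ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        classOf[CooperativeStickyAssignor].getName)
}
```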

sebastian-alfers commented 6 months ago

Reopening, since it seems that the CooperativeStickyAssignor does not help the consumer whose partition gets revoked, only the consumer that gets a partition assigned.