akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html
Other
1.42k stars 387 forks source link

At least once guarantees of Producer.committableSink #1242

Open PrathikPuthran opened 3 years ago

PrathikPuthran commented 3 years ago

Producer.committableSink seems to drop messages when the consumer is shutting down. It seems the offset commits are out of order. This could be a problem when the consumer is shutting down and partitions are rebalanced to other consumer because higher offsets are committed to kafka even though the messages corresponding to lower offsets are not successfully published to the target kafka topic. I found this while investigating lost messages in the stream and all the lost messages were during the consumer rebalance ( usually triggered by consumer process restart)

ennru commented 3 years ago

Thank you for reporting what you experienced.

What versions are you using? Can you explain your findings more exact? Only offsets of successfully produced records are added to be committed. Did you inspect records being produced out of order?

PrathikPuthran commented 3 years ago

Thanks @ennru for looking into it. I was using the below version compile group: 'com.typesafe.akka', name: 'akka-stream-kafka_2.12', version: '2.0.1'

Yes the kafkaproducer can produce records out of order as described in the doc https://kafka.apache.org/documentation/#retries We have 10 consumers subscribing to a kafka topic. It does some mutation to the data and sends them to target kafka topic. This system processes around 15-20K events/second. I was investigating missing messages where some messages in source topic was not available in target topic. On further investigation the lost messages were during consumer restarts. After digging into the implementation of Producer.committableSink() it looks like it commits offsets as soon as it gets acks without accounting for the ordering. So after partition re-assignment the new consumer would start consuming from the higher committed offset even though the messages from the lower offsets for that partition were not successfully produced to target kafka topic.

I changed the implementation to use combination of Producer.flexiFlow() and Consumer.committableSink. Producer.flexiFlow guarantees ordering because it uses mapAsync. I haven't seen missing messages issue since then.

ennru commented 3 years ago

Yes, you are right. committableSink will allow committing of offsets out of order if the producer callbacks finish in a different order than the send was issued.

silles79 commented 3 years ago

I think i am having the same bug, my stream looks like

Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(config.kafkaConsumer.topicsIn))
      .map(process)
      .via(Producer.flexiFlow(producerSettings))
      .map(_.passThrough)
      .toMat(Committer.sink(CommitterSettings(system).withMaxBatch(20)))(DrainingControl.apply)
      .run()

def process is simple and just passes back the payload.committableOffset

Looks like I'm also losing messages. Using theses versions:

val AkkaVersion = "2.5.32"
val AkkaKafkaVersion = "2.0.5"
val KafkaVersion = "2.4.1"

Is there any workaround?

seglo commented 3 years ago

@silles79 When using Producer.flexiFlow we ensure ordering of Envelope (input) to Result, even when retries may occur (by using mapAsync). Therefore when used in combination with Committer.sink we shouldn't be able to lose messages, even when a commit fails, because either a subsequent commit will override it (by committing a larger offset), or the commit won't be acknowledged due to some other failure. In the latter case you might observe duplicates when processing resumes or a partition is rebalanced, but not gaps.

The Producer.committableSink OTOH will commit an offset as soon as a produced message is acknowledged. If the acknowledgement arrives out of order (due to retries, most likely) then it's possible to commit a higher offset that skips the processing of a message with a lower offset.

silles79 commented 3 years ago

@seglo Thanks for the reply. That was my understanding, unfortunately I'm still loosing messages. I have re-run many times, sometimes without loosing any messages, sometimes i loose a few. Trying to reproduce it in a local tests, I'll report back if i manage.

silles79 commented 3 years ago

@seglo Look like i missing the data somewhere else:

question about Producer.flexiFlow

    Flow[ProducerMessage.Envelope[String, String, PassThrough]]
      .async
      .via(Producer.flexiFlow(producerSettings))
      .map(rawData => { })
      .reduce((acc: (PassThrough, Long), latest: (PassThrough, Long)) => {
        if (acc._2 % 10000 == 0) logger.info(s"Processed ${acc._2} rows, latest offsets: ${offsetStore.show}")
        else logger.debug(s"Processed ${acc._2} rows, latest offsets: ${offsetStore.show}")
        (latest._1, acc._2 + latest._2)
      })
      .map(_._2)
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.headOption)(Keep.both)

Does it guarantee that messages are fully committed kafka before it calls map? or is this a bug? or i'm doing something wrong?

I'm reading from a db using slick and pushing the data to kafka and looks like sometimes i miss the last couple of messages from the table, but I can see them in map(rawData => ) and got the correct count at the end. But messages are missing from kafka.

Cheers,

seglo commented 3 years ago

@silles79 Producer.flexiFlow will only emit Results that have been successfully acknowledged by the broker (via a Kafka Callback). Internally, the Alpakka Kafka Producer creates a Promise and its future is emitted immediately after the message is produced to Kafka asynchronously. The Promise is only resolved when the callback is returned. Since a mapAsync(identity) follows the Alpakka Kafka Producer it guarantees the order of results.

Based on the info provided I can only think of edge cases where this breaks down: if the broker goes down before the message is replicated.

Do you have any non-default settings in your ProducerSettings?

PrathikPuthran commented 3 years ago

It could happen if acks is not set to all https://kafka.apache.org/documentation/#acks

seglo commented 3 years ago

@PrathikPuthran have you tried using enable.idempotence=true with your ProducerSettings when using Producer.committableSink? I think this should resolve the issue you experienced. The setting has no impact on our internal benchmarks, but it may on a production cluster.

I created a draft PR to enforce this setting with Producer.committableSink https://github.com/akka/alpakka-kafka/pull/1265

silles79 commented 3 years ago

Do you have any non-default settings in your ProducerSettings?

I only have

 ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers(kafkaBrokerConfig.bootstrapServers)
      .withProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, s"${kafkaProducerConfig.maxRequestSize}")
      .withProperty(ProducerConfig.LINGER_MS_CONFIG, s"${kafkaProducerConfig.lingerMs}")
      .withProperty(ProducerConfig.BATCH_SIZE_CONFIG, s"${kafkaProducerConfig.batchSize}")

where

 maxRequestSize: "100000000",
 partitions: "24",
 replication: "2",
 batchSize: "100000000"
 lingerMs: 0

Maybe the problem was not waiting for system.terminate() had something like

  def start(stream: => (UniqueKillSwitch, Future[Option[Long]])): Option[Long] = {
    val start = System.currentTimeMillis()

    val (killSwitch, results) = stream
    Await.result(results, Duration.Inf) 
    ...
    system.terminate()
    ...
   System.exit()

I was not waiting for system.terminate() I thought waiting for the "results" was enough. Maybe the messages were buffered somewhere? I'm re-running my tests with

 Await.result(system.terminate(), 120 seconds)

very strange issues

seglo commented 3 years ago

@silles79 Your producer properties shouldn't be a problem.

It's unclear from your code how the stream shuts down. Unless there's a cancellation signal from a stage, or an error occurs the stream should never shutdown. The best way to ensure a controller shutdown is to use the Draining Control.

drainAndShutdown() will return a Future that you can map/await to complete the shutdown of the actor system and your app.

silles79 commented 3 years ago

@seglo

I'm using Alpakka: https://doc.akka.io/docs/alpakka/current/slick.html

 Slick
          .source(
            sql"""select * from #${table}""")

and takes care of the shutdown once the table is fully read.

isn't drainAndShutdown for Consumers?

seglo commented 3 years ago

Got it. Yes, I would expect the completion signal to be emitted by the Slick.source and the stream to shutdown when all elements have successfully traversed. Your stream may be completing early because of your use of Sink.headOption.

Materializes into a Future[Option[T]] which completes with the first value arriving wrapped in Some , or a None if the stream completes without any elements emitted.

https://doc.akka.io/docs/akka/current/stream/operators/Sink/headOption.html

silles79 commented 3 years ago

@seglo I have reprocess my database:

INFO  c.e.e.fetcher.SqlFetcher$$anon$1 - Finished 1407202034 messages in 44061.624 sec - 31937 msg/sec

~50 tables and haven't lost a single message from the end.

Only change was to wait for system.terminate "Await.result(system.terminate(), 120 seconds)" and bumped akka-kafka from 2.0.3 to 2.0.5. Strange.