akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html

CommittingProducerSinkStage never completes #1783

Open ilinandrii opened 1 day ago

ilinandrii commented 1 day ago

Akka version: 2.5.32
Alpakka version: 2.0.5
Scala version: 2.12.17
Testcontainers Kafka: 1.19.0

Expected Behavior

The Future[Done] materialized from Producer.committableSink completes when CommittingProducerSinkStage stops because of a producer exception.

Actual Behavior

The Future[Done] materialized from Producer.committableSink never completes when CommittingProducerSinkStage stops because of a producer exception.

Additional information

In my particular case, the producer used within CommittingProducerSinkStage failed to register a schema. This caused both the sink and the consumer to complete. However, the isShutdown future of the stream's DrainingControl, which is built from the sink's Future[Done] and the Consumer.Control, never completes, so the shutdown hook I set up on isShutdown never fired.
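The dependency described above can be modeled with plain Scala futures. This is only a sketch under the assumption that the combined shutdown signal waits for both the control and the sink's future; `ShutdownHookSketch`, `combined`, and `demo` are hypothetical names, and no Alpakka Kafka code is involved.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

object ShutdownHookSketch {
  // Runs callbacks on the calling thread so the demo is deterministic.
  private implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  // Assumed shape of the combined signal: done only after both parts are done.
  def combined(controlShutdown: Future[Unit], sinkCompletion: Future[Unit]): Future[Unit] =
    controlShutdown.flatMap(_ => sinkCompletion)

  // Returns (hook fired while the sink future is pending, hook fired after it fails).
  def demo(): (Boolean, Boolean) = {
    val sinkCompletion = Promise[Unit]() // stands in for the sink's Future[Done]
    var hookFired = false
    // The control part is already shut down, as observed in this report.
    combined(Future.successful(()), sinkCompletion.future)
      .onComplete(_ => hookFired = true) // the "shutdown hook"
    val firedWhilePending = hookFired
    // Failing the sink future is enough to let the hook run.
    sinkCompletion.failure(new RuntimeException("failure in serializer"))
    (firedWhilePending, hookFired)
  }
}
```

In this model the hook stays silent for as long as the sink's future is pending, even though the control part has already shut down. It fires as soon as that future is completed, whether with a success or with a failure.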

Reproducible Test Case

Here's a test. I emulate a producer serialization error with a mock serializer. control.isShutdown.futureValue completes successfully, but completion.futureValue never completes, even though the stream is dead. Hence, if I build a DrainingControl from control and completion, its isShutdown will never complete either.

Looking at the implementation, I suspect that the stage's promise is never completed when producer.send fails.
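The suspected failure mode can be illustrated with a small standard-library sketch. It is not Alpakka Kafka's actual code: `SketchSink`, `push`, `finish`, and the `failPromiseOnError` switch are hypothetical stand-ins for a stage that completes its promise only on the normal finish path.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

// Hypothetical stand-in for a sink stage; not Alpakka Kafka's actual code.
final class SketchSink(send: String => Unit, failPromiseOnError: Boolean) {
  private val done = Promise[Unit]()

  // Plays the role of the materialized Future[Done].
  def completion: Future[Unit] = done.future

  // Plays the role of onPush: an exception thrown by `send` stops the stage.
  def push(element: String): Unit =
    try send(element)
    catch {
      case NonFatal(e) =>
        // Without this step the stage is dead but `completion` stays pending.
        if (failPromiseOnError) done.tryFailure(e)
        throw e
    }

  // Plays the role of a normal upstream finish.
  def finish(): Unit = done.trySuccess(())
}

object SketchSinkDemo {
  // Mirrors the mock serializer: every send throws synchronously.
  private def failingSend(element: String): Unit =
    throw new RuntimeException("failure in serializer")

  // Returns whether `completion` is completed after a failed push.
  def run(failPromiseOnError: Boolean): Boolean = {
    val sink = new SketchSink(failingSend, failPromiseOnError)
    try sink.push("test")
    catch { case NonFatal(_) => () } // the stage has failed at this point
    sink.completion.isCompleted
  }
}
```

With `failPromiseOnError = false` the future is still pending after the failed push, which matches the behavior reported here. With `failPromiseOnError = true` it is completed with the exception, so anything waiting on it can proceed.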

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{
  CommitterSettings,
  ConsumerSettings,
  ProducerMessage,
  ProducerSettings,
  Subscriptions
}
import akka.stream.scaladsl.Keep
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.mockito.scalatest.IdiomaticMockito
import org.scalatest.concurrent.{Eventually, Futures, JavaFutures, ScalaFutures}
import org.scalatest.time.SpanSugar
import org.scalatest.{Matchers, WordSpec}
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

class ProducerCommitterSinkTest extends WordSpec
    with Futures
    with JavaFutures
    with ScalaFutures
    with Matchers
    with IdiomaticMockito
    with Eventually
    with SpanSugar {

  val imageName = "confluentinc/cp-kafka"
  val imageTag  = "7.1.4"
  val kafkaTestContainer =
    new KafkaContainer(DockerImageName.parse(s"$imageName:$imageTag"))

  kafkaTestContainer.withKraft().start()

  val config        = ConfigFactory.load()
  val consumerTopic = "consumer-test-topic"
  val producerTopic = "producer-test-topic"
  val serializer    = mock[StringSerializer]

  implicit val system: ActorSystem = ActorSystem("ProducerCommitterSinkTest", config)

  override implicit def patienceConfig: PatienceConfig =
    PatienceConfig(10.seconds, 1.second)

  "ProducerCommitterSink" should {
    "finish stream" when {
      "produce fails" in {

        val testProducer =
          ProducerSettings(system, new StringSerializer, new StringSerializer)
            .withBootstrapServers(kafkaTestContainer.getBootstrapServers)
            .withCloseTimeout(1.second)
            .createKafkaProducer()

        val producerSettings =
          ProducerSettings(system, new StringSerializer, serializer) // mock serializer
            .withBootstrapServers(kafkaTestContainer.getBootstrapServers)
            .withCloseTimeout(1.second)

        val id = "producer-committer-sink-test"
        val consumerSettings = ConsumerSettings(
          system,
          new StringDeserializer,
          new StringDeserializer
        )
          .withBootstrapServers(kafkaTestContainer.getBootstrapServers)
          .withGroupId(id)
          .withClientId(id)
          .withCloseTimeout(1.second)
          .withStopTimeout(0.seconds)
          .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

        val committerSettings = CommitterSettings(system)

        val (control, completion) =
          Consumer.committableSource(
            consumerSettings,
            Subscriptions.topics(consumerTopic)
          )
            .map { msg =>
              val record = new ProducerRecord[String, String](
                producerTopic,
                s"key: ${msg.record.key()}",
                s"value: ${msg.record.value}"
              )
              ProducerMessage.multi(List(record), msg.committableOffset)
            }
            .toMat(Producer.committableSink(producerSettings, committerSettings))(
              Keep.both
            )
            .run()

        serializer.serialize(*, *, *) throws new RuntimeException("failure in serializer")

        testProducer.send(new ProducerRecord(consumerTopic, "test", "test")).futureValue

        control.isShutdown.futureValue
        completion.futureValue
      }
    }
  }
}

ilinandrii commented 1 day ago

Here's the test log, which shows that the consumer and the sink actually stopped.

9926 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] ERROR akka.actor.RepointableActorRef Error in stage [akka.kafka.internal.CommittingProducerSinkStage@48ae68c8]: failure in serializer 
java.lang.RuntimeException: failure in serializer
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:903)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
    at akka.kafka.internal.CommittingProducerSinkStageLogic.$anonfun$produce$1(CommittingProducerSinkStage.scala:110)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at akka.kafka.internal.CommittingProducerSinkStageLogic.akka$kafka$internal$CommittingProducerSinkStageLogic$$produce(CommittingProducerSinkStage.scala:109)
    at akka.kafka.internal.CommittingProducerSinkStageLogic$$anon$1.onPush(CommittingProducerSinkStage.scala:223)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:409)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:485)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:764)
    at akka.actor.Actor.aroundReceive(Actor.scala:539)
    at akka.actor.Actor.aroundReceive$(Actor.scala:537)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
    at akka.actor.ActorCell.invoke(ActorCell.scala:583)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
    at akka.dispatch.Mailbox.run(Mailbox.scala:229)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
9926 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-17] DEBUG o.a.k.c.consumer.KafkaConsumer [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Pausing partitions [consumer-test-topic-0] 
9926 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.k.i.CommittingProducerSinkStage [cb534] CommittingProducerSink stopped 
9926 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.kafka.internal.KafkaConsumerActor received handled message Poll(akka.kafka.internal.KafkaConsumerActor@132090ca,true) from Actor[akka://ProducerCommitterSinkTest/deadLetters] 
9938 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.k.i.CommittingProducerSinkStage [cb534] Producer closed 
9939 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] INFO  a.kafka.internal.SingleSourceLogic [1aae0] Completing 
9940 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.kafka.internal.KafkaConsumerActor received handled message StopFromStage(1aae0) from Actor[akka://ProducerCommitterSinkTest/system/StreamSupervisor-0/$$a#-2087746625] 
9940 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.kafka.internal.KafkaConsumerActor [5db21] Received Stop from StageId [1aae0], stopping 
9947 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-2] DEBUG a.kafka.internal.KafkaConsumerActor stopping 
9949 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-2] DEBUG a.kafka.internal.ConnectionChecker stopped 
9950 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] DEBUG o.a.k.c.consumer.KafkaConsumer [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Pausing partitions [consumer-test-topic-0] 
9950 [kafka-coordinator-heartbeat-thread | producer-committer-sink-test] DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Heartbeat thread has closed 
9950 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] DEBUG o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Executing onLeavePrepare with generation Generation{generationId=1, memberId='producer-committer-sink-test-5ee99cef-34f4-4fb2-b2d3-3909007cdb94', protocol='range'} and memberId producer-committer-sink-test-5ee99cef-34f4-4fb2-b2d3-3909007cdb94 
9950 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] INFO  o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Revoke previously assigned partitions consumer-test-topic-0 
9951 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] INFO  o.a.k.c.c.i.AbstractCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Member producer-committer-sink-test-5ee99cef-34f4-4fb2-b2d3-3909007cdb94 sending LeaveGroup request to coordinator localhost:50240 (id: 2147483646 rack: null) due to the consumer is being closed 
9951 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.kafka.internal.SingleSourceLogic [1aae0] Revoked partitions: Set(consumer-test-topic-0). All partitions: Set() 
9952 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Resetting generation due to consumer pro-actively leaving the group 
9958 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] LeaveGroup request returned successfully 
9959 [ProducerCommitterSinkTest-akka.kafka.default-dispatcher-21] DEBUG o.a.k.c.consumer.KafkaConsumer [Consumer clientId=producer-committer-sink-test, groupId=producer-committer-sink-test] Kafka consumer has been closed 
9961 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-3] DEBUG a.kafka.internal.KafkaConsumerActor stopped 
9962 [ProducerCommitterSinkTest-akka.actor.default-dispatcher-4] INFO  akka.actor.RepointableActorRef Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://ProducerCommitterSinkTest/system/StreamSupervisor-0/$$a#-2087746625] to Actor[akka://ProducerCommitterSinkTest/system/kafka-consumer-1#860674792] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ProducerCommitterSinkTest/system/kafka-consumer-1#860674792]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.