akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html

Alpakka Kafka suddenly stopped consuming without errors. #1264

Open kkalavantavanich opened 3 years ago

kkalavantavanich commented 3 years ago

Versions used

Scala version: 2.12.11 Akka version: 2.6.3 Alpakka-Kafka version: 2.0.2

Expected Behavior

  1. Start the application, which starts the consumer.
  2. The consumer continues to consume messages.

Actual Behavior

  1. Start the application, which starts the Kafka consumer.
  2. The consumer stops consuming messages.

Relevant logs

The application was logging "committing xxx" (normal behavior), then those log lines stopped appearing without any errors. The application was deployed in Docker and kept running. Syslogs look normal. Kafka audit logs look normal. Other consumers were still working and able to consume from Kafka. Lag for this consumer group increased.

Reproducible Test Case

My understanding of this bug is minimal. The bug is not reproducible. Please help by suggesting possible ways to reproduce it.

Relevant code

class KafkaConsumer(
    settings: KafkaConsumerSettings,
    transactionService: TransactionService
)(implicit system: ActorSystem, executor: ExecutionContext) {
  val config: Config = settings.consumerConfig
  val schemaRegistry: String = "schemaregistry.company.com"
  private var consumerControl: Option[Consumer.Control] = None
  def consumerSettings: ConsumerSettings[String, Message] = {
    ConsumerSettings(config, new StringDeserializer, new CustomMessageDeserializer)
      .withBootstrapServers(settings.brokers)
      .withGroupId(settings.groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, settings.resetStart)
      .withProperty(CustomMessageDeserializer.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry)
      .withProperty(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "com.company.deserializer.CustomMessageDeserializer"
      )
      .withProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, settings.metadataMaxAge)
      .withProperty(SaslConfigs.SASL_JAAS_CONFIG, s"""org.apache.kafka.common.security.scram.ScramLoginModule required
                                                     |    username="user"
                                                     |    password="passw0rd";
                                                     |""".stripMargin)
  }
  def run: Consumer.Control = {
    val control = consumer(consumerSettings).run()
    consumerControl = Some(control)
    control
  }
  private def consumer(consumerSettings: ConsumerSettings[String, Message]): RunnableGraph[DrainingControl[Done]] =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topicPattern(settings.topic))
      .mapAsync(settings.workers)(KafkaMessageHandler(transactionService.handleTransactionMessage))
      .groupedWithin(settings.batchSize, 1.millis)
      .map(CommittableOffsetBatch(_))
      .mapAsync(settings.workers)(offset => {
        logger.debug(s"committing ${offset.getOffsets}")
        //TODO - migrate to undeprecated version
        offset.commitScaladsl()
      })
      .toMat(Sink.ignore)(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
  def stop: Unit = {
    consumerControl.map(_.shutdown())
  }
}

mrubin commented 3 years ago

We have been plagued by this as well. No changes other than swapping from Kafka's Java SDK to alpakka-kafka.

seglo commented 3 years ago

There's likely an exception that's being swallowed. This is usually a result of the materialized value of the stream not being handled correctly.

In the OP's case, they should handle the Future[Done] returned by DrainingControl.isShutdown. It completes when the stream shuts down, either because of an error within the stream or because the stream was shut down externally through the draining control itself (DrainingControl.shutdown).
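
A minimal sketch of what "handling the Future" could look like, using only the Scala standard library so that it stays independent of the Alpakka Kafka version. The object name StreamCompletionWatcher, the method watch, and the report callback are hypothetical and not part of any library. The idea is simply to attach a callback to the completion future so that a failure gets logged instead of disappearing.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical helper, not part of Alpakka Kafka.
object StreamCompletionWatcher {

  /** Reports how `completion` ended, so a failed stream is never silent.
    *
    * @param name       label used in the reported message
    * @param completion the future that completes when the stream terminates
    * @param report     side-effecting sink for the message (for example a logger)
    * @return a future holding the reported message; it never fails itself
    */
  def watch[T](name: String, completion: Future[T])(report: String => Unit)(
      implicit ec: ExecutionContext
  ): Future[String] =
    completion
      .transform {
        // Normal termination, e.g. after a deliberate shutdown.
        case Success(_) => Success(s"$name completed normally")
        // Termination caused by an exception inside the stream.
        case Failure(e) => Success(s"$name failed: ${e.getMessage}")
      }
      .map { msg =>
        report(msg)
        msg
      }
}

// Assumed wiring inside the reporter's `run` method (untested against the
// reporter's setup; `logger` is the one already used in the class):
//
//   val control = consumer(consumerSettings).run()
//   StreamCompletionWatcher.watch("kafka-consumer", control.isShutdown)(logger.error(_))
```

Depending on the library version, the future materialized by Sink.ignore may be the more direct signal of a stream failure; whichever future is used, the point is that something observes its outcome.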

sandeepjindal commented 3 years ago

Thanks @seglo