akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html

Akka Stream dies and stops consuming #1130

Open politrons opened 4 years ago

politrons commented 4 years ago

Hi, I have a committableSource configured with KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG:

val streamSource = Consumer.committableSource(getConsumerSettings, Subscriptions.topics(topicSubscription))

def getConsumerSettings = {
    val kafkaMBAddress = consumerProperties("bootstrap.servers")
    val groupID = consumerProperties("groupId")
    ConsumerSettings(system, new ByteArrayDeserializer, newKafkaAvroDeserializer(consumerProperties))
      .withBootstrapServers(kafkaMBAddress)
      .withGroupId(groupID)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
      .withProperties(sslConfig)
}

When I receive an event that does not match the expected Avro schema, my stream dies and stops consuming events.

To restart my stream after an unhandled error, I implemented this solution:

    RestartSource
      .onFailuresWithBackoff(
        minBackoff = 0.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
      ) { () =>
        streamSource.via(streamFlow)
          .watchTermination() {
            case (consumerControl, futureDone) =>
              log.error("Event Consumer: Unhandled error in Stream rebooting........")
              futureDone.recoverWith { case _ => consumerControl.shutdown() }
          }
      }.to(streamSink).run()

Now when I receive a bad event, I see the log message "Event Consumer: Unhandled error in Stream rebooting........" repeated continuously.

I'm on Akka version 2.6. Any suggestions on what I can do?

johanandren commented 4 years ago

I think (but am not 100% sure) this is about the Kafka connector rather than Akka Streams, so I moved it here.

seglo commented 4 years ago

Hi @politrons. This is a common issue with the basic Kafka consumer (and as a result, Alpakka Kafka). Since deserialization occurs within the KafkaConsumer itself, an exception is thrown in the source stage, which fails the whole stream. We have discussed different ways to move SerDes concerns out of the consumer and producer in #965, but we've yet to settle on a design.

However, I'm not sure that addresses your question. Now that your stream is wrapped in a RestartSource, it will be rerun continuously until you shut down the inner stream or provide a cancellable sink to the outer stream. See the Alpakka Kafka tests for examples of how we side-effect the draining control out of the stream so it can be controlled from within a restart source, or look at the Akka Streams documentation to see how to use your own KillSwitch.
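
A minimal sketch of that pattern, following the backoff-restart example in the Alpakka Kafka documentation and reusing the names from the snippet above (getConsumerSettings, topicSubscription, streamFlow, streamSink; streamSink is assumed to materialize a Future, e.g. Sink.ignore): the inner Consumer.Control is captured in an AtomicReference so the current run can be drained and shut down from outside the RestartSource.

    import java.util.concurrent.atomic.AtomicReference
    import scala.concurrent.duration._
    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.Consumer
    import akka.stream.scaladsl.RestartSource

    // Holds the control of the currently running inner stream (NoopControl until first materialization).
    val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

    val streamCompletion = RestartSource
      .onFailuresWithBackoff(
        minBackoff = 1.second,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
      ) { () =>
        Consumer
          .committableSource(getConsumerSettings, Subscriptions.topics(topicSubscription))
          // Expose the Consumer.Control of the current (re)start to the outside world.
          .mapMaterializedValue(c => control.set(c))
          .via(streamFlow)
      }
      .runWith(streamSink)

    // On application shutdown (needs an implicit ExecutionContext):
    // control.get().drainAndShutdown(streamCompletion)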

davideicardi commented 3 years ago

But I don't understand why the supervision strategy is not called. If the stream dies I would expect to receive an exception here ...

Is there no way to detect a deserialization error other than taking the raw bytes and deserializing them manually in a map?

seglo commented 3 years ago

Supervision to resume (i.e. skip a message) wouldn't help here because Kafka client serde exceptions are fatal.
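
A rough sketch of the manual approach hinted at above (my assumption of the workaround, not an official Alpakka Kafka API): consume the value as raw bytes and apply the Confluent KafkaAvroDeserializer yourself in a map stage, so a malformed record becomes a skippable failure instead of a fatal consumer error. Here byteArraySettings is a hypothetical ConsumerSettings built with ByteArrayDeserializer for key and value; consumerProperties (which must include the schema registry URL) and the other names come from the snippets above.

    import scala.util.Try
    import scala.jdk.CollectionConverters._
    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.Consumer
    import io.confluent.kafka.serializers.KafkaAvroDeserializer

    // Configure the Avro deserializer manually (isKey = false means it handles record values).
    val avroDeserializer = new KafkaAvroDeserializer()
    avroDeserializer.configure(consumerProperties.asJava, false)

    val decodedSource = Consumer
      .committableSource(byteArraySettings, Subscriptions.topics(topicSubscription))
      .map { msg =>
        // Deserialize ourselves: a bad record yields None instead of failing the source stage.
        val maybeEvent = Try(avroDeserializer.deserialize(msg.record.topic(), msg.record.value())).toOption
        (msg.committableOffset, maybeEvent)
      }
      // Skip records that failed to deserialize (their offsets are dropped here too,
      // so commit handling may need adjusting if skipped records should also be committed).
      .collect { case (offset, Some(event)) => (offset, event) }
      // ...continue with the business flow and offset committing from here.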