akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka-kafka/current/

Provide a Schema Registry compatible deserialization flow #965

Open seglo opened 5 years ago

seglo commented 5 years ago

Short description

When using the kafka-avro-serializer deserializers in your Kafka Consumer, any SerializationException is raised internally by the client, and there's no convenient way for an Alpakka Kafka user to decide how to handle the problem.

It would be nice to have a feature like the one in Kafka Streams, which allows the user to provide a SerializationException handler to decide whether the stream should stop, or whether it should skip that record and move on to the next.

Details

This problem could be solved in a few different ways, but an idiomatic Akka Streams solution would be to not use the Avro deserializer in the Kafka Consumer. Instead, use a plain ByteArrayDeserializer and do the Avro deserialization and Schema Registry schema version check in a flow downstream of the consumer source. The user can then apply standard Akka Streams error handling techniques to decide whether to continue, stop, or skip offsets. This flow could be bundled with Alpakka Kafka for users to include.
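
For illustration, a minimal sketch of that shape, assuming Akka 2.6 (where the implicit ActorSystem provides a materializer), Confluent's kafka-avro-serializer on the classpath, and placeholder addresses and topic names:

```scala
import java.util.Collections

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.ByteArrayDeserializer

implicit val system: ActorSystem = ActorSystem("avro-consumer")

// The consumer itself only sees bytes, so no SerializationException can be
// thrown inside the Kafka client.
val consumerSettings =
  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("example-group")

// Avro deserialization moves into a downstream stage.
val avroDeserializer = new KafkaAvroDeserializer()
avroDeserializer.configure(
  Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
  false) // isKey = false

Consumer
  .plainSource(consumerSettings, Subscriptions.topics("example-topic"))
  .map { record =>
    // A failure here fails this stage, where standard Akka Streams error
    // handling (recover, supervision strategies, etc.) applies.
    avroDeserializer.deserialize(record.topic, record.value).asInstanceOf[GenericRecord]
  }
  .runForeach(println)
```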

Something similar could be considered for the producer use case.

seglo commented 5 years ago

One solution may be to re-use the Confluent SchemaRegistryClient and KafkaAvroDeserializer. We could create a flow, e.g. akka.kafka.scaladsl.Serialization.confluentAvroDeserializer of type Flow[ConsumerRecord[K, Array[Byte]], GenericRecord, _], that would use the KafkaAvroDeserializer.deserialize method to deserialize the Array[Byte] into an Avro GenericRecord (or some other Avro type), and then somehow use the SchemaRegistryClient to validate the schema and version.
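
A shape-only sketch of what that could look like; Serialization and confluentAvroDeserializer are the proposed names from above, not an existing Alpakka Kafka API, and the schema/version validation step is elided:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerRecord

object Serialization {
  // Hypothetical factory; schema/version validation against the registry
  // would be layered on top.
  def confluentAvroDeserializer[K](
      schemaRegistryUrl: String): Flow[ConsumerRecord[K, Array[Byte]], GenericRecord, NotUsed] = {
    // The client caches schemas by id across records.
    val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 100)
    val deserializer = new KafkaAvroDeserializer(client)
    Flow[ConsumerRecord[K, Array[Byte]]].map { record =>
      deserializer.deserialize(record.topic, record.value).asInstanceOf[GenericRecord]
    }
  }
}
```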

chipola commented 4 years ago

We use something similar where we have a flow Flow[CommittableMessage[Array[Byte], Array[Byte]], (CommittableMessage[Array[Byte], Array[Byte]], Try[(K, V)]), NotUsed] that uses a provided org.apache.kafka.common.serialization.Deserializer[T].

Another option, instead of outputting a tuple, is to output CommittableMessage[K, V], i.e. Flow[CommittableMessage[Array[Byte], Array[Byte]], Try[CommittableMessage[K, V]], NotUsed]. Let me know if you have any thoughts; I'd be happy to publish a sample PR.
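
A sketch of that second shape; deserializeFlow and the record reconstruction are illustrative, not an existing API:

```scala
import scala.util.Try

import akka.NotUsed
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.stream.scaladsl.Flow
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Deserializer

// Hypothetical flow: deserialize key and value, wrapping the attempt in Try
// so a poison message becomes a Failure element instead of a stream failure.
def deserializeFlow[K, V](
    keyDeserializer: Deserializer[K],
    valueDeserializer: Deserializer[V])
    : Flow[CommittableMessage[Array[Byte], Array[Byte]], Try[CommittableMessage[K, V]], NotUsed] =
  Flow[CommittableMessage[Array[Byte], Array[Byte]]].map { msg =>
    Try {
      val r = msg.record
      // Rebuild the message around a typed record, keeping the same offset.
      val typed = new ConsumerRecord[K, V](
        r.topic, r.partition, r.offset,
        keyDeserializer.deserialize(r.topic, r.key),
        valueDeserializer.deserialize(r.topic, r.value))
      CommittableMessage(typed, msg.committableOffset)
    }
  }
```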

seglo commented 4 years ago

Hi @chipola. Your second option sounds nice. One challenge would be to make this API Java-compatible too. I think instead of wrapping the exception to send downstream, we could guide the user through documentation to use a recover operator to handle it.
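
For reference, a minimal sketch of the standard techniques in question: recover to stop gracefully after a failure, or a supervision strategy to skip poison records. The avroFlow helper is illustrative:

```scala
import akka.NotUsed
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.Flow
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.errors.SerializationException

// Skip records that fail to deserialize, stop the stream on anything else.
// Alternatively, .recover { case e: SerializationException => fallback } emits
// one final element and completes the stream: a graceful stop rather than a skip.
val decider: Supervision.Decider = {
  case _: SerializationException => Supervision.Resume // drop the poison record
  case _                         => Supervision.Stop
}

// Illustrative deserializing stage with the strategy attached.
def avroFlow(deserializer: KafkaAvroDeserializer)
    : Flow[ConsumerRecord[Array[Byte], Array[Byte]], GenericRecord, NotUsed] =
  Flow[ConsumerRecord[Array[Byte], Array[Byte]]]
    .map(r => deserializer.deserialize(r.topic, r.value).asInstanceOf[GenericRecord])
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
```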

It would be great if you could contribute a PR to prototype this feature! We can work with you to get it into Alpakka Kafka.

chipola commented 4 years ago

recover will be great, assuming we wrap the failure in a custom exception type that provides access to the original ConsumerMessage so users can log/dead-letter/etc. as they see fit. Try has proven convenient for piping the results through a flexiFlow to dead-letter poison messages; even though we use the Java API, we weren't too bothered by having Try in a few stages.
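
A hedged sketch of both pieces; the exception type, deadLetterRoute, and the topic name are illustrative:

```scala
import scala.util.{Failure, Success, Try}

import akka.NotUsed
import akka.kafka.ProducerMessage
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
import akka.kafka.ProducerMessage.Envelope
import akka.stream.scaladsl.Flow
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical exception type for the recover path: carries the original
// message so users can log or dead-letter it as they see fit.
final class RecordDeserializationException(
    val original: CommittableMessage[Array[Byte], Array[Byte]],
    cause: Throwable)
    extends RuntimeException(
      s"Failed to deserialize record from ${original.record.topic}", cause)

// Dead-letter routing for the Try variant: failures produce the raw bytes to
// a dead-letter topic, successes pass only the committable offset through,
// so both paths can be committed downstream.
def deadLetterRoute[V](handle: V => Unit): Flow[
    (CommittableMessage[Array[Byte], Array[Byte]], Try[V]),
    Envelope[Array[Byte], Array[Byte], CommittableOffset],
    NotUsed] =
  Flow[(CommittableMessage[Array[Byte], Array[Byte]], Try[V])].map {
    case (msg, Failure(_)) =>
      ProducerMessage.single(
        new ProducerRecord("dead-letters", msg.record.key, msg.record.value),
        msg.committableOffset)
    case (msg, Success(value)) =>
      handle(value) // process the deserialized value
      ProducerMessage.passThrough(msg.committableOffset)
  }
```

Downstream, this could run through Producer.flexiFlow, map over the result's passThrough, and end in a Committer.sink so offsets are committed on both paths.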

I'll give it a go.

seglo commented 4 years ago

You're right, that makes sense. There is precedent for using Try elsewhere in Alpakka, so perhaps it wouldn't be too bad for Java users. I'm looking forward to seeing what you come up with.