zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0

Discussion: Trying to understand the motivations for the Serializer API #853

Open lukestephenson opened 1 year ago

lukestephenson commented 1 year ago

Below I'm listing a couple of things that don't feel right to me about the Serializer API. I'm not expecting anyone to go and change the API for me; I'm happy to do that myself. This is more to see whether other contributors / users agree with these points before I take the discussion further, rather than submitting a PR that isn't welcome and finding out there are good reasons for the current API design. Thanks

Suggestion 1: Serializer trait rework

Currently we have:

trait Serializer[-R, -T] {
  def serialize(topic: String, headers: Headers, value: T): RIO[R, Array[Byte]]
}

And note that RIO is an alias for ZIO[R, Throwable, A]

Serialization should not fail. Look at libraries like Circe and zio-json (https://zio.dev/zio-json/): encoding to JSON always works, and we have compiler guarantees that all data can be encoded. Obviously the same can't be said for deserialization, where the inbound data is outside of our control.
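To make this concrete, a non-effectful serializer could look something like the following (just a sketch, the trait name is made up):

import org.apache.kafka.common.header.Headers

// Sketch only: serialization as a total function, mirroring how Circe / zio-json encoders cannot fail.
trait PureSerializer[-T] {
  def serialize(topic: String, headers: Headers, value: T): Array[Byte]
}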

Suggestion 2: Serializers / Deserializers should be safe by default

I'm doing Scala / FP because I like avoiding bugs by taking advantage of a powerful type system. A building block for me in this regard is using Option to avoid any nasty NPEs; give me that before an effect system. Anyway, the Serializers / Deserializers invoked by zio-kafka can currently be passed null, because the base representation of ConsumerRecord / ProducerRecord is the standard Java implementation. It's not safe by default. For example, if I want to handle that a key / value may be absent, I have to remember to add .asOption to my Deserializer. I feel the safe behaviour, which highlights that Kafka doesn't guarantee the key / value are present, should be the default; if I'm happy to discard that type safety (of having Option[V] as the default), then I tell the Deserializer it is ok by calling .valueWillAlwaysExist (or some better method name).

Even on my Serializer I have to call asOption. And this is in my nice Scala code where I represent all my models with Option where appropriate, but throw that away at the last minute at the boundary with Java (i.e. when creating the Kafka ProducerRecord). Then I get a callback which can potentially fail, because the type safety has been given up too early.
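As a rough sketch of what "safe by default" could mean (these names are made up, nothing that exists in zio-kafka today):

import org.apache.kafka.common.header.Headers
import zio._

// Sketch only: absence is modelled with Option by default; discarding that safety is an explicit opt-in.
trait SafeDeserializer[-R, +T] {
  // Kafka gives no guarantee that a key or value is present, so both the input and the output are optional.
  def deserialize(topic: String, headers: Headers, data: Option[Array[Byte]]): RIO[R, Option[T]]
}

object SafeDeserializer {
  // Hypothetical opt-out: fail the effect when a key/value is absent, yielding plain T values.
  def valueWillAlwaysExist[R, T](d: SafeDeserializer[R, T]): (String, Headers, Option[Array[Byte]]) => RIO[R, T] =
    (topic, headers, data) =>
      d.deserialize(topic, headers, data)
        .someOrFail(new NoSuchElementException(s"null key/value on topic $topic"))
}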

erikvanoosten commented 1 year ago

Discussions around zio-kafka usually take place on Discord: https://discord.com/channels/629491597070827530/629497941719121960 Would you mind placing your question there?

Re. 'Serialization should not fail', serialization might depend on external systems. For example, you may have to fetch an id from a database.

IMHO, letting the Kafka library do the serializing/deserializing has always felt like a kludge to me. Therefore, I always use the byte array serializer/deserializer and do the conversion in my own code. This gives me the freedom to handle failures the way I want to. It also makes testing easier because you don't have to mess around with someone else's serde interfaces.
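As a rough sketch of that style (assuming zio-kafka 2.x's Consumer.plainStream and Serde.byteArray; decodeMyEvent stands for whatever decoding you do in your own code):

import zio._
import zio.kafka.consumer.{Consumer, Subscription}
import zio.kafka.serde.Serde

// Your own decoding, with whatever failure handling you prefer (dead-letter, log, fail the stream, ...).
def decodeMyEvent(bytes: Array[Byte]): Either[Throwable, String] = ???

// Consume raw bytes and decode them inside the stream, so failures stay in your own code.
val rawStream =
  Consumer
    .plainStream(Subscription.topics("my.topic"), Serde.byteArray, Serde.byteArray)
    .mapZIO { record =>
      ZIO.fromEither(decodeMyEvent(record.value)).map(decoded => (decoded, record.offset))
    }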

Your points about using Option instead of a nullable value are indeed something to look at. Though it might be hard to change the API in a backward-compatible way.

guizmaii commented 1 year ago

Hey @lukestephenson,

I agree with your observations. I'm not a fan of the current design, either. I tried to remove the effect system from the De/Serializer interfaces of zio-kafka but didn't manage to find an interesting design.

I'd personally love to see what you can come up with.

lukestephenson commented 1 year ago

@erikvanoosten Thanks for the comments. I'll also kick off the Discord discussion soon.

Re. 'Serialization should not fail', serialization might depend on external systems. For example, you may have to fetch an id from a database.

I completely disagree with this statement. That is not serialization in my opinion; that is a program. By the time we hand the ProducerRecord off to Kafka, the only reason it should fail is because of issues with Kafka.
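To illustrate the distinction (just a sketch; fetchId and encode are made-up names): the database lookup is part of the program, and only already-complete data reaches the encoder, which is total:

import zio._

case class Enriched(id: Long, payload: String)

def fetchId(payload: String): Task[Long] = ??? // e.g. a database call; this can fail
def encode(e: Enriched): Array[Byte] = ???     // pure serialization; this cannot

def publish(payload: String, produce: Array[Byte] => Task[Unit]): Task[Unit] =
  for {
    id    <- fetchId(payload)              // the effectful part lives in the program
    bytes  = encode(Enriched(id, payload)) // serialization itself stays total
    _     <- produce(bytes)
  } yield ()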

Though it might be hard to change the api in a backward compatible way.

Agreed. What I am proposing will not be backwards compatible, nor do I think that should be a design goal. We can keep the existing API if we want, but the API I'm proposing will not be constrained by backwards compatibility.

@guizmaii Thanks as well for the feedback. I had a stab at what it could look like for the Producing side. Here is a rough idea of the API (it's not polished, just for demonstrating the idea).

What is exposed in zio-kafka could be something like:

package zio.kafka.producer

import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.serialization.{LongSerializer, StringSerializer}
import zio._

case class ZKafkaHeader(key: String, value: Array[Byte])
case class ZProducerRecord[K, V](
  topic: String,
  key: K,
  value: V,
  headers: List[ZKafkaHeader] = List.empty,
  partition: Option[Integer] = None,
  timestamp: Option[Long] = None
)

object ZProducerRecord {
  // Fairly common to publish a message without a key, so a convenience method for that.
  def apply[V](topic: String, value: V): ZProducerRecord[Option[Array[Byte]], V] =
    new ZProducerRecord[Option[Array[Byte]], V](topic, Option.empty[Array[Byte]], value)
}

trait ByteArrayEncoder[A] {
  def apply(a: A): Option[Array[Byte]]
}

object Extensions {
  implicit class ZProducerRecordExtensions[K,V](zProducerRecord: ZProducerRecord[K,V]) {
    def encode(implicit keyEncoder: ByteArrayEncoder[K], valueEncoder: ByteArrayEncoder[V]): ZProducerRecord[Option[Array[Byte]], Option[Array[Byte]]] = {
      zProducerRecord.copy(key = keyEncoder(zProducerRecord.key), value = valueEncoder(zProducerRecord.value))
    }
  }
}

object Encoders {
  // Provided encoders
  implicit val stringEncoder: ByteArrayEncoder[String] = new ByteArrayEncoder[String] {
    private val kafkaSerializer = new StringSerializer()
    override def apply(a: String): Option[Array[Byte]] = Some(kafkaSerializer.serialize(null, a))
  }

  implicit val longEncoder: ByteArrayEncoder[Long] = new ByteArrayEncoder[Long] {
    private val kafkaSerializer = new LongSerializer()
    override def apply(a: Long): Option[Array[Byte]] = Some(kafkaSerializer.serialize(null, a))
  }

  implicit val byteArrayEncoder: ByteArrayEncoder[Array[Byte]] = (a: Array[Byte]) => Some(a)

  implicit def optionEncoder[A](implicit encoder: ByteArrayEncoder[A]): ByteArrayEncoder[Option[A]] = (a: Option[A]) => a.flatMap(encoder.apply)
}

trait ProducerProposal {
  def produceAsync(record: ZProducerRecord[Option[Array[Byte]], Option[Array[Byte]]]): Task[Task[RecordMetadata]] = ???

  // IMO this makes a lot more sense than the current implementation, which doesn't provide flexibility for the
  // different records in the Chunk to have different serialisation strategies, even though the Chunk could have
  // records for many different topics.
  def produceAsyncChunk(records: Chunk[ZProducerRecord[Option[Array[Byte]], Option[Array[Byte]]]]): Task[Task[Chunk[RecordMetadata]]] = ???
}

And for an end user, a sample application might look like:

package zio.kafka.producer

import Encoders._
import Extensions._

case class ExampleModel(value: String)

object EndUserExample {

  implicit val exampleModelEncoder: ByteArrayEncoder[ExampleModel] = (a: ExampleModel) => stringEncoder.apply(a.value)

  val producer: ProducerProposal = ???

  // I don't care about encoding, I've already provided the raw bytes. No call to `encode` required.
  producer.produceAsync(ZProducerRecord(topic = "my.topic", key = None, value = Some("hello".getBytes)))

  // Message with a value only
  producer.produceAsync(ZProducerRecord(topic = "my.topic", value = "hello").encode)

  // Message with a key and value
  producer.produceAsync(ZProducerRecord(topic = "my.topic", key = 1234L, value = "hello world").encode)

  // compacted topic with value provided
  val maybeValueIsPresent: Option[ExampleModel] = Some(ExampleModel("hello"))
  producer.produceAsync(ZProducerRecord(topic = "my.topic", key = 1234L, value = maybeValueIsPresent).encode)

  // compacted topic with tombstone
  val maybeValueIsATombstone: Option[ExampleModel] = None
  producer.produceAsync(ZProducerRecord(topic = "my.topic", key = 1234L, value = maybeValueIsATombstone).encode)
}

Feel free to share feedback.

erikvanoosten commented 1 year ago

Further comments from me will go to Discord.

domdorn commented 4 months ago

Discussions around zio-kafka usually take place on Discord: https://discord.com/channels/629491597070827530/629497941719121960 Would you mind placing your question there?

It's too bad that you do this. I was interested in reading the rest of this discussion and it's super cumbersome to do so: first I need to have another account on another platform (OK, I have that), but then I have to search through thousands of messages to find the ones that correspond to this topic, which is impossible after some time has passed.

svroonland commented 1 week ago

Did you guys reach any resolution on this issue?

erikvanoosten commented 1 week ago

Did you guys reach any resolution on this issue?

If you ask me (you shouldn't 😉): you probably know by now what I think about how we support serialization/deserialization in zio-kafka (or how it is done in the Java Kafka library, for that matter). I think it should not even be there, because it lives at the wrong abstraction level; because of that we get discussions like this issue. Hence my recommendation to always work with Array[Byte] and do serialization/deserialization in your own code. Anyway, I am going to leave any resolutions up to someone else.

BTW, we recently improved the documentation a lot: https://zio.dev/zio-kafka/serialization-and-deserialization

erikvanoosten commented 1 week ago

Did you guys reach any resolution on this issue?

BTW, @svroonland please stop writing from the future! 😄

svroonland commented 1 week ago

Probably repeating a lot of stuff that has been said already, but here's my 2c.

Agreed that serialization should not fail, especially in the Scala / type-safe world. Our Serializer was made to reflect the Apache Kafka Serializer interface, with some ZIO added to model the side effects. Easy integration with, for example, the Confluent Avro serde, which connects to a schema registry, is a good reason to keep this compatibility. Easy migration for users of Apache Kafka was, in retrospect, another benefit. If we drop the effectfulness of our Serializer, our users will have to make some major changes to their streams. On the other hand, we have an opportunity here to do it better and more type-safely, and to promote the strengths of Scala and ZIO. If we can find a good way to support functionality like the Confluent Avro serde, either with some convenience methods or with good documentation/examples, I would be open to that.
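For reference, the kind of integration I mean is roughly this (a sketch only, configuration omitted; it assumes Confluent's KafkaAvroSerializer and our current effectful Serializer). The schema registry call can fail, which is exactly what the ZIO return type captures:

import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.header.Headers
import zio._
import zio.kafka.serde.Serializer

// Sketch: wrap an already configured Confluent serializer in zio-kafka's effectful Serializer.
def avroSerializer(underlying: KafkaAvroSerializer): Serializer[Any, GenericRecord] =
  new Serializer[Any, GenericRecord] {
    override def serialize(topic: String, headers: Headers, value: GenericRecord): RIO[Any, Array[Byte]] =
      // The Confluent serializer may contact the schema registry, so this call can fail.
      ZIO.attempt(underlying.serialize(topic, value))
  }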

As a side note, I wonder how efficient the Confluent Avro serializer is, serializing records one by one and checking the schema each time, and whether we could improve on that with a custom implementation that operates on chunks.

Regarding deserializing in the stream: in essence it is a convenience that we provide the Deserializer abstraction and a stream of typed values (actually CommittableRecord[K, V]s), instead of a library that just gives you meaningless bytes. In practice you have to create your own serdes for complex record types (e.g. JSON or Avro), and a deserializer is just a very thin wrapper over a simple function call. I do however like the composability via combinators like orElse or map, which makes Deserializer a good and powerful abstraction. Serde with its map & contramap is also a useful abstraction.
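As a small illustration of that composability (assuming the map/orElse combinators mentioned above; MyEvent and its decoding are made up):

import zio.kafka.serde.{Deserializer, Serde}

case class MyEvent(raw: String)

// Build a typed deserializer on top of the provided string serde...
val eventDeserializer: Deserializer[Any, MyEvent] =
  Serde.string.map(MyEvent(_))

// ...and fall back to a default when the primary deserializer fails.
val withFallback: Deserializer[Any, MyEvent] =
  eventDeserializer.orElse(Serde.string.map(_ => MyEvent("unparseable")))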

The same convenience argument applies to the possibility of all values coming from Kafka being null. At some point you have to specify that your values are of some type T, even though you can't prove that the incoming bytes can always be deserialized to a T. Explicitly modeling the possibility that every value can be null does not change that.

Also:

If you ask me (you shouldn't 😉)

Disagreed! Your opinion is equally valid and appreciated. Do you still use Deserializer in some form in your custom stream code, or do you just do the deserialization yourself directly?

So in summary, I would propose exploring: