fd4s / fs2-kafka

Functional Kafka Streams for Scala
https://fd4s.github.io/fs2-kafka
Apache License 2.0

Allow setting headers from Serializer #576

Open IndiscriminateCoding opened 3 years ago

IndiscriminateCoding commented 3 years ago

The existing API (which I suppose was modeled after the Java API) takes immutable Headers as an argument to serialize:

def serialize(topic: String, headers: Headers, a: A): F[Array[Byte]]

Immutability of Headers makes it impossible to set headers when deserializing a message. Perhaps Headers should be moved to the return position of serialize:

def serialize(topic: String, a: A): F[(Array[Byte], Headers)]
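A minimal sketch of what the return-position shape would let a serializer do, assuming nothing about fs2-kafka internals. `Headers`, `emptyHeaders`, `HeaderWritingSerializer` and `utf8WithContentType` are hypothetical, dependency-free stand-ins (not the library's types), and the effect type `F` from the proposed signature is dropped to keep it runnable with plain Scala.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for an immutable headers collection (not fs2.kafka.Headers).
final case class Headers(entries: Vector[(String, Array[Byte])]) {
  def append(key: String, value: Array[Byte]): Headers = Headers(entries :+ (key -> value))
  def get(key: String): Option[Array[Byte]] = entries.collectFirst { case (`key`, v) => v }
}

val emptyHeaders: Headers = Headers(Vector.empty)

// Proposed shape, minus F: the headers are part of the result, not an input.
trait HeaderWritingSerializer[A] {
  def serialize(topic: String, a: A): (Array[Byte], Headers)
}

// Hypothetical instance: encode a String and record its encoding in a header.
val utf8WithContentType: HeaderWritingSerializer[String] =
  new HeaderWritingSerializer[String] {
    def serialize(topic: String, a: String): (Array[Byte], Headers) =
      (
        a.getBytes(UTF_8),
        emptyHeaders.append("content-type", "text/plain; charset=utf-8".getBytes(UTF_8))
      )
  }

val (bytes, headers) = utf8WithContentType.serialize("events", "hello")
```

The caller (in the library, the producer) would then merge the returned headers into the outgoing record.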
filosganga commented 3 years ago

I don't believe the Java serialiser allows setting headers. Does it?

IndiscriminateCoding commented 3 years ago

@filosganga https://github.com/apache/kafka/blob/470e6f2b9ac45fc32bdeb77d0bf4c72d26c6f3fd/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java#L932

bplommer commented 3 years ago

Can't what you need to do be done in user code without being coupled to serialization? E.g.

def headersFor(topic: String, a: A): F[Headers] = ???

def makeProducerRecord(topic: String, k: K, a: A): F[ProducerRecord[K, A]] =
  headersFor(topic, a).map(headers => ProducerRecord(topic, k, a).withHeaders(headers))
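The same idea as a runnable, dependency-free sketch: headers are derived from the value in ordinary user code and attached to the record before it reaches the producer, so the serializer itself stays header-agnostic. `ProducerRecord`, `Order` and `headersFor` here are hypothetical stand-ins, not fs2-kafka's classes, and `F` is dropped on the assumption that the header computation is pure.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for a producer record carrying immutable headers.
final case class ProducerRecord[K, V](
    topic: String,
    key: K,
    value: V,
    headers: Vector[(String, Array[Byte])] = Vector.empty
) {
  def withHeaders(hs: Vector[(String, Array[Byte])]): ProducerRecord[K, V] = copy(headers = hs)
}

final case class Order(id: String, region: String, amount: BigDecimal)

// Headers are computed from the value in user code; the serializer never sees them.
def headersFor(topic: String, order: Order): Vector[(String, Array[Byte])] =
  Vector("region" -> order.region.getBytes(UTF_8))

def makeProducerRecord(topic: String, key: String, order: Order): ProducerRecord[String, Order] =
  ProducerRecord(topic, key, order).withHeaders(headersFor(topic, order))

val record = makeProducerRecord("orders", "o-1", Order("o-1", "eu-west", BigDecimal(10)))
```

This keeps the Serializer API unchanged; the trade-off is that the header logic lives apart from the serializer for the same type.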

Immutability of Headers makes it impossible to set headers when deserializing a message.

You mean when serializing, right?

bplommer commented 3 years ago

Or are you talking about wanting to be able to interop with an existing Java serializer that mutates Headers? If so I can see the case for that, though I don't think I'd want to change the main API. I guess it would need to be def serialize(topic: String, headers: Headers, a: A): F[(Array[Byte], Headers)], and behind the scenes we convert the Headers to the mutable Java type, pass it to the Java serializer, and then convert the mutated Java Headers back to the Scala ones.
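A dependency-free sketch of the conversion described above: copy the immutable headers into a mutable container, let a Java-style serializer mutate it, then read the result back. `MutableHeaders` (mirroring the `add(key, value)` method of Kafka's Java `Headers`), `JavaStyleSerializer`, `serializeWithHeaders` and `tagging` are hypothetical stand-ins; a real implementation against kafka-clients would also need to suspend the side-effecting call in `F`.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable.ArrayBuffer

// Stand-in for Kafka's mutable Java headers: only the `add` operation is modelled.
final class MutableHeaders(initial: Seq[(String, Array[Byte])]) {
  private val buf = ArrayBuffer.from(initial)
  def add(key: String, value: Array[Byte]): MutableHeaders = { buf += (key -> value); this }
  def snapshot: Vector[(String, Array[Byte])] = buf.toVector
}

// Stand-in for a Java serializer that is allowed to mutate the headers it is given.
trait JavaStyleSerializer[A] {
  def serialize(topic: String, headers: MutableHeaders, a: A): Array[Byte]
}

// Adapter with the shape (topic, headers, a) => (bytes, headers):
// immutable in, mutable scratch copy for the Java side, immutable out.
def serializeWithHeaders[A](ser: JavaStyleSerializer[A])(
    topic: String,
    headers: Vector[(String, Array[Byte])],
    a: A
): (Array[Byte], Vector[(String, Array[Byte])]) = {
  val scratch = new MutableHeaders(headers)
  val bytes = ser.serialize(topic, scratch, a)
  (bytes, scratch.snapshot)
}

// Hypothetical serializer that tags each record, e.g. with an encryption key id.
val tagging: JavaStyleSerializer[String] = new JavaStyleSerializer[String] {
  def serialize(topic: String, headers: MutableHeaders, a: String): Array[Byte] = {
    headers.add("key-id", "k1".getBytes(UTF_8))
    a.getBytes(UTF_8)
  }
}

val (payload, outHeaders) =
  serializeWithHeaders(tagging)("events", Vector("trace" -> "t-9".getBytes(UTF_8)), "hi")
```

The caller's original header vector is never mutated; only the returned copy contains the header the Java side added.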

IndiscriminateCoding commented 3 years ago

@bplommer

You mean when serializing, right?

Yes, sure, thanks for the correction.

Can't what you need to do be done in user code without being coupled to serialization?

Sure, but I think it would be convenient to be able to manipulate headers based on the Scala (data)type, and it feels very natural to place this logic in a Serializer. There is an asymmetry between Deserializer (it is possible to deserialize based on headers) and Serializer (it is impossible to serialize and set some headers). In other words: for any value t of type T, I expect the property deserialize(serialize(t)) == t to always hold. But that is not possible if some part of t has to be placed in headers.
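A dependency-free sketch of that round-trip property when part of the value travels in a header. `Event`, `serialize`, `deserialize` and `roundTrips` are hypothetical; the serializer follows the return-position shape proposed at the top of the issue, with headers modelled as a plain `Map` and no effect type.

```scala
import java.nio.charset.StandardCharsets.UTF_8

final case class Event(schemaVersion: Int, body: String)

// The schema version goes into a header; only the body goes into the payload.
def serialize(e: Event): (Array[Byte], Map[String, Array[Byte]]) =
  (e.body.getBytes(UTF_8), Map("schema-version" -> e.schemaVersion.toString.getBytes(UTF_8)))

// The deserializer needs both the payload and the headers to rebuild the value.
def deserialize(bytes: Array[Byte], headers: Map[String, Array[Byte]]): Either[String, Event] =
  headers.get("schema-version") match {
    case Some(v) =>
      new String(v, UTF_8).toIntOption
        .toRight("schema-version is not an integer")
        .map(version => Event(version, new String(bytes, UTF_8)))
    case None => Left("missing schema-version header")
  }

// deserialize(serialize(e)) == e only holds because serialize can emit headers.
def roundTrips(e: Event): Boolean = {
  val (bytes, headers) = serialize(e)
  deserialize(bytes, headers) == Right(e)
}
```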

I guess it would need to be def serialize(topic: String, headers: Headers, a: A): F[(Array[Byte], Headers)]

I'm not insisting on any particular types here, but that is basically StateT.
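To make the StateT remark concrete: a serializer that both reads and rewrites headers has the shape `Headers => F[(Headers, Array[Byte])]`, which is what cats' `StateT[F, Headers, Array[Byte]]` wraps (state first in the tuple). The sketch below uses a hand-rolled, effect-free `State` so it runs without cats; `State`, `addHeader`, `payload` and `serializeGreeting` are hypothetical names, and header values are plain strings for readability.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Minimal effect-free State: S => (S, A), i.e. StateT specialised to no effect.
final case class State[S, A](run: S => (S, A)) {
  def map[B](f: A => B): State[S, B] =
    State((s: S) => { val (s2, a) = run(s); (s2, f(a)) })
  def flatMap[B](f: A => State[S, B]): State[S, B] =
    State((s: S) => { val (s2, a) = run(s); f(a).run(s2) })
}

// Header-writing step: append a header, produce no value.
def addHeader(key: String, value: String): State[Vector[(String, String)], Unit] =
  State((hs: Vector[(String, String)]) => (hs :+ (key -> value), ()))

// Payload step: leave the headers unchanged and produce the bytes.
def payload(body: String): State[Vector[(String, String)], Array[Byte]] =
  State((hs: Vector[(String, String)]) => (hs, body.getBytes(UTF_8)))

// A serializer in StateT shape: steps compose in a for-comprehension.
val serializeGreeting: State[Vector[(String, String)], Array[Byte]] =
  for {
    _   <- addHeader("content-type", "text/plain")
    out <- payload("hello")
  } yield out

// Incoming headers are the initial state; the final state is what gets produced.
val (finalHeaders, bytes) = serializeGreeting.run(Vector("trace" -> "t-1"))
```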

felipehaack commented 3 years ago

Hello guys,

First of all, thank you for all the hard work.

About this issue: we use fs2-kafka widely, in all the projects we currently have at the company, but right now we're having some trouble interoperating with a Java serialiser that mutates the headers.

Do you have any suggestions for a workaround, i.e. a way to set the headers back on the event we produce to Kafka without holding a mutable instance between the serialiser and the producer?

Below is a short example for testing purposes (I added a "Help" comment on the lines where I think help is needed):

  def serializer[F[_], E <: SpecificRecord](
      properties: Map[String, String]
  )(implicit S: Sync[F], A: AvroEncryption[E]): Serializer[F, E] = {
    Serializer.instance[F, E] { (topic, _, event) =>
      S.delay {
        // Help - Java encryption that appends new headers
        A.serializer.serialize(topic, ??headers??, event)
      }
    }
  }
  def push[F[_] : ConcurrentEffect : ContextShift, E](
    properties: Map[String, String],
    events: Seq[E]
  )(
    implicit serializer: Serializer[F, E],
    ID: ID[E]
  ): F[List[Int]] = {
    val topic    = properties("topic")
    val settings = ProducerSettings[F, String, E].withProperties(properties)

    Stream
      .emits[F, E](events)
      .map { e =>
        ProducerRecords.one(
          ProducerRecord(topic, ID.id(e), e)
            .withHeaders(???) // Help - somehow the headers must be set here for them to be produced with the event, no?
        )
      }
      .through(produce(settings))
      .map(_.records.size)
      .compile
      .toList
  }

Any suggestion would be really welcome :)

felipehaack commented 3 years ago

@bplommer, Hi :)

First, thank you for your hard work.

Would it be possible for us to work on this enhancement of the library? It would benefit most of the projects at our company, since we use the library widely for Kafka processing.

If so, could you give us some guidance: what would you expect, where could we start, and do you perhaps already have a design in mind? Anything would be really appreciated.

bplommer commented 3 years ago

Hi @felipehaack, thanks for the offer of help and sorry for the slow reply! I’ll try to give it some thought and get back to you soon.

Assuming this is a breaking change, it would need to be part of the 3.x series - I’d be happy to release it as a milestone when ready, but it would be several months before it would be in a stable release. Is that ok for you?

bplommer commented 3 years ago

If the problem is just that the output of Headers#toJava is immutable, I think the following should work - it converts the headers to the Java class using a constructor that makes a (shallow) defensive copy.

Does this look like it will solve the issue? If so I'm happy to incorporate it into the library, but could you first try it out and let me know how it goes?

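import cats.data.Chain
import cats.effect.Sync
import fs2.kafka.Serializer
import org.apache.kafka.common.header.{Header => KafkaHeader}
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.{Serializer => KafkaSerializer}
import scala.jdk.CollectionConverters._

def fromJava[F[_], A](serializer: KafkaSerializer[A])(
  implicit F: Sync[F]
): Serializer[F, A] =
  Serializer.instance[F, A] { (topic, headers, a) =>
    // The Java serializer is side-effecting, so suspend it with F.delay rather than F.pure.
    F.delay(
      serializer.serialize(
        topic,
        // Shallow defensive copy into a mutable RecordHeaders, so `add` no longer fails.
        // Headers the Java serializer adds stay in this copy and are not returned.
        new RecordHeaders((headers.toChain: Chain[KafkaHeader]).toList.asJava),
        a
      )
    )
  }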
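A dependency-free illustration of the distinction behind this suggestion: a Java-style serializer handed a read-only view fails as soon as it calls `add`, while a shallow defensive copy accepts the write but keeps it local, so the caller never sees the added header. `JHeaders`, `ReadOnlyHeaders`, `CopiedHeaders` and `taggingSerialize` are hypothetical stand-ins; the exception type mirrors Kafka's `RecordHeaders`, which throws `IllegalStateException` on `add` once it has been set read-only.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Minimal stand-in for the add-only part of Kafka's Java Headers interface.
trait JHeaders { def add(key: String, value: Array[Byte]): JHeaders }

// Read-only view: rejects writes, as a read-only RecordHeaders would.
final class ReadOnlyHeaders(val entries: Vector[(String, Array[Byte])]) extends JHeaders {
  def add(key: String, value: Array[Byte]): JHeaders =
    throw new IllegalStateException("headers are read-only")
}

// Shallow defensive copy: accepts writes, but only into its own buffer.
final class CopiedHeaders(source: Vector[(String, Array[Byte])]) extends JHeaders {
  val buffer: ArrayBuffer[(String, Array[Byte])] = ArrayBuffer.from(source)
  def add(key: String, value: Array[Byte]): JHeaders = { buffer += (key -> value); this }
}

// A Java-style serializer that appends a header while serializing.
def taggingSerialize(headers: JHeaders, a: String): Array[Byte] = {
  headers.add("key-id", "k1".getBytes(UTF_8))
  a.getBytes(UTF_8)
}

val original = Vector("trace" -> "t-1".getBytes(UTF_8))
val viaReadOnly = Try(taggingSerialize(new ReadOnlyHeaders(original), "hi"))
val copied = new CopiedHeaders(original)
val viaCopy = Try(taggingSerialize(copied, "hi"))
```

Because fs2-kafka's `Serializer` returns only the bytes, a defensive copy like the one above avoids the failure but, on its own, does not carry the new header through to the produced record.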
dinchand commented 1 year ago

Hi @felipehaack, thanks for the offer of help and sorry for the slow reply! I’ll try to give it some thought and get back to you soon.

Assuming this is a breaking change, it would need to be part of the 3.x series - I’d be happy to release it as a milestone when ready, but it would be several months before it would be in a stable release. Is that ok for you?

@bplommer We also need this. Do you still plan to include it in 3.x?