zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0
337 stars, 138 forks

Implement 'produce' aliases in the Producer trait, making producer co… #1048

Closed: soujiro32167 closed this 1 year ago

soujiro32167 commented 1 year ago

Implement 'produce' aliases in the Producer trait, making producer composition easier

CLAassistant commented 1 year ago

CLA assistant check
All committers have signed the CLA.

soujiro32167 commented 1 year ago

Consider a simple producer:

```scala
trait Producer:
  def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]]
```

I want to create a producer that, for each record, applies a function that modifies the record, and may tack on an effect, like logging each send call:

```scala
trait Producer:
  def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]]

def modifyHeadersAndLoggingProducer(producer: Producer): Producer =
  new Producer:
    def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
      // addHeader is an assumed helper, used for illustration.
      // `<*` logs "record sent" but keeps the inner ack effect as the result.
      producer
        .produce(record.addHeader("foo", "bar"))
        .map(_ *> ZIO.logDebug("ack received")) <* ZIO.logDebug("record sent")
```

Without this MR, to compose producers, you'd have to implement all the produce/async/serialize methods all over again.

Even without this use case, all the methods other than produceChunkAsyncWithFailures can be implemented in terms of it, so why leave them unimplemented?
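The claim that every alias can be derived from one primitive can be illustrated with a dependency-free model in plain Scala 3. This is a sketch under simplifying assumptions: `Record`, `produceChunk` (a stand-in for `produceChunkAsyncWithFailures`) and `produceAll` are hypothetical names, and ZIO effects are replaced by plain return values.

```scala
// Hypothetical, simplified model; not zio-kafka's real signatures.
final case class Record(key: String, value: String)

trait Producer:
  // The single primitive an implementation must provide
  // (stand-in for produceChunkAsyncWithFailures): one result per record.
  def produceChunk(records: Vector[Record]): Vector[Either[Throwable, Long]]

  // Aliases written once, in terms of the primitive.
  def produce(record: Record): Either[Throwable, Long] =
    produceChunk(Vector(record)).head

  def produceAll(records: Vector[Record]): Either[Throwable, Vector[Long]] =
    produceChunk(records).foldLeft[Either[Throwable, Vector[Long]]](Right(Vector.empty)) {
      (acc, result) => for { offsets <- acc; offset <- result } yield offsets :+ offset
    }

// An in-memory implementation only has to write the primitive.
final class InMemoryProducer extends Producer:
  private var written = Vector.empty[Record]

  def produceChunk(records: Vector[Record]): Vector[Either[Throwable, Long]] =
    records.map { record =>
      written = written :+ record
      Right((written.size - 1).toLong)
    }

  def sent: Vector[Record] = written
```

Both aliases route through `produceChunk`, so `InMemoryProducer` gets them without writing any alias code itself.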

soujiro32167 commented 1 year ago

Here is an example of how painful it is without these changes: https://github.com/kaizen-solutions/trace4cats-zio-extras/blob/main/zio-kafka/src/main/scala/io/kaizensolutions/trace4cats/zio/extras/ziokafka/KafkaProducerTracer.scala

soujiro32167 commented 1 year ago

@yisraelU fyi

guizmaii commented 1 year ago

I don't think that's how you should do that.

Here's how I'd do it:

```scala
package my.example

import org.apache.kafka.clients.producer.ProducerRecord
import zio.*
import zio.kafka.producer.Producer as ZioProducer

trait MyProducer {
  def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]]
}

object MyProducer {
  val live: URLayer[ZioProducer, MyProducer] =
    ZLayer {
      for {
        zioProducer <- ZIO.service[ZioProducer]
      } yield new MyProducerLive(zioProducer)
    }
}

final class MyProducerLive(zioProducer: ZioProducer) extends MyProducer {
  override def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    zioProducer
      .produce(record.addHeader("foo", "bar"))
      .map(_ *> ZIO.logDebug("ack received")) <* ZIO.logDebug("record sent")
}
```

You should prefer composition and should not depend on the interface that we're exposing.

soujiro32167 commented 1 year ago

@guizmaii this way of composition makes sense, but has some shortcomings:

  1. What I'm after is a middleware, similar to what zio-http has.
  2. The end interface I'm after is the same interface as Producer. It may need produceAsync or produceChunkAsync and all their variations. That is because my end user doesn't care whether the producer is instrumented or not: they just need a Kafka producer.
  3. Imagine integrating an instrumentation library into your code base. The consumer of a Producer would have to explicitly ask for MyProducer if they want an instrumented version, instead of handling instrumentation at the layer level.
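The middleware idea from point 1 can be sketched as plain functions from `Producer` to `Producer`, composed with `andThen`. This is a dependency-free illustration assuming a simplified, synchronous `Producer` trait; `ProducerMiddleware`, `addHeader` and `recording` are hypothetical names, not zio-kafka or zio-http APIs.

```scala
import scala.collection.mutable

// Hypothetical, simplified producer: returns the offset assigned to each record.
final case class Record(key: String, value: String)

trait Producer:
  def produceChunk(records: Vector[Record]): Vector[Long]
  def produce(record: Record): Long = produceChunk(Vector(record)).head

// A middleware wraps a Producer and returns another Producer (hypothetical name).
type ProducerMiddleware = Producer => Producer

// Rewrites every record before it reaches the underlying producer.
val addHeader: ProducerMiddleware = underlying =>
  new Producer {
    def produceChunk(records: Vector[Record]): Vector[Long] =
      underlying.produceChunk(records.map(r => r.copy(value = s"foo=bar;${r.value}")))
  }

// Records every value it sees, standing in for logging or tracing.
def recording(seen: mutable.Buffer[String]): ProducerMiddleware = underlying =>
  new Producer {
    def produceChunk(records: Vector[Record]): Vector[Long] = {
      seen ++= records.map(_.value)
      underlying.produceChunk(records)
    }
  }

// Base producer that hands out consecutive offsets and keeps what it received.
final class InMemoryProducer extends Producer:
  val received: mutable.Buffer[Record] = mutable.Buffer.empty
  def produceChunk(records: Vector[Record]): Vector[Long] =
    records.map { record =>
      received += record
      (received.size - 1).toLong
    }
```

With `addHeader.andThen(recording(seen))`, the middleware listed last ends up outermost, so `recording` sees each record before `addHeader` rewrites it; swapping the order would make it see the rewritten value. The end user still only receives a `Producer`.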
guizmaii commented 1 year ago
1. About your point 2, here's how I'd do it:

```scala
package my.example

import org.apache.kafka.clients.producer.ProducerRecord
import zio.*
import zio.kafka.producer.Producer as ZioProducer

// Because you extend the zio-kafka Producer, your implementation will have to implement the whole zio-kafka Producer interface
trait MyProducer extends ZioProducer

object MyProducer {
  val live: URLayer[ZioProducer, ZioProducer] =
    ZLayer {
      for {
        zioProducer <- ZIO.service[ZioProducer]
      } yield new MyProducerLive(zioProducer)
    }
}

final class MyProducerLive(zioProducer: ZioProducer) extends MyProducer {

  // example of a method where you do something around the producing
  override def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    zioProducer
      .produce(record.addHeader("foo", "bar"))
      .map(_ *> ZIO.logDebug("ack received")) <* ZIO.logDebug("record sent")

  // example of a method where you do nothing around the producing, just call the zioProducer method
  override def produceAsync[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    zioProducer.produceAsync(record)

  ... // all the other methods of the interface
}
```


If we merge your PR, you won't be able to do this anymore, or you'll have to copy into your own code the logic you moved from `ProducerLive` to the `Producer` trait, which is not something you should have to care about as a zio-kafka user.
So the code will end up being less composable than it is today.

2. About your point 3, here is how I implement manual instrumentation of my services:
```scala
package my.example

import zio.telemetry.opentelemetry.Tracing

trait MyService {
  def myUsefulMethod(...): Task[Unit]
}

object MyService {
  val live: URLayer[... & Tracing, MyService] =
    ZLayer {
      for {
        ...     <- ...
        tracing <- ZIO.environment[Tracing]
        live    = new MyServiceLive(...)
        traced  = new MyServiceTraced(tracing)(live)
      } yield traced
    }
}

final class MyServiceLive(...) extends MyService {

  override def myUsefulMethod(...): Task[Unit] = ...

}

// Copilot is very good at writing this boring code automatically BTW
final class MyServiceTraced(tracing: ZEnvironment[Tracing])(delegator: MyService) extends MyService {
  import zio.telemetry.opentelemetry.TracingSyntax.*

  override def myUsefulMethod(...): Task[Unit] =
    delegator
      .myUsefulMethod(...)
      .span("MyService::myUsefulMethod")
      .provideEnvironment(tracing)

}
```

With your Producer needs, that'd give:

```scala
package my.example

import org.apache.kafka.clients.producer.ProducerRecord
import zio.*
import zio.kafka.producer.Producer as ZioProducer
import zio.telemetry.opentelemetry.Tracing

object MyTunedProducer {
  val live: URLayer[Tracing & ZioProducer, ZioProducer] =
    ZLayer {
      for {
        zioProducer <- ZIO.service[ZioProducer]
        tracing     <- ZIO.environment[Tracing]
        headers     = new HeaderAddingProducer(zioProducer)
        logged      = new LoggedProducer(headers)
        traced      = new MyTracedProducerLive(tracing)(logged)
      } yield traced
    }
}

final class HeaderAddingProducer(delegator: ZioProducer) extends ZioProducer {

  // addHeader is an assumed helper on ProducerRecord, used for illustration
  private def addHeaders[K, V](record: ProducerRecord[K, V]): ProducerRecord[K, V] =
    record.addHeader("foo", "bar")

  override def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator.produce(addHeaders(record))

  override def produceAsync[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator.produceAsync(addHeaders(record))

  ...
}

final class LoggedProducer(delegator: ZioProducer) extends ZioProducer {

  override def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator
      .produce(record)
      .map(_ *> ZIO.logDebug("ack received")) <* ZIO.logDebug("record sent")

  // not logged
  override def produceAsync[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator.produceAsync(record)

  ...
}

final class MyTracedProducerLive(tracing: ZEnvironment[Tracing])(delegator: ZioProducer) extends ZioProducer {
  import zio.telemetry.opentelemetry.TracingSyntax.*

  override def produce[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator
      .produce(record)
      .span("Producer::produce")
      .provideEnvironment(tracing)

  override def produceAsync[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
    delegator
      .produceAsync(record)
      .span("Producer::produceAsync")
      .provideEnvironment(tracing)

  ...
}
```
soujiro32167 commented 1 year ago

final class MyServiceTraced is exactly what I'm trying to avoid: my service should not care if its Kafka producer is traced or not.

Here is what my current code looks like:

```scala
object MyTunedProducer {
  val live: URLayer[Tracing & ZioProducer, ZioProducer] =
    ZLayer {
      for {
        zioProducer <- ZIO.service[ZioProducer]
        tracing     <- ZIO.environment[Tracing]
        headers     = new HeaderAddingProducer(zioProducer)
        logged      = new LoggedProducer(headers)
        traced      = new MyTracedProducerLive(tracing)(logged)
      } yield traced
    }
}

trait MyService:
  def foo: UIO[Unit]

case class MyServiceLive(producer: Producer) extends MyService:
  def foo: UIO[Unit] = producer.produce(...)

object MyServiceLive:
  val live: URLayer[Producer, MyService] = ZLayer.fromFunction(MyServiceLive(_))
```

Only in Main does the tracing come out:

```scala
object Main extends ZIOAppDefault:
  def run = app.provide(
    MyTunedProducer.live,
    MyServiceLive.live
  )
```
soujiro32167 commented 1 year ago

Besides composition: Producer has 11 aliases for produce in total, and all of them are implemented in terms of produceChunkAsyncWithFailures.

What is the value of making users who compose Producers re-implement all 11 aliases?

In your example, both HeaderAddingProducer and LoggedProducer would have to re-implement all 11 aliases, in exactly the same way, at the risk of deviating from the canonical ProducerLive implementation.

soujiro32167 commented 1 year ago

> If we merge your PR, you'll not be able to do this anymore or you'll have to copy the code you moved from the ProducerLive to the Producer trait into your code, which is not what you want/what you should have to care about as a zio-kafka user.

Why not? All I'm doing is removing the need to re-implement aliases, which (arguably) have no other useful implementations.

> ```scala
> // example of methods where you do nothing around the producing, just call the zioProducer method
> override def produceAsync[K, V](record: ProducerRecord[K, V]): UIO[UIO[Unit]] =
>   zioProducer.produceAsync(record)
>
> ... // all the other methods of the interface
> ```

During the implementation of the traced producer, I had to carefully check whether any of the 11 aliases actually did something other than call produceChunkAsyncWithFailures.

If they did, I'd have to instrument them as well. Since they don't, I can now confidently instrument one method and know all other methods are instrumented as well.

soujiro32167 commented 1 year ago

Look at how clean the FS2 Producer implementation is.

The interface has one produce method, with four aliases (like produceOne) all implemented in terms of it. That's where I'd like us to be.

guizmaii commented 1 year ago

> final class MyServiceTraced is exactly what I'm trying to avoid: my service should not care if its Kafka producer is traced or not.

Your service doesn't know if the passed Producer instance is traced or not:

```scala
// my/package/MyService.scala
trait MyService {
  ...
}

object MyService {
  val live: URLayer[ZioProducer, MyService] = ...
}

final class MyServiceLive(producer: ZioProducer) extends MyService {
  ...
}

// my/package/Main.scala
object Main extends ZIOAppDefault {

  val kafkaProducer: ULayer[ZioProducer] =
    ZLayer.make[ZioProducer](
      Tracing.live,
      MyTunedProducer.live
    )

  def run =
    ( ... ).provide(kafkaProducer, MyService.live)

}
```
soujiro32167 commented 1 year ago

Ok, we agree that instrumentation should be transparent 👍

To the matter at hand, and the reason I submitted this PR:

  1. The changes between 2.4.0 and 2.4.2 introduced yet another alias to produce.

  2. Because the 11 produce aliases remain unimplemented in the trait, the instrumentation library I maintain had to be significantly changed to account for the new alias.

  3. The change I'm proposing shifts maintenance of the aliases to zio-kafka, the only library that needs to know how to implement produce in terms of produceAsyncChunkWithFooOrWhatever.

  4. The solution you propose, @guizmaii, does not solve this issue, since it requires downstream consumers of zio-kafka to know how to implement the aliases.

erikvanoosten commented 1 year ago

@soujiro32167 I agree with you. I find this solution more elegant.

soujiro32167 commented 1 year ago

@guizmaii I cannot merge this MR while you object.

svroonland commented 1 year ago

Even with default implementations of the aliases in Producer, as in this PR, they are just defaults, not guarantees that all implementations of Producer behave like that. To deal with that, if you create a wrapper, you'd have to forward all method calls to the delegated Producer anyway, especially if you start composing wrappers that may behave slightly differently for each of the produce* methods.
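The concern about defaults can be made concrete with a small, dependency-free Scala 3 model (all names hypothetical, not zio-kafka's API). An implementation overrides an alias, and a wrapper that only overrides the primitive silently bypasses that override, because the wrapper's inherited default `produce` calls its own `produceChunk` and never reaches the inner `produce`.

```scala
// Hypothetical, simplified model; results are strings so the call path is visible.
final case class Record(value: String)

trait Producer:
  def produceChunk(records: Vector[Record]): Vector[String]
  // A default alias: implementations may still override it.
  def produce(record: Record): String = produceChunk(Vector(record)).head

// An implementation whose single-record path deliberately differs from the default.
final class SpecialProducer extends Producer:
  def produceChunk(records: Vector[Record]): Vector[String] =
    records.map(r => s"chunk:${r.value}")
  override def produce(record: Record): String = s"single:${record.value}"

// A wrapper that instruments only the primitive and trusts the default alias.
final class CountingWrapper(underlying: Producer) extends Producer:
  var calls = 0
  def produceChunk(records: Vector[Record]): Vector[String] =
    calls += 1
    underlying.produceChunk(records)
```

Forwarding `produce` explicitly to `underlying.produce(record)` would keep the inner behavior, but then the wrapper has to handle every alias itself, which is the trade-off described in the comment.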

But because of the specifics of our Producer, I'd say that we can safely consider everything but produceChunkAsyncWithFailures to be convenience methods on the interface, not something that may vary between implementations. If we make those methods final defs, we don't have to worry about the above situation.
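Making the aliases `final` can be sketched in the same simplified, dependency-free style (hypothetical names, not zio-kafka's API). Because no class can override `produce` or `produceValues`, a wrapper that changes only the primitive is guaranteed to affect every call, which rules out the case where an implementation's custom alias gets bypassed.

```scala
// Hypothetical, simplified model; not zio-kafka's real signatures.
final case class Record(value: String)

trait Producer:
  // The only extension point.
  def produceChunk(records: Vector[Record]): Vector[String]

  // Convenience aliases are final: no implementation or wrapper can change them,
  // so wrapping produceChunk covers every alias.
  final def produce(record: Record): String =
    produceChunk(Vector(record)).head

  final def produceValues(values: String*): Vector[String] =
    produceChunk(values.toVector.map(Record(_)))

final class EchoProducer extends Producer:
  def produceChunk(records: Vector[Record]): Vector[String] = records.map(_.value)

final class UppercaseWrapper(underlying: Producer) extends Producer:
  def produceChunk(records: Vector[Record]): Vector[String] =
    underlying.produceChunk(records.map(r => Record(r.value.toUpperCase)))
  // Adding `override def produce(record: Record): String = ...` here would not compile,
  // because produce is final in Producer.
```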

erikvanoosten commented 1 year ago

Sorry @soujiro32167, the other maintainers convinced me that this is not a good idea. @svroonland explained it well in the previous comment.

To support your use case, perhaps it is better to introduce something like the Diagnostics trait that is available on consumers.
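One way to read this suggestion, sketched in plain Scala 3: the producer owns all of its produce variants and reports what happens through a single diagnostics callback, similar in spirit to the consumer-side Diagnostics trait. Everything below (`ProducerDiagnostics`, `ProducerEvent`, the `Producer` class) is hypothetical and does not exist in zio-kafka; ZIO effects are replaced by plain side effects.

```scala
import scala.collection.mutable

// Hypothetical events a producer could report (not part of zio-kafka).
sealed trait ProducerEvent
object ProducerEvent:
  final case class Sent(count: Int) extends ProducerEvent
  final case class Acked(offsets: Vector[Long]) extends ProducerEvent

// Hypothetical hook; the by-name parameter lets a no-op sink skip building events.
trait ProducerDiagnostics:
  def emit(event: => ProducerEvent): Unit

object ProducerDiagnostics:
  val NoOp: ProducerDiagnostics = new ProducerDiagnostics {
    def emit(event: => ProducerEvent): Unit = ()
  }

final case class Record(key: String, value: String)

// The library owns every produce variant; instrumentation plugs into one hook
// instead of wrapping each method.
final class Producer(diagnostics: ProducerDiagnostics = ProducerDiagnostics.NoOp):
  private var nextOffset = 0L

  def produceChunk(records: Vector[Record]): Vector[Long] =
    diagnostics.emit(ProducerEvent.Sent(records.size))
    val offsets = records.map { _ =>
      val offset = nextOffset
      nextOffset += 1
      offset
    }
    diagnostics.emit(ProducerEvent.Acked(offsets))
    offsets

  def produce(record: Record): Long = produceChunk(Vector(record)).head

// A diagnostics sink that keeps every event, e.g. for tests or tracing.
final class RecordingDiagnostics extends ProducerDiagnostics:
  val events: mutable.ListBuffer[ProducerEvent] = mutable.ListBuffer.empty
  def emit(event: => ProducerEvent): Unit = events += event
```

Under this design, an instrumentation library would supply its own diagnostics sink rather than re-implementing every produce alias.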

soujiro32167 commented 1 year ago

> they are just defaults, but not guarantees that all implementations of Producer are like that

Absolutely, @svroonland: the idea is to create better defaults. If an implementation needs different behavior for produce*, it is free to re-implement those methods.

soujiro32167 commented 1 year ago

@erikvanoosten could you say more about the Diagnostics trait?

Is that a sub-trait with produce* implemented and produceChunkAsyncWithFailures left unimplemented? Nvm, found it. Very cool! I'd love something like that for the producer.