compstak / KafkaStreams4s

Kafka Streams for Scala. Integrates KafkaStreams with the cats-effect ecosystem
Apache License 2.0

How to safely run effects in Kafka Streams topology? #79

Open niqdev opened 3 years ago

niqdev commented 3 years ago

Hello, I have a generic question, and I'm genuinely curious to know whether you have ever tried this, and to get your feedback.

I worked on a private implementation similar to KafkaStreams4s but using syntax, like in this example, and for fun I tried to re-implement it using ZIO. Either way, I haven't found a "clean" solution to safely run effects.

To be more specific, in my case I need to invoke an HTTP endpoint before mapping a value. Even if we can argue that the logic could/should be moved into a different service and I could then join the two streams, I might still want to send metrics and log in a "pure" way. In STable I don't see anything similar, and I'm just curious to know how you would implement it and what you think about it.

A very generic signature would look like this:

```scala
// O could be (KO, VO) or Unit depending on the use case
def runMyF[O](f: (K, V) => F[O])(
  implicit F: Sync/Async[F]
): F[???] =
```

I didn't give the method a name on purpose, because map and flatMap in Kafka Streams aren't really what one might expect from them in the FP world, and the return type is usually KStream or KTable. I've tried both the Streams DSL and the low-level Processor API, but I haven't found a nice way other than passing a SyncIO and invoking unsafeRunSync, which defeats the whole purpose of passing around an F.

Any thoughts or feedback is very much appreciated, thanks in advance! 😊

LukaJCB commented 3 years ago

Hi @niqdev, I'm not sure Kafka Streams supports this nicely, given that the DSL we create in this project basically only constructs a Topology, which is then interpreted into a running Kafka Streams program. I'm not sure hooking effects into streams would actually be deterministic. Happy to be proven wrong here, though!
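To make the "only constructs a Topology" point concrete, here is a minimal sketch using the plain Kafka Streams Java DSL called from Scala. This is not KafkaStreams4s' API, and the topic names `in`/`out` are hypothetical. Building and describing the topology runs none of the user code: the `ValueMapper` is only stored in a processor node and invoked later, once per record, on a stream thread. That is where any effect hidden inside it would actually run.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsBuilder, Topology}
import org.apache.kafka.streams.kstream.{Consumed, Produced, ValueMapper}

object TopologyIsADescription {
  // Hypothetical topic names, for illustration only.
  def build(): Topology = {
    val builder = new StreamsBuilder()
    builder
      .stream("in", Consumed.`with`(Serdes.String(), Serdes.String()))
      .mapValues[String](new ValueMapper[String, String] {
        // Only stored in the topology at this point; Kafka Streams calls it
        // per record on a stream thread after KafkaStreams#start.
        override def apply(value: String): String = value.toUpperCase
      })
      .to("out", Produced.`with`(Serdes.String(), Serdes.String()))
    builder.build()
  }

  def main(args: Array[String]): Unit =
    // Prints the source -> mapValues -> sink nodes; no record is processed.
    println(build().describe())
}
```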

niqdev commented 3 years ago

Thanks for your feedback, and yes, I agree with you: I'm not sure either whether it's a good idea to bake effects into the topology, or how the app would behave when scaling... As I said, as a rule of thumb I would probably move the logic into a different service and use something like fs2 in this kind of situation, so that I can easily deal with errors, retries and throttling, for instance, and join the two streams afterwards.
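A minimal sketch of that "separate service" approach, assuming cats-effect 3 and fs2 3. `callEndpoint` and the in-memory `input` are hypothetical stand-ins for a real HTTP client and a Kafka consumer (for example fs2-kafka), so the example runs on its own. `metered` throttles the stream and `Stream.retry` retries a failed call with exponential backoff; the enriched records would then be written to a topic and joined with the original stream in Kafka Streams.

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object EnrichmentService extends IOApp.Simple {

  // Hypothetical stand-in for the HTTP call; a real service would use an HTTP client.
  def callEndpoint(key: String, value: String): IO[String] =
    IO.pure(s"$value-enriched")

  // Hypothetical input; in practice this would come from a Kafka consumer.
  val input: Stream[IO, (String, String)] =
    Stream.emits(List("k1" -> "a", "k2" -> "b", "k3" -> "c"))

  def enriched: Stream[IO, (String, String)] =
    input
      .metered(100.millis) // throttling: emit at most one record every 100ms
      .evalMap { case (k, v) =>
        // retry: initial delay 200ms, doubled after each failure, at most 3 attempts
        Stream
          .retry(callEndpoint(k, v), 200.millis, (d: FiniteDuration) => d * 2, 3)
          .compile
          .lastOrError
          .map(k -> _)
      }

  val run: IO[Unit] =
    enriched.evalMap { case (k, v) => IO.println(s"$k -> $v") }.compile.drain
}
```

`evalMap` handles one record at a time; `parEvalMap(n)` would allow up to `n` concurrent calls while still emitting results in input order.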

If you don't mind, can we please leave this issue open to gather more feedback? Maybe someone will stumble on this issue and recommend a different approach 🤷‍♂️

LukaJCB commented 3 years ago

Yes for sure! And if you find a way to make this work, I'd be happy to review it too!

niqdev commented 3 years ago

Well... I found a way only because I had to make it work, which doesn't mean I like it or would recommend it at all!!!

It was just a POC, so I'm not too concerned about performance at this stage... Anyway, this is the only way I found to "hide" everything inside the library; in the end, it's Java!

Just for reference, here is the implementation, redacted, with a bunch of comments and a few manual changes (not sure it compiles):

```scala
import cats.effect.{Sync, SyncIO}
import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream, Transformer}
import org.apache.kafka.streams.processor.ProcessorContext

// Member of an enclosing (redacted) class that provides `stream: KStream[K, V]` and `logger`.

/**
 * Transforms each record of the input stream into a new record
 * in the output stream by running an effect: unsuccessful effects are logged and discarded.
 *
 * Metrics are exported via JMX in the following format:
 * kafka.streams:type=stream-REDACTED-metrics,thread-id=<THREAD_ID>,REDACTED-id=<METRIC_NAME>
 *  - flatMapIO-success-rate
 *  - flatMapIO-success-total
 *  - flatMapIO-error-rate
 *  - flatMapIO-error-total
 *
 * Running effects inside kafka-streams has proven to be challenging.
 * The current implementation uses the low-level processor API to
 * unsafely run `cats.effect.SyncIO`.
 *
 * See feedback in https://github.com/compstak/KafkaStreams4s/issues/79
 */
def flatMapIO[KO, VO](f: (K, V) => SyncIO[(KO, VO)])(
  implicit F: Sync[F]
): F[KStream[KO, VO]] =
  F.delay {
    stream.transform[KO, VO](() =>
      new Transformer[K, V, KeyValue[KO, VO]] {
        // there is no way to avoid `var` here ;-(
        private[this] var sensorSuccess: Sensor = _
        private[this] var sensorError: Sensor   = _

        override def init(context: ProcessorContext): Unit = {
          sensorSuccess = context
            .metrics()
            .addRateTotalSensor(
              "SCOPE_NAME_REDACTED",
              "ENTITY_NAME_REDACTED",
              "flatMapIO-success", // OPERATION_NAME
              Sensor.RecordingLevel.INFO
            )
          sensorError = context
            .metrics()
            .addRateTotalSensor(
              "SCOPE_NAME_REDACTED",
              "ENTITY_NAME_REDACTED",
              "flatMapIO-error", // OPERATION_NAME
              Sensor.RecordingLevel.INFO
            )
        }

        // FIXME unsafeRunSync blocks the stream thread until the effect completes: possible performance issues
        override def transform(key: K, value: V): KeyValue[KO, VO] =
          // `attempt` turns errors raised inside the SyncIO into a Left instead of throwing them
          f(key, value).attempt.unsafeRunSync() match {
            case Right((ko, vo)) =>
              sensorSuccess.record()
              KeyValue.pair(ko, vo)
            case Left(error) =>
              // this logging is not "pure"
              logger.error(s"""
                |ERROR: flatMapIO.transform
                |metricName=REDACTED
                |key=$key
                |value=$value
                |error=$error
                |""".stripMargin)
              sensorError.record()
              // discard failures: a Java Transformer that returns null emits no record
              null
          }

        override def close(): Unit = {}
      }
    )
  }
```
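For comparison, and not part of the implementation above: with cats-effect 3 (3.4+ for `Dispatcher.parallel`), `cats.effect.std.Dispatcher` is the usual way to run an arbitrary effect from inside a callback-based Java API, which would lift the restriction to `SyncIO`. This sketch fixes the effect to `IO`, reuses the `Transformer` interface from above (deprecated since Kafka 3.3), and `DispatchingTransformer` is a hypothetical name. It does not remove the underlying problem: `unsafeRunSync` still blocks the stream thread until the effect completes.

```scala
import cats.effect.IO
import cats.effect.std.Dispatcher
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext

// Hypothetical alternative: runs any IO (not only SyncIO) through a Dispatcher.
final class DispatchingTransformer[K, V, KO, VO](
    dispatcher: Dispatcher[IO],
    f: (K, V) => IO[(KO, VO)]
) extends Transformer[K, V, KeyValue[KO, VO]] {

  override def init(context: ProcessorContext): Unit = ()

  // Blocks the calling (stream) thread until the IO completes, like SyncIO#unsafeRunSync.
  override def transform(key: K, value: V): KeyValue[KO, VO] =
    dispatcher.unsafeRunSync(f(key, value).attempt) match {
      case Right((ko, vo)) => KeyValue.pair(ko, vo)
      case Left(_)         => null // returning null drops the record
    }

  override def close(): Unit = ()
}
```

In a topology it would be wired with something like `stream.transform(() => new DispatchingTransformer(dispatcher, f))`, with the `Dispatcher` acquired as a `Resource` that outlives the `KafkaStreams` instance, since calls made after the dispatcher is released will fail.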