getkyo / kyo

Toolkit for Scala Development
https://getkyo.io
Apache License 2.0

Kafka integration #392

Open calvinlfer opened 1 month ago

calvinlfer commented 1 month ago

Many microservices rely on Apache Kafka to send and receive data asynchronously in a fault-tolerant manner. It is also notoriously difficult to build high-level Kafka consumers and producers that account for concerns like backpressure, rebalancing, and streaming.

It would be a very nice proposition to tick this box and give users one more reason to select Kyo for their next project.

fwbrasil commented 1 month ago

/bounty $500

algora-pbc[bot] commented 1 month ago

💎 $500 bounty • Kyo

Steps to solve:

  1. Start working: Comment /attempt #392 with your implementation plan
  2. Submit work: Create a pull request including /claim #392 in the PR body to claim the bounty
  3. Receive payment: 100% of the bounty is received 2-5 days post-reward. Make sure you are eligible for payouts

Thank you for contributing to getkyo/kyo!


Attempt: 🟢 @varshith257 • Started (GMT+0): May 27, 2024, 3:52:48 PM • Solution: WIP
varshith257 commented 1 month ago

@fwbrasil I believe my experience with Kafka will help get this solved; it will need some understanding of the codebase to integrate it into Kyo. Putting my hat on to solve this issue.

/attempt #392

fwbrasil commented 1 month ago

thank you @varshith257!

lukestephenson commented 1 month ago

I had a look at the producer side of Kafka (curiosity, not committing to picking up this work). There are some earlier building blocks in kyo that I have some questions about first.

Stream chunking

Producing to / consuming from Kafka and streaming often go together. Currently the streaming implementation hides chunking. For example, the transform operation is implemented as:

    def transform[V2: Flat, S2](f: V => V2 < S2)(
        using tag2: Tag[Streams[V2]]
    ): Stream[T, V2, S & S2] =
        val handler =
            new Handler[Const[Chunk[V]], Streams[V], Streams[V2] & S & S2]:
                def resume[T, U: Flat, S3](
                    command: Chunk[V],
                    k: T => U < (Streams[V] & S3)
                )(using Tag[Streams[V]]): (U | Resume[U, S3]) < (Streams[V2] & S & S2 & S3) =
                    command.map(f).map { c =>
                        Streams.emitChunkAndThen(c) {
                            Resume((), k(().asInstanceOf[T]))
                        }
                    }
                end resume
        Streams[V].handle(handler)((), s)
    end transform

Note the chunk is used to avoid the overhead of individual elements through the Stream, but it is hidden as an implementation detail.

Producing to Kafka can be expensive for an effect system if it is modelled per record. We can achieve better performance if the produce operation is expressed on a Chunk[ProducerRecord] rather than on individual records (fewer IOs / completion promises to create).

How do folks feel about changing the Streams implementation so that chunking is exposed externally? Or not even a concern of the Streams implementation and you just create a Stream where each element emitted is a Chunk?
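For illustration, one shape the chunk-exposed alternative could take. This is a hypothetical sketch only: KafkaProducers, publishChunk, and transformChunk do not exist in Kyo; the point is the signatures, not the names.

```scala
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}

// Hypothetical sketch, not existing Kyo API: one effect suspension
// (and one batch of completion promises) per chunk, rather than one
// per record.
trait KafkaProducers:
    def publishChunk(
        records: Chunk[ProducerRecord[String, String]]
    ): Chunk[RecordMetadata] < (Fibers & Aborts[Exception])

// If streams emitted chunks as first-class elements, producing would
// be a per-chunk transform instead of a per-record one, e.g.:
//   stream.transformChunk(producers.publishChunk)
```

The design question is then just where the chunk boundary is visible: inside the Streams handler (today), or in the element type of the Stream itself.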

Modelling completion of publishing to Kafka.

Typically publication to Kafka is modelled as an IO[IO[RecordMetadata]].

  1. The outer IO represents the effect of the Kafka producer accepting the record into its internal buffer (if it doesn't get accepted, the call blocks).
  2. The inner IO represents when the Kafka brokers have acknowledged the record.

Now in kyo, we can't represent the two side effects with RecordMetadata < (IOs & IOs), so I'm assuming we want the operations which publish to Kafka to have the return type Promise[RecordMetadata < Aborts[Exception]] < IOs. Here the IOs represents the record being accepted by the local producer buffer, and the Promise represents the broker processing it. Thoughts?
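As a rough sketch of that two-level encoding against the raw Java client. The promise-creation and completion method names (Fibers.initPromise, unsafeComplete) are assumptions about Kyo's fiber API at the time, and Try is used for the inner result purely to keep the sketch simple:

```scala
import scala.util.{Failure, Success, Try}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

// Sketch, not verified code: `Fibers.initPromise` and `unsafeComplete`
// are assumed names for Kyo's promise API.
def publish(
    producer: KafkaProducer[Array[Byte], Array[Byte]],
    record: ProducerRecord[Array[Byte], Array[Byte]]
): Promise[Try[RecordMetadata]] < IOs =
    Fibers.initPromise[Try[RecordMetadata]].map { promise =>
        IOs {
            // Outer effect: send may block if the producer buffer is full.
            producer.send(
                record,
                (metadata: RecordMetadata, error: Exception) =>
                    // Inner effect: the callback fires once the brokers
                    // have acknowledged (or rejected) the record.
                    promise.unsafeComplete(
                        if error ne null then Failure(error) else Success(metadata)
                    )
            )
            promise
        }
    }
```

The IOs suspension corresponds to point 1 above, and completing the promise from the send callback corresponds to point 2.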

Making blocking calls

Are there any concerns in kyo with a call to IOs.apply causing the thread of execution to block? If the producer buffer is full, this will occur: in IOs.apply(kafkaproducer.send(producerRecord)), the kafkaproducer.send call may block.

lukestephenson commented 1 month ago

I think I've answered my own question about the blocking calls in kyo. The blocking benchmark seems to indicate that it is acceptable in kyo to just use IOs.apply with no special indicator that the call is blocking.

https://github.com/getkyo/kyo/blob/a126fd5f1853cc361770679eafaf1e493476adca/kyo-bench/src/main/scala/kyo/bench/BlockingBench.scala#L13

lukestephenson commented 1 month ago

With regards to modelling producing to Kafka as Promise[RecordMetadata < Aborts[Exception]] < IOs, this becomes pretty difficult to work with because, as per the docs:

Kyo performs checks at compilation time to ensure that nested effects are not used

Or at least that is what I think is happening based on my attempt. When I try to use that type I get an error: Please provide an implicit kyo.Tag[kyo.core.<[org.apache.kafka.clients.producer.RecordMetadata, kyo.aborts$package.Aborts[scala.Throwable]]] parameter.

I can work around this by using Promise[Try[RecordMetadata]] < IOs, but avoiding the Aborts effect doesn't feel very kyo-like.
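That said, the Try could stay an internal detail: the Aborts effect can be reintroduced at the point the acknowledgement is awaited. A hedged sketch, assuming Promise#get yields the value under the Fibers effect and that Aborts.fail exists with roughly this shape in that Kyo version:

```scala
import scala.util.{Failure, Success, Try}
import org.apache.kafka.clients.producer.RecordMetadata

// Sketch: unwrap the Try when awaiting the broker acknowledgement,
// recovering the Aborts effect so downstream code stays kyo-like.
def awaitAck(
    acked: Promise[Try[RecordMetadata]]
): RecordMetadata < (Fibers & Aborts[Throwable]) =
    acked.get.map {
        case Success(metadata) => metadata
        case Failure(error)    => Aborts.fail(error)
    }
```

This keeps the promise's type parameter free of nested effects (satisfying the compile-time check quoted above) while still exposing failures through Aborts at the API boundary.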