fd4s / fs2-kafka

Functional Kafka Streams for Scala
https://fd4s.github.io/fs2-kafka
Apache License 2.0

Add chunk-based consumer API #1281

Closed L7R7 closed 9 months ago

L7R7 commented 1 year ago

This adds an API to make the pattern of chunk-based consumers a first-class concept. The idea keeps popping up in the Typelevel Discord, and we've been successfully using it at $WORK for a while now, so it makes sense to add it.

General idea

The pattern helps users who implement consumers without auto-commit to achieve correct offset committing (no offsets may be lost, offsets must be committed only after their messages have been processed, etc.) without much extra work. It achieves that by switching from processing messages in a Stream[F, CommittableConsumerRecord[F, K, V]] to processing them in chunks: Chunk[ConsumerRecord[K, V]] => F[Unit]. After each chunk has been processed, its offsets are committed.
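The commit discipline can be illustrated with a dependency-free sketch (the names here are illustrative, not fs2-kafka API): the processor sees a whole chunk at a time, and offsets are committed only after the entire chunk has been processed.

```scala
// Dependency-free sketch of the chunk-based commit discipline.
// `Record`, `ChunkCommit` and `processChunk` are hypothetical names.
final case class Record(partition: Int, offset: Long, value: String)

object ChunkCommit {
  // Runs the processor on every record of the chunk, then returns the
  // offsets to commit: for each partition, the next offset to consume
  // (highest processed offset + 1). If `process` throws, nothing is
  // committed, so no record can be lost.
  def processChunk(chunk: Seq[Record])(process: Record => Unit): Map[Int, Long] = {
    chunk.foreach(process)
    chunk.groupBy(_.partition).map { case (p, rs) =>
      p -> (rs.map(_.offset).max + 1)
    }
  }
}
```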

This has a couple of advantages (summarizing what Fabio said on the Discord):

open questions

todos

There's still a lot of work to do, but I wanted to get some early feedback on the general concept before doing the busy work.

I'm looking forward to receiving some feedback and thoughts on this idea!

L7R7 commented 12 months ago

After adding the missing methods similar to the ones from the KafkaConsume trait, I'm wondering if it makes sense to simply "mirror" the signatures of the underlying methods. streamChunk is obvious, but for partitionedStreamChunk (and maybe even partitionsMapStreamChunk) I'm not sure we should expose the different streams to the user, because I can't think of a good scenario in which the user would want to do something with the streams after each chunk is processed and committed. At the moment, I think it's a tradeoff between symmetry with KafkaConsume and more convenience for the user.

EDIT:

What the library could also do is something like the following:

```scala
import cats.effect.IO
import fs2.Chunk
import fs2.kafka.{ConsumerRecord, KafkaConsumer}

def consumeTopic[K, V](
    consumer: KafkaConsumer[IO, K, V],
    processor: Chunk[ConsumerRecord[K, V]] => IO[Unit]
): IO[Unit] =
  consumer.partitionedStream
    .map(
      _.chunks
        // consumeChunk: our helper that runs the processor on the chunk
        // and then commits the chunk's offsets
        .evalMap(consumeChunk(processor))
    )
    .parJoinUnbounded
    .compile
    .drain
```

(This would be generalized over F[_] instead of IO, and defined directly on the consumer instead of taking one as an argument; I just took that from the code we use at $WORK.) Since all the client could do afterwards is .parJoinUnbounded.compile.drain, the library could already do that.
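Putting those two changes together, a library-side method might look roughly like the following. This is only a sketch, not the PR's final API: it assumes fs2-kafka's CommittableOffsetBatch.fromFoldable for the batch commit, and spells out the per-chunk "process, then commit" step inline.

```scala
import cats.effect.Async
import cats.syntax.all._
import fs2.Chunk
import fs2.kafka.{CommittableOffsetBatch, ConsumerRecord, KafkaConsumer}

// Sketch: the user supplies only the chunk processor; streaming,
// parallelism, committing and draining stay inside the library.
def consumeChunk[F[_]: Async, K, V](
    consumer: KafkaConsumer[F, K, V]
)(processor: Chunk[ConsumerRecord[K, V]] => F[Unit]): F[Unit] =
  consumer.partitionedStream
    .map {
      _.chunks.evalMap { chunk =>
        // process the whole chunk first, then commit its offsets in one batch
        processor(chunk.map(_.record)) >>
          CommittableOffsetBatch.fromFoldable(chunk.map(_.offset)).commit
      }
    }
    .parJoinUnbounded
    .compile
    .drain
```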

aartigao commented 12 months ago

Nice work! Let's start the discussion 🧵

This addition looks good because it moves the logic into the library, but it also means we're being stricter about the patterns we support. For example, our $WORK use case doesn't involve committing offsets at all, but we work with Chunk for performance reasons. Does that mean the lib should support that too? Or is this a user-land decision? 🤷🏽

So far, in my experience, working with Stream directly gives you the building blocks for crafting more complex patterns, like the one you expose.

The discussion this PR brings, philosophically, is somewhat like: do we think there's a canonical pattern for Kafka consumption around the Chunk[Record] => F[Unit] pattern?

And to be honest, I don't know. I have mixed feelings: on one hand, looking at the code, it's so simple that adding it to the library doesn't disrupt the rest of the API; on the other hand, if it's that simple, why not keep it in user-land? Otherwise it looks to me like we maybe want this library to become more of a framework, because we are losing (not actually losing, since the current API remains untouched, but you get the idea) the Stream foundations in favor of a more constrained API.

What I agree on is to properly document this use case, so the question doesn't pop up again in Discord.

I'd love to see what the original designers think about it 👀

SystemFw commented 11 months ago

My 2c: I think it's worth adding, because it's close to universally useful when working with Kafka for OLTP rather than OLAP use cases. In this case, what we're adding is not the small amount of simple code, but the expertise that this is how you can achieve performance, flexibility and correctness. In my extensive experience working with teams using fs2-kafka, that expertise is lacking and at best you get correct code which leaves a lot of performance on the table, and at worst you get incorrect code. The questions in Discord show a similar pattern.

However, what I would not do is add the whole set of variations mirroring the full API; I'd really just add the one method, otherwise we're back to the issue of people having to navigate which one to use. If you know enough to need a variation, you can use the low-level Stream API directly.

Asides:

L7R7 commented 11 months ago

Thank you for your comments, really appreciate it.

I think it's a good idea to add only one, biased, method to give users an obvious way of using the pattern that works well for most cases. I'll proceed in that direction and will also start adding some docs for it. These docs will also mention that this is the first place where the library automatically commits offsets, but without using auto-commit (@aartigao thank you for mentioning the auto-commit use cases, I didn't think about that initially).

@SystemFw regarding your points to the implementation details:

SystemFw commented 11 months ago

It should be an F[Nothing] (so, not a Stream, nor an IO of something else), because it represents a process that doesn't terminate, doesn't emit any results, and doesn't need to be composed as a Stream. Just compile.drain >> F.never
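Concretely, the suggestion amounts to something like this (a sketch; runForever is an illustrative name, not library API):

```scala
import cats.effect.kernel.Concurrent
import cats.syntax.all._
import fs2.Stream

// Sketch: typing the consumer loop as F[Nothing] makes non-termination
// explicit in the signature; Concurrent[F].never keeps the effect running
// even if the underlying stream completes.
def runForever[F[_]: Concurrent, A](consumerLoop: Stream[F, A]): F[Nothing] =
  consumerLoop.compile.drain >> Concurrent[F].never
```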

L7R7 commented 11 months ago

Oh, I see. But that relies on the assumption that the previous stream never finishes, right? If the stream terminated without an error, the whole action would still not terminate, because of the never.

L7R7 commented 11 months ago

Should we use consumeChunk in the quick example in the docs to nudge people in the "right direction" right from the beginning? I changed it to do so, and I think the result is quite nice, but I'm happy to hear your thoughts.

L7R7 commented 11 months ago

I'd say this is ready to be reviewed. The only open questions are the one above regarding F.never, and whether I should squash the commits into a single commit (I think it should be squashed; I left it unsquashed because it might be easier to review that way).

L7R7 commented 9 months ago

@aartigao thank you for the review and thank you for helping me get through the last details yesterday. I enjoyed working on that PR :smiley: