zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0

Enable the consumer to commit an offset with metadata #1067

Closed flavienbert closed 11 months ago

flavienbert commented 1 year ago

Fixes #1066

erikvanoosten commented 1 year ago

@flavienbert I have plans to merge multiple commits into a single commit. Is that still possible when there is metadata attached to a commit?

flavienbert commented 1 year ago

@erikvanoosten Yes, I think so. The Apache Kafka consumer client takes an OffsetAndMetadata, so it should be OK.

erikvanoosten commented 1 year ago

> @erikvanoosten Yes, I think so. The Apache Kafka consumer client takes an OffsetAndMetadata, so it should be OK.

Ah, I see. When we merge commits, per partition we keep the OffsetAndMetadata with the highest offset. That means we'll lose the metadata of earlier commits, but that is actually okay; those commits are no longer needed because they have been superseded by a newer one.
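
To illustrate the idea (just a sketch, not the actual zio-kafka merging code; the helper name mergeCommits is made up):

    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.common.TopicPartition

    // Sketch only: per partition, keep the OffsetAndMetadata with the highest
    // offset. The metadata of superseded commits is dropped along with them.
    def mergeCommits(
      acc: Map[TopicPartition, OffsetAndMetadata],
      next: Map[TopicPartition, OffsetAndMetadata]
    ): Map[TopicPartition, OffsetAndMetadata] =
      next.foldLeft(acc) { case (merged, (tp, om)) =>
        merged.get(tp) match {
          case Some(existing) if existing.offset() >= om.offset() => merged
          case _                                                  => merged.updated(tp, om)
        }
      }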

I'll study this PR a bit more (especially for breaking changes) but so far it looks good to me.

flavienbert commented 1 year ago

One point not handled by this PR: the metadata from CommittableRecord.offset will always be empty: https://github.com/zio/zio-kafka/blob/6e451121b148e4997f1318b5807e83b8d0e6fdf9/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala#L48

This PR only enables publishing metadata when committing from a consumer stream, not retrieving it when you consume a stream. Is that something you want to add too, @erikvanoosten?

I don't really know how to handle that, because the ConsumerRecord from org.apache.kafka.clients.consumer doesn't expose the metadata:

> Creates a record to be received from a specified topic and partition (provided for compatibility with Kafka 0.9 before the message format supported timestamps and before serialized metadata were exposed).

I believe this feature is not possible. It seems we can only get the metadata back with the Admin client.
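
For example, reading the committed metadata back out-of-band could look roughly like this with the plain Kafka admin client (group id and bootstrap servers are placeholders):

    import java.util.Properties
    import scala.jdk.CollectionConverters._
    import org.apache.kafka.clients.admin.{ Admin, AdminClientConfig }

    // Rough sketch: read the committed offsets (including their metadata) for a
    // consumer group via the plain Kafka admin client.
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = Admin.create(props)
    val committed = admin
      .listConsumerGroupOffsets("my-group")
      .partitionsToOffsetAndMetadata()
      .get()
      .asScala
    committed.foreach { case (tp, om) =>
      println(s"$tp -> offset=${om.offset()}, metadata=${om.metadata()}")
    }
    admin.close()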

erikvanoosten commented 1 year ago

> This PR only enables publishing metadata when committing from a consumer stream, not retrieving it when you consume a stream. Is that something you want to add too, @erikvanoosten?

We prefer smaller PRs, so another issue/PR can be created for this.

flavienbert commented 1 year ago

As I mentioned in the previous comment, I think it's not possible to retrieve the metadata from a ZStream with the Consumer, because the ConsumerRecord API doesn't expose the metadata.

erikvanoosten commented 1 year ago

> As I mentioned in the previous comment, I think it's not possible to retrieve the metadata from a ZStream with the Consumer, because the ConsumerRecord API doesn't expose the metadata.

Makes sense; records and commits exist independently of each other.

flavienbert commented 1 year ago

Maybe we should make this method private in the Offset trait: private[consumer] def metadata: Option[String]? That way a user won't try to read the metadata of an offset obtained from a ConsumerRecord, which would always return None anyway.

erikvanoosten commented 1 year ago

> Maybe we should make this method private in the Offset trait: private[consumer] def metadata: Option[String]? That way a user won't try to read the metadata of an offset obtained from a ConsumerRecord, which would always return None anyway.

Are you referring to a CommittableRecord?

I don't like that because it will mean that you can set the metadata but you can't read it back.

flavienbert commented 1 year ago

Yes, I am referring to that: https://github.com/zio/zio-kafka/blob/6e451121b148e4997f1318b5807e83b8d0e6fdf9/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala#L48

It's true that you can't read it back, but you shouldn't need to read it. The purpose is to publish an offset with metadata.

    Consumer
      .plainStream(Subscription.topics("random"), Serde.long, Serde.string)
      .tap(r => Console.printLine(r.value))
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)
      .drain

In this code sample, a user might believe they can retrieve the offset metadata of the CommittableRecord using _.offset.metadata, but in fact it will always return None. From my point of view, we should only be able to use _.offset.withMetadata("metadata").

erikvanoosten commented 1 year ago

Okay, I am changing my mind 🙂 The offset method from CommittableRecord should indeed not have a metadata field, because that doesn't make sense. Unfortunately, that also means that the withMetadata method (the basis of this PR) doesn't make sense. (Off topic: there is more in Offset that doesn't make sense. In particular, there shouldn't be a commit method in it. @guizmaii is trying to change this part of zio-kafka for a good reason.)

What about adding methods commitWithMetadata and commitOrRetryWithMetadata?
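
Roughly something like this (the signatures are only a sketch of the idea, not a final API):

    import zio._

    // Hypothetical additions alongside commit; trait name and parameter shapes
    // are placeholders, not actual zio-kafka code.
    trait MetadataCommits {
      def commit: Task[Unit]
      def commitWithMetadata(metadata: String): Task[Unit]
      def commitOrRetryWithMetadata[R](
        metadata: String,
        policy: Schedule[R, Throwable, Any]
      ): RIO[R, Unit]
    }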

flavienbert commented 1 year ago

Agreed, the commit method shouldn't be there. I am not sure commitWithMetadata would work, because we usually call commit after .aggregateAsync(Consumer.offsetBatches), which means we commit only the most recent offset.

But in my case I need to set the metadata on each offset, so that the most recent offset is committed with the metadata set earlier. For example:

     ...
     offsetOpt  <- offsetState match {
                                 case Committable =>
                                    ZIO.some(record.offset)

                                 case CommittableWithMetadata =>
                                   StorageService
                                       .put(state, record.offset.topic, record.offset.partition)
                                       .map(versionId => Some(record.offset.withMetadata(versionId)))

                                 case NotCommittable => ZIO.none
                               }
        } yield offsetOpt
      }
      .map(_.toSeq)
      .flattenIterables
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)

And what about having this model:

sealed trait Offset {

  type Self <: Offset

  def topic: String
  def partition: Int
  def offset: Long
  def consumerGroupMetadata: Option[ConsumerGroupMetadata]

  /**
   * Converts this instance of Offset to a CommittableOffset (increment the offset)
   */
  def toCommittableOffset: CommittableOffset

  final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition)
}

sealed trait CommittableOffset extends Offset {

  type Self <: CommittableOffset

  def commit: Task[Unit]
  def batch: OffsetBatch
  def addMetadata(metadata: String): Self
  def metadata: Option[String]

  private[consumer] def asJavaOffsetAndMetadata: OffsetAndMetadata = new OffsetAndMetadata(offset, metadata.orNull)

  /**
   * Attempts to commit and retries according to the given policy when the commit fails with a
   * RetriableCommitFailedException
   */
  final def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
    Offset.commitOrRetry(commit, policy)
}

With this model, we could use the Offset trait for the offset obtained from the CommittableRecord; if you want to commit or add metadata, you convert this Offset to a CommittableOffset, which is able to commit. We could also introduce an implicit conversion from Offset to CommittableOffset to avoid breaking changes.
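
For example, the implicit conversion could look roughly like this (names are placeholders, not an actual zio-kafka API):

    import scala.language.implicitConversions

    // Sketch of the backwards-compatibility shim: any Offset can be used where a
    // CommittableOffset is expected by converting it first.
    object Offset {
      implicit def offsetToCommittable(offset: Offset): CommittableOffset =
        offset.toCommittableOffset
    }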

erikvanoosten commented 1 year ago

Okay, I understand your use case, and I like your thinking. CommittableOffset is indeed a reasonable way to model this. It is a breaking change though; even with an implicit conversion it will not be binary compatible (we don't really have that as a goal, so it could still be fine). Since not many people use metadata for commits, IMHO source compatibility is required.

@guizmaii WDYT?

flavienbert commented 1 year ago

I pushed a version with CommittableOffset. To be honest, I like this new version; I think it's much more understandable. There are now distinct models for offsets obtained from the consumer and offsets that you want to commit.

However, there is one breaking change: you have to explicitly convert a ReadOnlyOffset to a CommittableOffset in order to commit offsets: offset.nextCommittableOffset.commit. But I prefer an explicit conversion; this way users can understand how it works internally.
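
Mirroring the earlier example, usage would then look roughly like this (a sketch against my branch, not a final API):

    Consumer
      .plainStream(Subscription.topics("random"), Serde.long, Serde.string)
      .tap(r => Console.printLine(r.value))
      .map(_.offset.nextCommittableOffset)
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)
      .drain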

@erikvanoosten let me know your point of view.

flavienbert commented 12 months ago

@erikvanoosten any updates? It would be great to get the commit-with-metadata feature merged.

erikvanoosten commented 12 months ago

> @erikvanoosten any updates? It would be great to get the commit-with-metadata feature merged.

I have asked @svroonland and @guizmaii to also take a look at this PR.

Since we want to change this API in the near future, and changing it many times is not nice for our users, source compatibility is essential for me. Perhaps the other committers see this differently...

erikvanoosten commented 11 months ago

@flavienbert master changed quite a bit, would you like me to rebase this branch for you?

flavienbert commented 11 months ago

I rebased my branch, but the CI fails on sbt ciGenerateGithubWorkflow even though it seems to be good.

guizmaii commented 11 months ago

@erikvanoosten You can merge once the CI passes :)

guizmaii commented 11 months ago

@flavienbert Thanks for your work :)

erikvanoosten commented 11 months ago

@flavienbert Could you run sbt fmt please?

erikvanoosten commented 11 months ago

Thanks for your patience @flavienbert! This PR is now available in https://github.com/zio/zio-kafka/releases/tag/v2.6.0 🥳