Hi, the Consumer documentation mentions commitBatch and commitBatchOption. However, they do not exist in the code: https://github.com/fd4s/fs2-kafka/search?q=commitBatchOption

I wanted to use commitBatch, which is supposed to work on the underlying chunks. My stream is already chunked, because I try to commit offsets that are already grouped for the producer:
import scala.concurrent.duration._

import cats.implicits._
import fs2.kafka._

val run = {
  val producerSettings: fs2.kafka.ProducerSettings[cats.effect.IO, String, String] = ???
  val consumerSettings: fs2.kafka.ConsumerSettings[cats.effect.IO, String, String] = ???

  KafkaProducer.stream(producerSettings)
    .flatMap { producer =>
      KafkaConsumer.stream(consumerSettings)
        .subscribeTo("topic")
        .records
        .map { committable =>
          val key = committable.record.key
          val value = committable.record.value
          val record = ProducerRecord("topic", key, value)
          // carry the consumed offset through the producer as the passthrough value
          ProducerRecords.one(record, committable.offset)
        }
        .evalMap(producer.produce)
        .groupWithin(500, 15.seconds)
        .evalMap(_.sequence) // wait for each produced batch
        .map { chunk =>
          val offsetsChunk: fs2.Chunk[CommittableOffset[cats.effect.IO]] = chunk.map(_.passthrough)
          offsetsChunk
        }
        .through(
          ??? // What to put here?
        )
    }
    .compile.drain
}
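
For what it is worth, the only thing I have come up with for the ??? is a hand-written pipe along these lines. This is only a sketch: commitChunks is just a name I made up, and I am assuming CommittableOffsetBatch.fromFoldable is an appropriate way to commit an already-grouped chunk of offsets.

import cats.effect.IO
import fs2.{Chunk, Pipe}
import fs2.kafka.{CommittableOffset, CommittableOffsetBatch}

// Hypothetical pipe for the ??? above: commit each chunk of offsets as a single batch.
val commitChunks: Pipe[IO, Chunk[CommittableOffset[IO]], Unit] =
  _.evalMap(offsets => CommittableOffsetBatch.fromFoldable(offsets).commit)

If there is an existing combinator that already does this, that is exactly what I was hoping the docs would point at.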
In general, I don't see a real example in the docs of using the producer's passthrough to commit the consumed offsets.
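
This is roughly the shape I expected to find: produce, take the passthrough of each result, and feed it into a commit pipe. Again only a sketch, assuming fs2-kafka 2.x with cats-effect 3; PassthroughCommitExample and the topic name are placeholders, and I may be misreading how commitBatchWithin is meant to be used.

import scala.concurrent.duration._

import cats.effect.{IO, IOApp}
import fs2.kafka._

object PassthroughCommitExample extends IOApp.Simple {

  // Placeholder settings, as in the snippet above.
  val producerSettings: ProducerSettings[IO, String, String] = ???
  val consumerSettings: ConsumerSettings[IO, String, String] = ???

  def run: IO[Unit] =
    KafkaConsumer.stream(consumerSettings)
      .subscribeTo("topic")
      .records
      .map { committable =>
        val record =
          ProducerRecord("topic", committable.record.key, committable.record.value)
        // the consumed offset rides along as the passthrough value
        ProducerRecords.one(record, committable.offset)
      }
      .through(KafkaProducer.pipe(producerSettings)) // produce and wait for acks
      .map(_.passthrough)                            // Stream[IO, CommittableOffset[IO]]
      .through(commitBatchWithin(500, 15.seconds))   // commit offsets in batches
      .compile
      .drain
}

If something like this (or the per-chunk variant above) is the intended pattern, an example of it in the consumer or producer docs would really help.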