nomisRev / kotlin-kafka

Kafka bindings for Kotlin `suspend`, and Kafka streaming operators for KotlinX Flow.
https://nomisRev.github.io/kotlin-kafka/
Apache License 2.0

Publisher scope #156

Closed nomisRev closed 1 year ago

nomisRev commented 1 year ago

Closes #149 and #147.

Adds a `PublishScope` that can `offer` a message (send without awaiting the ack) or `publish` it, which is `offer` plus awaiting the ack. The block itself waits for all `offer` calls made inside it, similar to `coroutineScope`, and re-throws the failure of any failed `offer`.
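To make the difference between `offer` and `publish` concrete, here is a toy model of those semantics. It is only a sketch and not the kotlin-kafka implementation: `ToyPublishScope`, `toyPublishScope`, and `fakeSend` are hypothetical names, records are plain strings, and a `CompletableFuture` stands in for the broker ack instead of suspending.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException

// Hypothetical stand-in for PublishScope; NOT the kotlin-kafka implementation.
class ToyPublishScope(private val send: (String) -> CompletableFuture<Unit>) {
    private val pending = mutableListOf<CompletableFuture<Unit>>()

    // offer: hand the record to the sender and return without waiting for the ack.
    fun offer(record: String) {
        pending += send(record)
    }

    // publish: offer plus waiting for the ack of this record.
    fun publish(record: String) {
        val ack = send(record)
        pending += ack
        await(ack)
    }

    // Runs when the block ends: wait for every ack and rethrow the first failure.
    fun awaitAll() = pending.forEach { await(it) }

    // Blocks on one ack and unwraps the CompletionException added by join().
    private fun await(ack: CompletableFuture<Unit>) {
        try {
            ack.join()
        } catch (e: CompletionException) {
            throw e.cause ?: e
        }
    }
}

// Runs the block, then waits for all offers made inside it.
fun <A> toyPublishScope(
    send: (String) -> CompletableFuture<Unit>,
    block: ToyPublishScope.() -> A
): A {
    val scope = ToyPublishScope(send)
    val result = scope.block()
    scope.awaitAll()
    return result
}

// Fake sender (assumption): every record is acked except the one named "bad".
fun fakeSend(record: String): CompletableFuture<Unit> =
    CompletableFuture<Unit>().apply {
        if (record == "bad") completeExceptionally(IllegalStateException("send failed: $record"))
        else complete(Unit)
    }

fun main() {
    val failure = runCatching {
        toyPublishScope(::fakeSend) {
            offer("ok-1")
            offer("bad") // returns normally; the failure is not observed here
            offer("ok-2")
        }
    }.exceptionOrNull()
    println(failure?.message) // prints "send failed: bad"
}
```

In this model the failed `offer` only surfaces when the block ends, whereas `publish("bad")` would throw at the call site.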

It also adds a `transaction` block, which wraps its body in the correct transaction semantics and awaits the offers inside it in the same way. `transaction` blocks cannot be nested, thanks to `@PublisherDSL`.
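For readers unfamiliar with `@DslMarker`, the sketch below shows how such a marker can rule out nesting at compile time. All names here (`DemoPublisherDSL`, `DemoPublishScope`, `DemoTxScope`) are hypothetical and only mirror the shape described above; the actual declarations in this PR may differ.

```kotlin
// Hypothetical marker; plays the role that @PublisherDSL plays in the description.
@DslMarker
annotation class DemoPublisherDSL

// Scope available inside transaction { }: it has offer, but no transaction function.
@DemoPublisherDSL
class DemoTxScope(private val sink: MutableList<String>) {
    fun offer(record: String) {
        sink += "tx:$record"
    }
}

// Outer scope: has offer and is the only place that declares transaction { }.
@DemoPublisherDSL
class DemoPublishScope(val sent: MutableList<String> = mutableListOf()) {
    fun offer(record: String) {
        sent += record
    }

    fun transaction(block: DemoTxScope.() -> Unit) {
        DemoTxScope(sent).block()
    }
}

fun main() {
    val scope = DemoPublishScope()
    with(scope) {
        offer("a")
        transaction {
            offer("b") // resolves to DemoTxScope.offer
            // transaction { } // does not compile: the outer receiver is hidden by the marker
        }
    }
    println(scope.sent) // prints [a, tx:b]
}
```

Because both scope classes carry the same marker, the outer `DemoPublishScope` is no longer an implicit receiver inside `transaction { }`, so the nested call is rejected by the compiler. In this sketch an explicit receiver such as `this@with.transaction { }` would still compile, so the marker only guards against accidental nesting.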

publisher.publishScope {
  offer((1..10).map {
    ProducerRecord(topic.name(), "$it", "msg-$it")
  })
  publish((11..20).map {
    ProducerRecord(topic.name(), "$it", "msg-$it")
  })
  transaction {
    // Calling transaction { } here is illegal (DslMarker magic)
    offer((21..30).map {
      ProducerRecord(topic.name(), "$it", "msg-$it")
    })
    publish((31..40).map {
      ProducerRecord(topic.name(), "$it", "msg-$it")
    })
  } // Waits until all offers in the transaction have finished; fails if any of them failed

  // looping
  (0..100).forEach {
    delay(100.milliseconds)
    val record = ProducerRecord(topic.name(), "$it", "msg-$it")
    offer(record)
  }

  // streaming
  (1..100).asFlow()
    .onEach { delay(100.milliseconds) }
    .map { ProducerRecord(topic.name(), "$it", "msg-$it") }
    .collect { offer(it) }
}

See the tests for more examples.