hoc081098 / FlowExt

FlowExt | Kotlinx Coroutines Flow Extensions | Kotlinx Coroutines Flow Extensions. Extensions to the Kotlin Flow library | kotlin-flow-extensions | Coroutines Flow Extensions | Kotlin Flow extensions | kotlin flow extensions | Flow extensions
https://hoc081098.github.io/FlowExt/docs/1.x
MIT License
334 stars 24 forks source link

Add time windowed chunking, `groupedWithin` #185

Open Dogacel opened 10 months ago

Dogacel commented 10 months ago

Similar to groupedWithin in Akka, I think this is a very useful utility to have.

Akka Docs

Proposed solution

fun Flow<T>.groupedWithin(size: Int, limit: Duration): Flow<List<T>> { ... }

implementation can be based on https://github.com/Kotlin/kotlinx.coroutines/issues/1290#issuecomment-1309290613

I have modified that code slightly, I can help work on a solution based on a channel flow.

Behavior

Why This is very useful when we are bridging the gap between the regular APIs and streaming APIs. For example, assume you have an API to fetch SQS messages, traditionally you would implement it as

suspend fun main() {
  val sqsClient = getSqsClient()

  while (true) {
    val items = sqsClient.poll(10)
    process(items)
    delay(10.seconds)
  }
}

instead, we can use

suspend fun main() {
  val sqsClient = getSqsClient()

  flow {
    while (true) {
      val items = sqsClient.poll(10)
      items.forEach { emit(it) }
      delay(10.seconds)
    }
  }.groupedWithin(128, 30.seconds) {
    process(it)
  }
}
hoc081098 commented 10 months ago

Thank @Dogacel 🙏 for this issue.

Is this operator similar to

Dogacel commented 10 months ago

Thank @Dogacel 🙏 for this issue.

Is this operator similar to

I haven't used RxJava but from the explanation, it looks like it is the bufferTime described in RxJava.