softwaremill / ox

Safe direct style concurrency and resiliency for Scala on the JVM
https://ox.softwaremill.com
Apache License 2.0
366 stars 25 forks source link

`flatten` method for `Stream[Stream[T]]` #197

Closed nimatrueway closed 1 week ago

nimatrueway commented 3 weeks ago

Is there any tool of such nature for Stream, I was writing a few examples for my company to compare different Scala streaming libraries (e.g. monix Observable, fs2 Stream, ox Source, etc.) that I run into this case.

Source.fromIterable(splitRange(request.startDate, request.endDate)) // = STREAM[(Instant, Instant)]
  .mapPar(4) { case (start, end) =>
    service.fetch(start, end) // = STREAM[T]
  } // = STREAM[STREAM[T]]
  .flatten // DOES NOT EXIST 😭

I was just wondering if this is overlooked or is there any solid reason as to not include flatten in Source api?

nimatrueway commented 3 weeks ago

I looked into how this can be implemented based on Source[T].merge, and wrote a quick draft implementation of my idea at https://github.com/softwaremill/ox/pull/198

I suspect that having a select that allows a mutable list of ReceiveClause so that we can add to the clauses as the parent source generates more children sources will boost performance 🤔