monix / monix

Asynchronous, Reactive Programming for Scala and Scala.js.
https://monix.io
Apache License 2.0
1.93k stars 245 forks source link

Add Observable.mapAsyncOrdered for parallel mapping over an observable #329

Closed lukestephenson closed 5 years ago

lukestephenson commented 7 years ago

I've briefly discussed this on gitter before, but can't find a search feature in gitter to keep track of the conversation so moving to github.

I'm already making use of the awesome Observable.mapAsync but would love it if it would preserve the order of the elements.

Here is a use case as to why I'd like this operator. I'm processing a large dataset which can potentially fail. As the processing progresses, I'd like to periodically output the id of the record which was last successfully processed. If the processing fails, I can then continue from the last id that was successfully processed (the source of my data provides the ability to resume from a known point).

If the Observable is using mapAsync, the ordering is no longer preserved and I can't easily record what the id of the last processed record is.

If ordered parallel operations is painful, I'd be open to other patterns or suggestions to handle.

And for reference, this is the equivalent functionality in akka-streams http://doc.akka.io/docs/akka/2.4/scala/stream/stages-overview.html#mapAsync (The default in akka is order preserving).

He-Pin commented 7 years ago

mapAsyncUnordered as akka?

alexandru commented 7 years ago

@lukestephenson I understand the use-case. Well, it's a little difficult to implement. Any idea on the semantics involved? For example, is the Akka implementation equivalent with the following code and can you check?

observable.bufferIntrospective(parallelism).flatMap { batch =>
  // Processing batch in parallel
  val task = Task.gather(batch.map(f))
  Observable.fromTask(task).flatMap(Observable.fromIterable)
}

This is similar to the approach highlighted here: https://monix.io/docs/2x/tutorials/parallelism.html#imposing-a-parallelism-limit

What this does is to build a source that emits batches and then process a batch in parallel, one by one. Preserving order in this case is OK, because you only care about a finite batch at a time.

However, it has a downside: if one of those batches triggers a slow Task that takes a long time to finish, it will block pending batches. So if you have say parallelism = 10, what this means is that this will be a maximum limit, instead of actual active workers, because due to uneven execution times you can end up with fewer tasks being active at any given moment.

But this kind of goes with the territory of wanting ordering - because we can't do unlimited buffering, regardless of implementation, at some point the implementation will have to freeze progress in order to wait on some slow Task to finish.

Does this make sense? Is this acceptable?

alexandru commented 7 years ago

@hepin1989 our current implementation is unordered because IMO it's the more useful variant. We can add a mapAsyncOrdered if there's interest.

He-Pin commented 7 years ago

That's okay,just need a mind shift here :)

lukestephenson commented 7 years ago

@alexandru Thanks for the suggestion

I don't think bufferIntrospective will work in my case. I'm expecting the publishing of elements to be slower than the consumption, so I don't think it would back pressure enough to cause the batch processing piece to occur in parallel. That said, I could use bufferTumbling to obtain a batch for processing. In the interests of simplicity, I'm happy to go with this approach.

With regard to the operator if we decide to implement.

Waiting on a slow task at some point is acceptable (and can't be avoided to guarantee ordering). A buffer which is larger than the level of parallelism can in most cases ensure the desired level of parallelism is achieved if a few tasks finish out of order internally.

I took a look at the akka implementation - https://github.com/akka/akka/blob/5edead2bda89e5c1bcaa39ae17578d6d86fa03ed/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala#L1138.

For each task that is executed in parallel, it is first added to a Buffer, after first being wrapped in an object Holder which tracks if the task as finished. When a task finishes, it updates it's Holder to flag completion. Then it can peek at the head of the Buffer and if the head is completed, it can push that onto the next stage (and continue through the Buffer until the head is no longer a completed task).

I'm considering trying to implement and submit a PR, but this may not be the easiest introduction to monix internals. I first wanted understand the existing MapAsyncParallelObservable, and am currently at the point of trying to understand why it uses a AsyncSemaphore rather than a TaskSemaphore (because it appears to then reimplement some of the logic of TaskSemaphore). But that's a separate discussion.

alexandru commented 7 years ago

@lukestephenson ah, very interesting.

So one possible and simple implementation would be to use a buffer onto which we push Future references (or something similar) and then on the other side we keep waiting on the head to be completed in order to signal it downstream.

Yes, it's not exactly an easy operator to implement. You can usually start with something that works but that is not that well optimized and then work your way from there through pain and sheer will :-)

source.map(x => task(x).runAsync) // yields Futures
  .whileBusyBuffer(BackPressure(parallelism))
  .mapFuture(f => f)

In the meantime, speaking of that substitute, make sure you read: https://monix.io/docs/2x/tutorials/parallelism.html

alexandru commented 7 years ago

@lukestephenson so you can try your hand at a PR if you think you can do it. I'll gladly help you and review it, just as a word of warning, when working with concurrency, there's some pain involved, you won't get it correct from the start.

Or if you think you'd like some simpler operator, open other tickets 😄

kellerkindt commented 7 years ago

I am now also in a position where I need a mapAsync that preserves the order. Is there any ETA / update regarding it?

lukestephenson commented 7 years ago

I never got around to attempting it. Went with one of the workaround suggested by @alexandru

alexandru commented 7 years ago

That workaround should be pretty OK too, it's what the logic should be anyway, although in a specialized implementation we might be able to eliminate some overhead.

erickpintor commented 7 years ago

I'm also interested on such operator. Here is the pattern I'm using right now:

val semaphore = AsyncSemaphore(maxParallelism)

source
  .map(x => semaphore.greenLight(() => run(x).runAsync))
  .asyncBoundary(OverflowStrategy.BackPressure(maxBufferSize))
  .mapFuture(identity)

I've experienced a better performance overall, with the cost of more memory when there is a slow future on the stream.

ericwush commented 7 years ago

+1 for needing a mapAsync preserving order

alexandru commented 6 years ago

Assigning this the 3.0.0 milestone — at this point a less than perfect implementation is better than none and in the beginning we could just do the buffer in combination with mapFuture.

Avasil commented 6 years ago

It looks like cool issue, I could try it but looking at the implementation of MapParallelUnorderedObservable it will probably take me some time and lots of questions. :P