akka / akka

Build highly concurrent, distributed, and resilient message-driven applications on the JVM
https://akka.io

document that Akka Streams perform element pre-fetch, and how that works #18092

Open AnIrishDuck opened 9 years ago

AnIrishDuck commented 9 years ago

The following fails:

import org.reactivestreams._

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

val count = 4L
var requested = 0L

// Publisher that records the total signaled demand and synchronously
// emits as many elements as were requested.
val pub = new Publisher[Long]() {
  override def subscribe(s: Subscriber[_ >: Long]) = s.onSubscribe(new Subscription() {
    def cancel() = { }
    def request(l: Long) = {
      requested += l
      (0L until l).foreach(s.onNext)
    }
  })
}

// Subscriber that requests exactly `count` elements up front and
// completes a promise once it has received them all.
val sub = new Subscriber[Long] {
  var sub: Subscription = _

  var received = 0L
  val donePromise = Promise[Unit]()
  val done = donePromise.future

  override def onSubscribe(s: Subscription) = {
    sub = s
    sub.request(count)
  }

  override def onError(t: Throwable) = { }
  override def onComplete() = { }
  override def onNext(e: Long) = {
    received += 1
    if (received == count) donePromise.success(())
  }
}

Source(pub).log("MIDDLEMAN").runWith(Sink(sub))

Await.result(sub.done, 1.second)

requested ==== count // specs2 equality matcher; fails: requested > count

The assertion passes only if you remove .log("MIDDLEMAN"). It appears that .log() preemptively requests elements. This complicates debugging complex scenarios that depend on precise request() and cancel() sequences, because it obscures the true flow of demand.

ktoso commented 9 years ago

This (as well as https://github.com/akka/akka/issues/18091) highlights that we did not document pre-fetching well enough, I guess. The fact is, every stage in Akka Streams pre-fetches elements according to its buffer size. You'll find pre-fetch behaviour in all stages, not just log. So the question becomes: how should we properly document or otherwise explain it, because I don't think dropping the pre-fetch mechanism is an option AFAICS.
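For what it's worth, the size of that per-stage buffer can be tuned down via stream attributes (a minimal sketch against the 2.x scaladsl API; the stream shape here is illustrative):

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Sink, Source}

implicit val system = ActorSystem("demo")
implicit val mat = ActorMaterializer()

// Shrink the internal input buffer of these stages to a single element,
// so each stage pre-fetches at most one element ahead of downstream demand.
Source(1 to 100)
  .map(identity)
  .withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
  .runWith(Sink.ignore)
```

Note that even with a buffer of 1 a stage still requests one element ahead; pre-fetching can be minimized this way but not disabled entirely.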

AnIrishDuck commented 9 years ago

OK. I hope I didn't miss some documentation somewhere, but I obviously wasn't aware that pre-fetching is a thing.

It's probably worth noting that I'm running into these issues when writing tests, not in production. I'm writing custom stages and would like to test pretty specific scenarios to rule out certain concurrency issues. Having some way to turn pre-fetching off for certain stages would solve my issue. At the moment I'm creating bespoke Publisher and Subscriber objects which feels wrong.

Or maybe more precise methods of control over backpressure / completion / cancellation in a testing environment? It's possible that having special testing methods / stages where I can gate elements would solve my problem.
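For the testing use case, the akka-stream-testkit probes give exactly this kind of manual control over demand, completion, and cancellation, instead of hand-rolling Publisher/Subscriber objects (a sketch against the 2.x testkit API; the element values are arbitrary):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

implicit val system = ActorSystem("test")
implicit val mat = ActorMaterializer()

// Manually driven probes at both ends: nothing is emitted until the
// sink-side probe explicitly signals demand.
val (pub, sub) = TestSource.probe[Int]
  .toMat(TestSink.probe[Int])(Keep.both)
  .run()

sub.request(1)    // signal demand for exactly one element
pub.sendNext(42)  // push one element from the source side
sub.expectNext(42)
pub.sendComplete()
sub.expectComplete()
```

Stages you insert between the probes will still apply their own buffering unless you also shrink their input buffers via attributes.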

rkuhn commented 8 years ago

Depending on request counts is brittle and should be avoided: the signaled demand is just a means, not an end, and ideally the Subscriber will auto-tune the amount of buffering it performs in order to reach performance goals (like maximizing throughput or minimizing latency).