akka / akka


DOC: Document feed-forward deadlock scenarios #17435

Open lancearlaus opened 9 years ago

lancearlaus commented 9 years ago

Issue: A stream that fans out via Broadcast and fans in via Zip fails to complete when one of the intermediate branches contains a drop.

Example: The following graph fails to complete:

    val source = b.add(Source(1 to 10))
    val bcast = b.add(Broadcast[Int](2))
    val drop = b.add(Flow[Int].drop(5))
    val zip = b.add(Zip[Int, Int])

    source ~> bcast ~>         zip.in0
              bcast ~> drop ~> zip.in1

Additional Detail: Here's a snippet from a more complete test case that covers several variations in an attempt to isolate the problem.

  // PASS
  def zipSource(num: Int, diff: Int) = Source() { implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val source0 = b.add(Source(1 to num))
    val source1 = b.add(Source(1 to (num + diff)))
    val zip = b.add(Zip[Int, Int])

    source0 ~> zip.in0
    source1 ~> zip.in1

    (zip.out)
  }
  // PASS
  def dropSinkSource(num: Int, diff: Int) = Source() {  implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val source = b.add(Source(1 to (num + diff)))
    val bcast = b.add(Broadcast[Int](2))
    val drop = b.add(Flow[Int].drop(diff))
    val sink0 = b.add(Sink.ignore)

    source ~> bcast ~> sink0
              bcast ~> drop

    (drop.outlet)
  }
  // FAIL for diff > 0
  def zipDropSource(num: Int, diff: Int) = Source() {  implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val source = b.add(Source(1 to (num + diff)))
    val bcast = b.add(Broadcast[Int](2))
    val drop = b.add(Flow[Int].drop(diff))
    val zip = b.add(Zip[Int, Int])

    source ~> bcast ~>         zip.in0
              bcast ~> drop ~> zip.in1

    (zip.out)
  }

  // PASS
  "Zip" should "complete with same length streams" in {
    val future: Future[Int] = zipSource(10, 0).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // PASS
  it should "complete with different length streams" in {
    val future: Future[Int] = zipSource(10, 20).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // PASS
  "Sink with drop" should "complete with different length streams" in {
    val future: Future[Int] = dropSinkSource(10, 10).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // PASS
  "Zip with drop" should "complete with same length streams" in {
    val future: Future[Int] = zipDropSource(10, 0).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // FAIL
  it should "complete with different length streams" in {
    val future: Future[Int] = zipDropSource(10, 10).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

drewhk commented 9 years ago

No, there is no bug. This is a typical deadlock case that can be tricky to figure out.

Scenario description:

If you add a buffer stage in the non-dropping path between broadcast and zip, and set its OverflowStrategy to Fail, then this stream will properly fail with a BufferOverflowException if the gap between the dropped elements and all elements is large enough.

The smaller diff cases do not fail because the test case has to drop enough elements that the buffers of zip (and of any stage between broadcast and zip) are no longer enough to bridge the gap.
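
To make the stall concrete, here is a minimal single-threaded sketch in plain Scala. It does not use Akka and is not how Akka Streams is implemented. It assumes a deliberately simplified model: broadcast emits an element only when both branches can accept it, drop always accepts while it is still discarding, and zip holds at most one element per input. The names `FeedForwardModel`, `run`, `Completed`, `Deadlocked` and `bufferCapacity` are hypothetical and exist only for this illustration. Real stages carry internal buffers of their own, so the actual threshold in a running stream will be larger than in this model.

```scala
// Simplified model (an assumption, not Akka's implementation) of the graph:
//   source ~> bcast ~> [buffer] ~> zip.in0
//             bcast ~> drop(diff) ~> zip.in1
object FeedForwardModel {
  sealed trait Outcome
  final case class Completed(pairs: Int) extends Outcome
  final case class Deadlocked(pairs: Int, stuckAt: Int) extends Outcome

  def run(num: Int, diff: Int, bufferCapacity: Int): Outcome = {
    val total = num + diff
    // Slack on the non-dropping path: the explicit buffer plus
    // one slot standing in for zip's own input.
    val in0Capacity = bufferCapacity + 1
    val in0 = scala.collection.mutable.Queue.empty[Int]
    var in1: Option[Int] = None // zip's single slot on the dropping path
    var dropped = 0             // elements discarded by drop so far
    var next = 1                // next element the source will emit
    var pairs = 0               // pairs emitted by zip
    var progressed = true

    while (progressed) {
      progressed = false

      // Zip emits a pair once both inputs hold an element.
      if (in0.nonEmpty && in1.nonEmpty) {
        in0.dequeue()
        in1 = None
        pairs += 1
        progressed = true
      }

      // Broadcast emits only when BOTH branches can accept the element.
      val in0Ready = in0.size < in0Capacity
      val in1Ready = dropped < diff || in1.isEmpty
      if (next <= total && in0Ready && in1Ready) {
        in0.enqueue(next)
        if (dropped < diff) dropped += 1 else in1 = Some(next)
        next += 1
        progressed = true
      }
    }

    // No further progress is possible: either the source is exhausted
    // (the stream completes) or broadcast is blocked for good.
    if (next > total) Completed(pairs) else Deadlocked(pairs, next)
  }
}
```

In this model, `FeedForwardModel.run(10, 5, 0)` stalls as soon as the non-dropping path is full, because zip is still waiting for the first element that survives the drop while broadcast cannot push anything further. Raising `bufferCapacity` to `diff` gives the non-dropping path enough room to hold every element that the other branch discards, and the run completes.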

patriknw commented 9 years ago

Excellent description, @drewhk. Please add to docs.

drewhk commented 9 years ago

Changed to a documentation ticket

lancearlaus commented 9 years ago

Thank you for the prompt, clear response on this issue. Based on @drewhk's comments, I updated the test case to the following and can confirm that it works as expected.

    val source = b.add(Source(1 to (num + diff)))
    val bcast = b.add(Broadcast[Int](2))
    val buffer = b.add(Flow[Int].buffer(Math.max(diff, 1), OverflowStrategy.backpressure))
    val drop = b.add(Flow[Int].drop(diff))
    val zip = b.add(Zip[Int, Int])

    source ~> bcast ~> buffer ~> zip.in0
              bcast ~> drop   ~> zip.in1

drewhk commented 9 years ago

Btw, in these cases I recommend using the buffer in Fail mode, so that if a deadlock does happen for some reason, the stream fails instead of hanging.

lancearlaus commented 9 years ago

That's what I did initially based on your comments and it certainly worked for the simple test case. However, when used on real, but still simple, flows with more data, I encounter buffer overflows. After some experimentation and re-reading docs about internal buffers, here's what I surmise. Please correct my assumptions.

When using failure buffer overflow strategy, buffer size must account for:

I arrived at the following formula through a little reasoning and experimentation:

Once I hit this threshold, it works fine. Alternatively, I can use the stream diff and a back pressure overflow strategy.

Of course, if this is right, or reasonably right, sizing buffers like this doesn't seem appropriate as a general practice. I'd like an approach that solely relies on the stream difference, which is known and intrinsic to the flow.

What are the performance or other implications of sizing according to stream diff and using a back pressure overflow strategy? What are the reasons it would deadlock, as you mention?

lancearlaus commented 9 years ago

Circling back on this, I created a blog post that explains the issue I encountered along with the solution of using a balancing buffer.

http://blog.lancearlaus.com/akka/streams/scala/2015/05/27/Akka-Streams-Balancing-Buffer/

I hope it helps those who encounter the same issue.