MfgLabs / akka-stream-extensions

Extensions for Akka Stream
Apache License 2.0
123 stars 27 forks source link

Shapless Extension Completes Before All Data has been passed through #30

Open natevecc opened 7 years ago

natevecc commented 7 years ago

I ran into an edge case when using the shapeless extensions with Flow.flatMapConcat and one of the flows in the coproductFlow had a broadcast and merge in it.

It seems the CoproductFlexiMerge does not handle the case where a sub flow might push more than one value to it's inlets well. In this setup I think it incorrectly sets itself as being complete after pulling a single value from the inlet and never processes the second output of my internal broadcast/merge graph. In a flow without flatMapConcat this wasn't a problem because the graph just kept running until all data was processed but flatMapConcat seems to shutdown the subgraph as soon as it receives the upStreamComplete message and data can be lost. Sorry I can't give a better explanation as to what's causing the problem. The internals of akka-streams are a mystery to me.

Here's a representation of the graph that caused the problem

Source(foreverData) -> flatMapConcat(data)
  Source.single(data) -> coproductFlow
    -> normal flow ->
                 /-> flow ->\
    -> broadcast             merge -> (this output will now have two values)
                 \-> flow ->/
Timshel commented 7 years ago

Hi, I'll try to have a look before the end of the week.