nessos / Streams

A lightweight F#/C# library for efficient functional-style pipelines on streams of data.
http://nessos.github.io/Streams/

Streams too eager? #16

Closed: matthid closed this 9 years ago

matthid commented 9 years ago

Is it by design that the following throws an IndexOutOfRangeException?

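    open Nessos.Streams
    open Swensen.Unquote   // provides test <@ ... @>

    let refs = [| 1; 2; 3; 4 |]
    // take 3 should only need refs.[0], refs.[3], refs.[0] = [| 1; 4; 1 |]
    let pages = [| [| 0; 3 |]; [| 0; 5 |]; [| 0; 1 |] |]
                |> Stream.ofArray
                |> Stream.flatMap Stream.ofArray
                |> Stream.map (fun ref -> refs.[ref])
                |> Stream.take 3
                |> Stream.toArray
    test <@ [| 1; 4; 1 |] = pages @>
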
eiriktsarpalis commented 9 years ago

Yes, this is by design. You would have to rearrange the combinators yourself in that case.

matthid commented 9 years ago

Thanks for the fast answer.

Would you be kind enough to give a rough explanation? I can't seem to get my head around it :(

The docs state that everything is lazy until the end (toArray in this case). It seems hard to reason about side effects with this kind of design (I tried to implement a "toIOStream" and "ofIOStream" and failed miserably because of this).

matthid commented 9 years ago

This is no longer an issue, as I found an alternative implementation of my two methods (which basically caches everything). However, I would still be interested in the reasoning behind this.

eiriktsarpalis commented 9 years ago

It is lazy. You can verify this by adding a print side-effect to your map lambda. You may also have noticed that your code would have worked if you had taken 2 elements instead of 3. Streams will evaluate everything up to and including the first element that take discards. This quirk is inherent to the underlying design of streams, and to the way take is implemented in particular. It is not really a problem if your code is purely functional and exception-safe.
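A minimal sketch of the mechanism described above, assuming a toy push model rather than the actual Nessos.Streams types: the `Push` type and the `ofArray`, `map`, `takeEager` and `toArray` functions below are hypothetical. The producer pushes each element through every stage until a stage returns `false`, and `takeEager` only returns `false` once the (n+1)-th element arrives, by which point that element has already gone through `map`.

```fsharp
// Hypothetical toy push model (not the Nessos.Streams types): the producer
// calls the continuation for each element and stops as soon as it returns false.
type Push<'T> = ('T -> bool) -> unit

let ofArray (xs: 'T[]) : Push<'T> =
    fun k ->
        let mutable i = 0
        let mutable go = true
        while go && i < xs.Length do
            go <- k xs.[i]
            i <- i + 1

let map (f: 'T -> 'U) (s: Push<'T>) : Push<'U> =
    fun k -> s (fun x -> k (f x))

// An "eager" take: it only asks the producer to stop when the (n+1)-th
// element arrives, and by then that element has already been through map.
let takeEager (n: int) (s: Push<'T>) : Push<'T> =
    fun k ->
        let count = ref 0
        s (fun x ->
            if count.Value < n then
                count.Value <- count.Value + 1
                k x
            else
                false)

let toArray (s: Push<'T>) : 'T[] =
    let acc = ResizeArray<'T>()
    s (fun x -> acc.Add x; true)
    acc.ToArray()

// Count how often the map lambda runs (a stand-in for a print side-effect).
let calls = ref 0
let result =
    [| 1 .. 100 |]
    |> ofArray
    |> map (fun i ->
        calls.Value <- calls.Value + 1
        i * 10)
    |> takeEager 3
    |> toArray
// result = [| 10; 20; 30 |], but calls.Value = 4
```

With `takeEager 2` the counter would read 3, which mirrors the observation that taking 2 instead of 3 avoided the exception in the original example.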

matthid commented 9 years ago

Thank you! I can now see in the StreamEnumerator implementation how to do this correctly, and that it is in fact expected that TryAdvance can advance multiple items (I was a bit caught off guard by this).

palladin commented 9 years ago

Hi matthid, the Nessos.Streams library is based on Java 8 Streams, and its underlying execution model is a CPS (continuation-passing style) push model. In Java 8 your code would crash with an index-out-of-range exception, but in Nessos.Streams the composition of flatMap and map with early-termination combinators (like take) is handled with special care to avoid such errors, as you can see here: https://github.com/nessos/Streams/blob/master/tests/Streams.Tests/StreamsTests.fs#L55.

palladin commented 9 years ago

To keep a long story short, I'll reopen this because I'm quite sure it is a bug.

eiriktsarpalis commented 9 years ago

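This is not a flatMap issue. For example,

    let refs = [| 1; 2; 3; 4 |]
    [| 1 .. 100 |]
    |> Stream.ofArray
    // take 3 only needs refs.[1], refs.[2] and refs.[3], yet the 4th element
    // (i = 4) is still mapped, and refs.[4] is out of range
    |> Stream.map (fun i -> refs.[i])
    |> Stream.take 3
    |> Stream.toArray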

has the same behaviour. This is totally reasonable given the underlying implementation of take.

palladin commented 9 years ago

Yep, the problem is that take is a little bit too eager.

palladin commented 9 years ago

Fixed in https://github.com/nessos/Streams/commit/25d5791150f5403287a144e03e8c025889cdfa7c

Thanks!

matthid commented 9 years ago

Nice! So you can basically force an early exit with the CancellationToken even when there is a bulk operation running (assuming the generator is implemented properly). So I guess this can be closed.
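A minimal sketch of the early exit described in the last comment, assuming a toy model in which all stages share a `CancellationTokenSource` and the generator checks it before every push. The `Push` type and the `ofArray`, `map`, `take` and `toArray` functions are hypothetical; this is neither the Nessos.Streams API nor the code in the commit above. Here `take` cancels right after emitting the n-th element, so the generator stops before the (n+1)-th element reaches `map`.

```fsharp
open System.Threading

// Hypothetical toy model (not the Nessos.Streams types): every stage shares one
// CancellationTokenSource, and cancelling it asks the generator to stop.
type Push<'T> = CancellationTokenSource -> ('T -> unit) -> unit

// A cooperative generator: checks the token before pushing each element.
let ofArray (xs: 'T[]) : Push<'T> =
    fun cts k ->
        let mutable i = 0
        while i < xs.Length && not cts.IsCancellationRequested do
            k xs.[i]
            i <- i + 1

let map (f: 'T -> 'U) (s: Push<'T>) : Push<'U> =
    fun cts k -> s cts (fun x -> k (f x))

// Cancels as soon as the n-th element has been emitted, so the
// (n+1)-th element is never pulled through upstream stages such as map.
let take (n: int) (s: Push<'T>) : Push<'T> =
    fun cts k ->
        if n <= 0 then
            cts.Cancel()
        else
            let count = ref 0
            s cts (fun x ->
                // Guard in case a generator ignores the token.
                if count.Value < n then
                    count.Value <- count.Value + 1
                    k x
                    if count.Value = n then cts.Cancel())

let toArray (s: Push<'T>) : 'T[] =
    use cts = new CancellationTokenSource()
    let acc = ResizeArray<'T>()
    s cts (fun x -> acc.Add x)
    acc.ToArray()

// The example from this thread: refs.[4] is never evaluated.
let refs = [| 1; 2; 3; 4 |]
let result =
    [| 1 .. 100 |]
    |> ofArray
    |> map (fun i -> refs.[i])
    |> take 3
    |> toArray
// result = [| 2; 3; 4 |]
```

The early exit depends on the generator cooperating: an `ofArray` that ignored the token would still push (and map) every remaining element, and the guard in `take` would only keep those elements out of the result. That is the "assuming the generator is implemented properly" caveat.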