fsprojects / FSharp.Control.TaskSeq

A computation expression and module for seamless working with IAsyncEnumerable<'T> as if it is just another sequence
MIT License

chunkBySize #258

Open BrianVallelunga opened 2 months ago

BrianVallelunga commented 2 months ago

I see chunkBySize is supposed to be added. Is there any guidance or reference for patching this in myself? I've come across a use for it in a pipeline.

Thanks

BrianVallelunga commented 2 months ago

This code seems to work, but I feel like it could be simplified.

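```fsharp
open System.Threading.Tasks
open FSharp.Control

let chunkBySize chunkSize (source: TaskSeq<_>) =
    // Guard: Array.zeroCreate needs a positive size, and res[0] needs at least one slot.
    if chunkSize < 1 then
        invalidArg (nameof chunkSize) "chunkSize must be positive."

    let enumerator = source.GetAsyncEnumerator()

    // Moves to the next item only while the current chunk still has room.
    let checkChunkIndexAndMoveNextAsync chunkIndex =
        if chunkIndex < chunkSize then
            enumerator.MoveNextAsync()
        else
            ValueTask.False

    taskSeq {
        // Builds one chunk, starting from the item the outer loop just moved to.
        let nextChunk () =
            taskSeq {
                let res = Array.zeroCreate chunkSize
                res[0] <- enumerator.Current
                let mutable i = 1

                while! (checkChunkIndexAndMoveNextAsync i) do
                    let current = enumerator.Current
                    res[i] <- current
                    i <- i + 1

                // Trim the last chunk if the source ran out before it was full.
                if i = chunkSize then
                    res
                else
                    Array.sub res 0 i
            }

        while! enumerator.MoveNextAsync() do
            yield! nextChunk()
    }
```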
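One possible simplification, offered as a sketch rather than the library's implementation: a `for ... in` loop inside `taskSeq` can consume the source directly, so the CE creates and disposes the enumerator itself and no hand-written `MoveNextAsync` bookkeeping is needed. It assumes an F# script that references the FSharp.Control.TaskSeq NuGet package; the name `chunkBySizeSimple` and the `ResizeArray` buffer are illustrative choices, not part of the library.

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open FSharp.Control

/// Hypothetical helper (not the library's API): groups `source` into arrays of
/// at most `chunkSize` items; only the last chunk may be shorter.
let chunkBySizeSimple chunkSize (source: TaskSeq<'T>) : TaskSeq<'T[]> =
    if chunkSize < 1 then
        invalidArg (nameof chunkSize) "chunkSize must be positive."

    taskSeq {
        // A fresh buffer per enumeration, because the CE body reruns each time.
        let buffer = ResizeArray<'T>(chunkSize)

        // `for ... in` over a TaskSeq lets the CE manage the enumerator's lifetime.
        for item in source do
            buffer.Add item

            if buffer.Count = chunkSize then
                yield buffer.ToArray()
                buffer.Clear()

        // Emit the remaining items, if any, as a final shorter chunk.
        if buffer.Count > 0 then
            yield buffer.ToArray()
    }
```

Copying the buffer with `ToArray()` before clearing it means each yielded chunk is an independent array, so consumers can hold on to earlier chunks safely.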
abelbraaksma commented 2 months ago

Hey @BrianVallelunga, thanks for reporting this! Indeed, `chunkBySize` was (and still is) on the list of `Seq.xxx` functions to be added (we try to match as much of the surface area of F#'s `Seq` module as we can), but we haven't gotten to it yet (hence no green checkmark in the overview, sorry).
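For reference, this is how `Seq.chunkBySize` in FSharp.Core behaves; a TaskSeq counterpart would presumably follow the same semantics (an assumption about the eventual API, not something stated in this thread): every chunk except possibly the last holds exactly `chunkSize` items, and a non-positive `chunkSize` raises an `ArgumentException`.

```fsharp
// Plain FSharp.Core, no TaskSeq needed: the semantics a TaskSeq version would likely mirror.
let chunks = Seq.chunkBySize 3 [ 1 .. 7 ] |> Seq.toList
// chunks = [ [| 1; 2; 3 |]; [| 4; 5; 6 |]; [| 7 |] ]

// A non-positive chunk size is rejected with an ArgumentException.
let rejectsZero =
    try
        Seq.chunkBySize 0 [ 1; 2 ] |> Seq.toList |> ignore
        false
    with :? System.ArgumentException ->
        true

printfn "%A" chunks
printfn "zero chunk size rejected: %b" rejectsZero
```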

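Your code is not wrong per se, but you'd want to move `let enumerator = source.GetAsyncEnumerator()` inside the `taskSeq` and bind it with `use` instead of `let`. Otherwise, the enumerator's lifecycle is not guarded by the `taskSeq` CE, which may lead to unwanted side effects: the enumerator is never disposed, and every enumeration of the result shares the same enumerator. Note that this `use` only works inside `taskSeq` or `task`; outside them it is not recognized, because `IAsyncEnumerator<'T>` implements `IAsyncDisposable` rather than `IDisposable`.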
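A minimal sketch of the change described above, assuming F# 8 (for `while!`) and a script that references the FSharp.Control.TaskSeq NuGet package. The enumerator is created with `use` inside the `taskSeq`, so a fresh one is obtained for each enumeration of the result and disposed when that enumeration ends. Replacing the nested `taskSeq` with a single buffer is an editorial simplification, not the maintainer's code, and this `chunkBySize` is a user-side helper rather than the library's eventual implementation.

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System.Threading
open FSharp.Control

/// User-side helper (hypothetical, not the library's API): chunks of at most `chunkSize` items.
let chunkBySize chunkSize (source: TaskSeq<'T>) : TaskSeq<'T[]> =
    if chunkSize < 1 then
        invalidArg (nameof chunkSize) "chunkSize must be positive."

    taskSeq {
        // `use` inside the CE ties DisposeAsync to the lifetime of this enumeration.
        use enumerator = source.GetAsyncEnumerator(CancellationToken.None)
        let buffer = ResizeArray<'T>(chunkSize)

        while! enumerator.MoveNextAsync() do
            buffer.Add enumerator.Current

            if buffer.Count = chunkSize then
                yield buffer.ToArray()
                buffer.Clear()

        // Flush the last, possibly shorter, chunk.
        if buffer.Count > 0 then
            yield buffer.ToArray()
    }
```

Because the enumerator is now created per enumeration, enumerating the result twice yields the same chunks both times; with the `let` outside the CE, a second enumeration would typically continue from the already-exhausted enumerator.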

Alternatively, you can add a `finally` block by hand that disposes of the enumerator.

I'll add this function soon(ish). I may have some time for it this weekend, but it's been a bit busy lately.