fsprojects / FSharp.Control.AsyncSeq

Asynchronous sequences for F#
https://fsprojects.github.io/FSharp.Control.AsyncSeq/
Other
160 stars 59 forks source link

AsyncSeq.groupBy produces sequence which behaves like "cold" even if source is "hot" #89

Closed vchekan closed 6 years ago

vchekan commented 6 years ago

Description

I use AsyncSeq.groupBy and inside the group, when group is created, I get first message of sequence (AsyncSeq.tryFirst)to initialize business logic ( I am merging 2 kafka streams and offsets for stream2 are in stream1). Surprisingly, when I get first message and continue, I am getting "first" message again. In the code below, it fails in groupBy test with message

   Expected: not equal to <Some(1)>
  But was:  <Some(1)>

Repro steps

dotnet test --filter "AsyncSeq.groupBy should not restart"

[<Test>]    
let ``AsyncSeq.groupBy should not restart sequence but continue``() =
  //
  // "cold source will repeat 1st element: this is expected"
  //
  let coldSrc: AsyncSeq<int> = 
    asyncSeq {
      yield 1
      yield 2
      yield 3
      yield 4
    }
  async {
    let! first = coldSrc |> AsyncSeq.tryFirst 
    let! second = coldSrc |> AsyncSeq.tryFirst 
    Assert.AreEqual(Some(1), first)
    // Seq is restarted, so we get "1" again
    Assert.AreEqual(Some(1), second)
  } |> Async.RunSynchronously

  //
  // Implement "hot" source
  //
  let hotSource (src: AsyncSeq<int>): AsyncSeq<int> =
    use enum = src.GetEnumerator()
    let rec loop(): AsyncSeq<int> = asyncSeq {
      let! next = enum.MoveNext()
      match next with
        | Some next' ->
          yield next'
          yield! loop()
        | None -> ()
    }
    loop()

  let hotSrc: AsyncSeq<int> = 
    asyncSeq {
      yield 1
      yield 2
      yield 3
      yield 4
    }
    |> hotSource

  async {
    let! first = hotSrc |> AsyncSeq.tryFirst 
    let! second = hotSrc |> AsyncSeq.tryFirst 
    Assert.AreEqual(Some(1), first)
    // With "hot" source, we get 2nd element
    Assert.AreEqual(Some(2), second)
  } |> Async.RunSynchronously

  //
  // Test behaviour of sequence inside groupBy
  //
  let groupBySource: AsyncSeq<int> = 
    asyncSeq {
      yield 1
      yield 2
      yield 3
      yield 4
    }
    |> hotSource

  let res = 
    groupBySource
    |> AsyncSeq.groupBy(fun s' -> s' % 2)
    |> AsyncSeq.mapAsyncParallel(fun (_group, subseq) -> async {
      let! first = 
        subseq
        |> AsyncSeq.tryFirst
      let! second =       
        subseq
        |> AsyncSeq.tryFirst
      Assert.AreNotEqual(first, second)
      return! subseq |> AsyncSeq.toListAsync
    }

    )
    |> AsyncSeq.toList
    |> List.sortBy(fun a -> a.[0])

  let expect = 
    [
      [1; 3]
      [2; 4]
    ]

  Assert.AreEqual((expect |> sprintf "%A"), (res |> sprintf "%A"))

Expected behavior

Is source sequence is "hot" I would expect groupBy to not change it.

Related information

eulerfx commented 6 years ago

This is a consequence of the way groupBy is implemented - it uses a TaskCompletionSource as an Async ref, and of course Task is idempotent/monotonic causing the grouped sequences to be effectively cached.