fsprojects / FSharpx.Async

Asynchronous programming utilities for F#
http://fsprojects.github.io/FSharpx.Async/

CircularQueueAgent blocks all readers when request to dequeue is larger than maxLength #28

Open sgoguen opened 9 years ago

sgoguen commented 9 years ago

The following code reproduces the issue:

#r "./bin/Debug/FSharpx.Async.dll"

//  Let's use a circular queue agent to distribute work
let queue = FSharpx.Control.CircularQueueAgent<string>(5)

//  Let's create a simple reader process
let reader(name, taking, delay) =
  async {
    printfn "Starting Reader"
    try
      while true do
        let! values = queue.AsyncDequeue(taking)
        for value in values do
          printfn "%s: Reading %s" name value
        do! Async.Sleep(delay)
    finally
      printfn "Stopped Reader"
  } |> Async.Start

//  And a function to continually add items
let addItems(x) = 
  async {
    for x in [1..x] do
      do! queue.AsyncEnqueue([|sprintf "Hi %i" x |])
  } |> Async.Start

//  This works fine
addItems(500)
System.Threading.Thread.Sleep(2000)
reader("Hao", 1, 100)
System.Threading.Thread.Sleep(2000)
reader("Bob", 5, 1000)
System.Threading.Thread.Sleep(2000)
//  This is where everything stops
reader("Xen", 10, 1000)
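
A possible workaround until this is fixed, assuming the stall really is triggered by a single dequeue request that exceeds maxLength (as the title suggests): split large requests into chunks that never exceed the queue's capacity. This is only a sketch. `dequeueInChunks` is a hypothetical helper, not part of FSharpx.Async, and it assumes `AsyncDequeue` returns an `Async<'T[]>`.

```fsharp
// Hypothetical helper, not part of FSharpx.Async.
// `capacity`  : the maxLength the queue was created with (5 in the repro above).
// `dequeue`   : stands in for a call such as `fun n -> queue.AsyncDequeue(n)`;
//               assumed to return exactly the number of items asked for.
// `requested` : total number of items the reader wants.
let dequeueInChunks (capacity: int) (dequeue: int -> Async<'T[]>) (requested: int) : Async<'T[]> =
  let rec loop (collected: 'T[] list) (remaining: int) =
    async {
      if remaining <= 0 then
        // Chunks were prepended, so restore arrival order before concatenating
        return collected |> List.rev |> Array.concat
      else
        // Never ask the queue for more than it can hold at once
        let take = min capacity remaining
        let! chunk = dequeue take
        return! loop (chunk :: collected) (remaining - take)
    }
  loop [] requested
```

In the reader above, this would replace the direct call with something like `let! values = dequeueInChunks 5 (fun n -> queue.AsyncDequeue(n)) taking`. Note that the chunks are fetched in separate requests, so items taken by other readers can be interleaved between them; the result is not the same as one atomic dequeue of `taking` items.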