Gabriella439 / pipes-concurrency

Concurrency for the pipes ecosystem
BSD 3-Clause "New" or "Revised" License

thread blocked indefinitely in an STM transaction (GHC 7.8.2) #27

Closed: tonyday567 closed 7 years ago

tonyday567 commented 10 years ago

Code to reproduce:

import Pipes.Concurrent

main = do
    (_, input) <- spawn Unbounded
    atomically $ recv input
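
For comparison, here is a minimal sketch (not part of the original report) that does not depend on garbage collection to release the blocked receiver. It assumes a pipes-concurrency version that exports `spawn'` and the lowercase `unbounded` buffer; the explicit `seal` call is the only change in approach.

```haskell
import Pipes.Concurrent

main :: IO ()
main = do
    -- spawn' also returns an STM action that seals the mailbox
    (_output, input, seal) <- spawn' unbounded
    -- Seal explicitly instead of waiting for the Output to be garbage collected
    atomically seal
    -- With the mailbox sealed and empty, recv returns Nothing instead of blocking
    result <- atomically (recv input) :: IO (Maybe ())
    print result
```

The type annotation only pins down the otherwise unconstrained element type so that the result can be printed.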

ocharles commented 10 years ago

Is this the failing test we're seeing in nixpkgs? The build log and test execution are at http://hydra.nixos.org/build/13890161/nixlog/1/raw

benmos commented 9 years ago

See also:

ivan-m commented 7 years ago

On my phone, so please excuse poor formatting.

Instead of withSpawn taking a continuation (Input a, Output a) -> m r, what about if we make it (Input a -> m i, Output a -> m o) -> m r and then bracket those two parts independently with their respective parts of seal?

ivan-m commented 7 years ago

So, nested continuations didn't pan out (they fail because you can't actually run those two sub-continuations individually), but what about this?

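import Control.Concurrent.Async (concurrently)
import Control.Exception (bracket)
import Pipes.Concurrent

withBuffer :: Buffer a -> (Output a -> IO o)
           -> (Input a -> IO r) -> IO r
withBuffer buffer fOutput fInput = bracket
  (spawn' buffer)
  -- Seal on the way out, even if either continuation throws
  (\(_, _, seal) -> atomically seal)
  (\(output, input, seal) ->
    -- Run both sides concurrently, sealing as each finishes; keep only the Input side's result
    snd <$> concurrently (bracket (return ()) (const (atomically seal)) (const (fOutput output)))
                         (bracket (return ()) (const (atomically seal)) (const (fInput input))))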

Using it:

λ> withBuffer unbounded (const (return ())) (atomically . recv)
Nothing

I think the distinction here is that you're requiring something to be done with both the input and output.

The main bikeshedding here is whether the input or the output should be last; my take is that - as it seems to be the input for future Pipes (the naming keeps confusing me) - we're more likely to want to keep going so have that as the final continuation.

Gabriella439 commented 7 years ago

Yeah, I like that and I think that will work reliably. In fact, I think that could completely eliminate the use of finalizers attached to weak references if we switched to withBuffer

Basically, it works because it attaches a seal to the end of each continuation, which guarantees that seal is called when the Output or Input goes out of scope, instead of relying on garbage collection to detect that

The only thing I'd suggest is simplifying the implementation a little bit to:

import Control.Concurrent.Async
import Control.Exception
import Pipes.Concurrent

withBuffer
    :: Buffer a
    -> (Output a -> IO l)
    -> (Input  a -> IO r)
    -> IO (l, r)
withBuffer buffer fOutput fInput = bracket
  (spawn' buffer)
  (\(_, _, seal) -> atomically seal)
  (\(output, input, seal) ->
    concurrently (fOutput output `finally` atomically seal)
                 (fInput  input  `finally` atomically seal)
  )
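
To illustrate how this version might be used end to end, here is a small sketch with one producer and one consumer. The `withBuffer` definition is repeated from above so the example is self-contained; the `main` function, the list `[1 .. 3]`, and the use of `Pipes.Prelude.toListM` are illustrative choices, not part of the proposal.

```haskell
import Control.Concurrent.Async (concurrently)
import Control.Exception (bracket, finally)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

withBuffer
    :: Buffer a
    -> (Output a -> IO l)
    -> (Input  a -> IO r)
    -> IO (l, r)
withBuffer buffer fOutput fInput = bracket
  (spawn' buffer)
  (\(_, _, seal) -> atomically seal)
  (\(output, input, seal) ->
    concurrently (fOutput output `finally` atomically seal)
                 (fInput  input  `finally` atomically seal)
  )

main :: IO ()
main = do
    (_, xs) <- withBuffer unbounded
        -- Producer side: send three values, then return; `finally` seals the mailbox
        (\output -> runEffect $ each [1 .. 3 :: Int] >-> toOutput output)
        -- Consumer side: drain the mailbox; fromInput stops once it is sealed and empty
        (\input -> P.toListM (fromInput input))
    print xs
```

Because the producer's continuation seals the mailbox when it returns, the consumer terminates without depending on the garbage collector.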

Let me see if I can translate all the tutorial examples to use withBuffer. If it works out, then I'll put up a pull request

ivan-m commented 7 years ago

The downside of returning (l,r) is that it doesn't match what your managed library expects.
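
A sketch of the mismatch, assuming the `managed` package's `managed :: (forall r. (a -> IO r) -> IO r) -> Managed a`: a variant whose result type is `IO r` can be wrapped directly, whereas one returning `IO (l, r)` cannot. The names `withBuffer'` and `managedInput` are hypothetical and exist only for this illustration.

```haskell
import Control.Concurrent.Async (concurrently)
import Control.Exception (bracket, finally)
import Control.Monad.Managed (Managed, managed, runManaged)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- Hypothetical variant that discards the Output side's result,
-- so the final continuation has the shape (Input a -> IO r) -> IO r
withBuffer' :: Buffer a -> (Output a -> IO l) -> (Input a -> IO r) -> IO r
withBuffer' buffer fOutput fInput = bracket
  (spawn' buffer)
  (\(_, _, seal) -> atomically seal)
  (\(output, input, seal) ->
    snd <$> concurrently (fOutput output `finally` atomically seal)
                         (fInput  input  `finally` atomically seal))

-- Partially applying withBuffer' gives exactly the shape that `managed` accepts
managedInput :: Buffer a -> (Output a -> IO l) -> Managed (Input a)
managedInput buffer fOutput = managed (withBuffer' buffer fOutput)

main :: IO ()
main = runManaged $ do
    input <- managedInput unbounded
        (\output -> runEffect $ each "abc" >-> toOutput output)
    xs <- liftIO (P.toListM (fromInput input))
    liftIO (print xs)
```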

ivan-m commented 7 years ago

And I have no idea why I tried using bracket within concurrently rather than just finally 🤦‍♂️

Gabriella439 commented 7 years ago

So I haven't had time to refactor the tutorial to use withBuffer, but I will still put up just the change to add withBuffer regardless so that people can begin to use this