Gabriella439 / pipes-concurrency

Concurrency for the pipes ecosystem
BSD 3-Clause "New" or "Revised" License

Implement Pipes.Concurrent.Broadcast #17

Closed kvanbere closed 3 years ago

kvanbere commented 10 years ago

Implementation for #12

I wrote a test, but the test suite goes haywire on Windows (no terminal output, and every test fails). Maybe that's a fault on my end.

Make sure you test it if you can.

I'm actually reasonably happy with the result, but be warned, I didn't make an effort to optimize it.

kvanbere commented 10 years ago

Maybe I should implement a way to kill this nicely? At the moment Broadcast can only be closed on the receiver end and there is no killswitch like the other chans. A killswitch would be useful because the server needs to be able to end all the forked clients somehow.

Gabriella439 commented 10 years ago

I just realized that my original type for dupBroadcast was slightly wrong and should actually be an STM action instead of IO. The reason is that you want to use STM to avoid race conditions that arise from the order in which an element is written to an Input and a new Output is generated, since that would affect whether or not the new Output received the element.
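
To make the ordering concern concrete, here is a minimal sketch that uses only the stm package. The `Broadcast` newtype and this `subscribe` are hypothetical stand-ins, not the code from this PR. Because subscribing is an STM action, it can be composed with a write in a single transaction, so the outcome no longer depends on thread scheduling:

```haskell
import Control.Concurrent.STM

-- Hypothetical stand-in for the PR's Broadcast type: a write-only
-- broadcast TChan from the stm package.
newtype Broadcast a = Broadcast (TChan a)

-- Subscribing is an STM action, so callers can compose it with writes.
subscribe :: Broadcast a -> STM (TChan a)
subscribe (Broadcast chan) = dupTChan chan

demo :: IO (Maybe Int, Maybe Int)
demo = do
    chan <- newBroadcastTChanIO
    let bc = Broadcast chan
    -- Subscribe, then write, in one transaction: this subscriber is
    -- guaranteed to observe the element.
    early <- atomically $ do
        sub <- subscribe bc
        writeTChan chan (1 :: Int)
        return sub
    -- Write, then subscribe, in one transaction: this subscriber is
    -- guaranteed to miss the element.
    late <- atomically $ do
        writeTChan chan 2
        subscribe bc
    a <- atomically (tryReadTChan early)
    b <- atomically (tryReadTChan late)
    return (a, b)
```

Running `demo` should yield `(Just 1, Nothing)`. If subscribing were an IO action, the write and the subscription could not be combined into one transaction like this.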

My next comment is just to use named fields for the Broadcast type instead of a tuple to help internally document their purpose for people reading the source code. It's also the coding style I prefer. You don't have to use the field names in the code, though.

Also, I would rename spawnBroadcast' to just broadcast and skip the spawnBroadcast without the seal action. The only reason I have separate spawn and spawn' functions in pipes-concurrency is just for backwards compatibility since I came up with spawn first. If I were to break backwards compatibility I would just replace spawn with spawn' completely.

Similarly, rename dupBroadcast to subscribe.

Other than those changes, your code looks pretty good. If you can make those fixes then I will merge it in.

kvanbere commented 10 years ago

Cheers! I was getting afraid my code wasn't good enough, but I'll promptly make the changes tonight.

Gabriella439 commented 10 years ago

Your code was just fine! I was just unavailable because I'm on vacation in Vietnam and I didn't have internet access the last few days while traveling to the countryside. That's also why my work is generally sporadic until I return home in February.

kvanbere commented 10 years ago

Okay, I've done everything mentioned above.

I can't get it to work though, and it's puzzling. On my windows laptop I use:

import Control.Concurrent hiding (yield)
import Control.Concurrent.Async
import Pipes
import Pipes.Concurrent
import Pipes.Concurrent.Broadcast
import qualified Pipes.Prelude as P

testBroadcast :: IO ()
testBroadcast = do
    (feed, bc, se) <- broadcast
    eat1 <- atomically $ subscribe bc
    eat2 <- atomically $ subscribe bc

    t1 <- async $ do
        runEffect $   fromInput eat1
                  >-> P.chain (const $ putStrLn "Recieved 1")
                  >-> P.print
        performGC

    t2 <- async $ do
        runEffect $   fromInput eat2
                  >-> P.chain (const $ putStrLn "Recieved 2")
                  >-> P.print
        performGC

    runEffect $   P.stdinLn
              >-> toOutput feed

    putStrLn "done"

main = testBroadcast

When I type things I don't even get a "Recieved 1" or "Recieved 2", nothing happens.

I can't figure out what's going on...

kvanbere commented 10 years ago

Doesn't seem to work on OSX either, meaning it's not a Windows thing -- it just doesn't work. :(

Gabriella439 commented 10 years ago

Also, I noticed there is one other thing we forgot to consider. Right now the Output has no way of knowing that there are no more listeners downstream. To fix this, you need to attach a seal action to run when the Broadcast gets garbage collected.

kvanbere commented 10 years ago

I don't think the broadcaster needs to know about who is subscribed, and likewise, it definitely shouldn't be destroyed if the number of subscribers reaches zero (you just broadcast into the void until someone connects). As sealing the chan wouldn't cause it to be garbage collected any faster, I don't think there is any benefit in doing so (?), unless I'm missing something.

Marvelous job on the typo, by the way. That was a sneaky one. It's all working good on my end now.

Gabriella439 commented 10 years ago

I know that TChans won't fill up if there are no subscribers, but if the Broadcast and all associated Inputs are garbage collected then there is no possible way that anything will ever read from the TChan ever again, and that's useful information to pass along to the broadcaster.

Also, if we ever generalize this to work with TBChans later on then it will be necessary to detect when there can never be any more subscribers, otherwise you risk triggering an STM deadlock by writing to a full TBChan that will never be read from. So it's worth getting correct in the long run.

Note that I'm not saying that it should be destroyed if the number of subscribers reaches zero, but rather when the Broadcast and all associated Inputs get garbage collected (I made a mistake when I said that it should seal when the Broadcast is collected). You can have zero subscribers but still keep a reference to the Broadcast, which will keep the TChan alive because it will then know that there is still a potential to get a new subscriber later on. If the Broadcast gets garbage collected and all associated Inputs get garbage collected then that's a proof that nobody can ever read from the TChan ever again.
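
One possible shape for this, sketched with only the stm package: the `Mailbox` type, `sendTo`, and `sealOnGC` are hypothetical names, not part of pipes-concurrency. The assumption is that the Broadcast and every Input it hands out keep a reference to a shared token `TVar`, so a finalizer on the token can only run once all of them are unreachable. Finalizers are not guaranteed to run promptly, so this is an untested idea rather than a working design:

```haskell
import Control.Concurrent.STM
import Control.Monad (void)

-- Hypothetical mailbox: a broadcast channel plus a flag recording sealing.
data Mailbox a = Mailbox
    { sealed  :: TVar Bool
    , channel :: TChan a
    }

newMailbox :: IO (Mailbox a)
newMailbox = Mailbox <$> newTVarIO False <*> newBroadcastTChanIO

seal :: Mailbox a -> STM ()
seal mb = writeTVar (sealed mb) True

-- Returns False once sealed, which tells the broadcaster to stop.
sendTo :: Mailbox a -> a -> STM Bool
sendTo mb a = do
    isSealed <- readTVar (sealed mb)
    if isSealed
        then return False
        else writeTChan (channel mb) a >> return True

-- Assumption: 'token' is referenced by the Broadcast and by every Input,
-- so it becomes garbage only when none of them is reachable any more.
sealOnGC :: TVar () -> Mailbox a -> IO ()
sealOnGC token mb = void $ mkWeakTVar token (atomically (seal mb))

demo :: IO (Bool, Bool)
demo = do
    mb     <- newMailbox
    before <- atomically (sendTo mb 'x')
    atomically (seal mb)          -- sealed by hand here, not by the GC
    after  <- atomically (sendTo mb 'y')
    return (before, after)
```

`demo` exercises only the manual seal path and should return `(True, False)`. The GC-driven path through `sealOnGC` is left unexercised because its timing depends on the collector.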

kvanbere commented 10 years ago

Oh, I think I understand now. You're talking about a case where you keep the Output, but scrap the Broadcast and have no subscribers, so it's impossible to subscribe, hence writing is useless?

Is this correct?

I'll implement this, and for bonus points, I'll even implement something like Pipes.Concurrent where you can make multiple modes -- Unbounded and Bounded, the latter for TBChan, which is a breeze to implement.

kvanbere commented 10 years ago

Just realised TBChan doesn't have a broadcast mode.

kvanbere commented 10 years ago

I tried to implement this, but it kind of becomes disgusting. I think it requires tracking the number of open Inputs and I can't really think of a use case. Maybe it's better if you implement it.

Edit: Although maybe there is a usage for knowing the number of subs...

Gabriella439 commented 10 years ago

So what we can do for now is not include the feature just yet and then add it as an enhancement in a later patch without breaking backwards compatibility. Just give me a little bit more time to review your submission one last time.

Gabriella439 commented 10 years ago

Allow me to clarify. We can add the broadcast functionality and make it so that the generated Output always returns True (i.e. it doesn't track open Inputs or the Broadcast or anything) and then later on if we figure out a good way to track these using garbage collection then just add it and it will be a transparent upgrade.

Gabriella439 commented 10 years ago

Okay, so this looks good. I only have two more requests. Could you merge this functionality into the main Pipes.Concurrent module under a Broadcast section and then rebase your changes on top of master? If you do those two things then I'm ready to merge this in.

kvanbere commented 10 years ago

I'm kind of making a mess of this locally. There's a conflict in test-main.hs from the New thing. Do I just fix it up and add another commit?

kvanbere commented 10 years ago

My git-fu wasn't strong enough to make this automatic D:

It's now in my fork in master (cleaned it up with reset to a single commit). The test stuff I wrote disappeared; oh well.

Gabriella439 commented 10 years ago

It's alright. If you want, I can just merge in your changes myself and mark you as the commit author.

kvanbere commented 10 years ago

I'm fine with that, and I'm not really fussed about authorship as long as the features make it in, because I need them for my next project.

Gabriella439 commented 10 years ago

Give me some more time to review this. I just want to be sure that this is the best approach because it's a significant addition to the API.

Gabriella439 commented 10 years ago

So the reason I delayed this is because I noticed there might be a simpler implementation that doesn't use TChans at all:

import Control.Concurrent.STM
import Data.Monoid
import Pipes
import Pipes.Concurrent

type Broadcast a = TVar (Output a)

broadcast :: IO (Output a, Broadcast a)
broadcast = do
    tvar <- newTVarIO mempty
    let output = Output $ \a -> do
            o <- readTVar tvar
            send o a
    return (output, tvar)

subscribe :: Buffer a -> Broadcast a -> IO (Input a)
subscribe buffer tvar = do
    (output, input) <- spawn buffer
    atomically $ modifyTVar' tvar (mappend output)
    return input

I haven't tested this yet, but it seems like it would work. As a bonus, if it did work, this implementation gets all the garbage collection stuff correct for free. Another bonus is that downstream subscribers can each use separate Buffer types.

Since Inputs are also Monoids, this suggests that we can also reflect this design pattern with the following types:

type Funnel a = TVar (Input a)

funnel :: IO (Funnel a, Input a)

feed :: Buffer a -> Funnel a -> IO (Output a)

What do you think?
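
For readers without pipes-concurrency at hand, the fan-out idea can be tried with only the stm package. In this sketch `Sink` is a hypothetical stand-in for `Output`, and its `Monoid` instance is an assumption modeled loosely on the one in the library:

```haskell
import Control.Concurrent.STM

-- Hypothetical stand-in for pipes-concurrency's Output.
newtype Sink a = Sink { push :: a -> STM Bool }

-- Combining two sinks pushes to both; the result is True if either accepted.
instance Semigroup (Sink a) where
    Sink f <> Sink g = Sink $ \a -> (||) <$> f a <*> g a

-- The empty sink accepts nothing.
instance Monoid (Sink a) where
    mempty = Sink $ \_ -> return False

-- Register a new subscriber by appending its sink to the shared TVar.
subscribeQ :: TVar (Sink a) -> IO (TQueue a)
subscribeQ tvar = do
    q <- newTQueueIO
    let sink = Sink $ \a -> writeTQueue q a >> return True
    atomically $ modifyTVar' tvar (<> sink)
    return q

-- Read everything currently buffered in a queue.
drain :: TQueue a -> STM [a]
drain q = do
    m <- tryReadTQueue q
    case m of
        Nothing -> return []
        Just x  -> (x :) <$> drain q

demo :: IO ([Int], [Int])
demo = do
    tvar <- newTVarIO mempty
    q1   <- subscribeQ tvar
    q2   <- subscribeQ tvar
    atomically $ do
        sink <- readTVar tvar
        mapM_ (push sink) [1, 2, 3]
    (,) <$> atomically (drain q1) <*> atomically (drain q2)
```

`demo` should return `([1,2,3],[1,2,3])`. Each `push` walks every registered sink, so the cost of one send grows with the number of subscribers.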

Gabriella439 commented 10 years ago

Here's the full sketch for what the Broadcast and Funnel code would look like:

import Control.Concurrent.STM
import Data.Monoid
import Pipes
import Pipes.Concurrent

newtype Broadcast a = Broadcast { unBroadcast :: TVar (Output a) }

broadcast :: STM (Output a, Broadcast a)
broadcast = do
    tvar <- newTVar mempty
    let output = Output $ \a -> do
            o <- readTVar tvar
            send o a
    return (output, Broadcast tvar)
{-# INLINABLE broadcast #-}

subscribe :: Buffer a -> Broadcast a -> IO (Input a)
subscribe buffer (Broadcast tvar) = do
    (output, input) <- spawn buffer
    atomically $ modifyTVar' tvar (mappend output)
    return input
{-# INLINABLE subscribe #-}

newtype Funnel a = Funnel { unFunnel :: TVar (Input a) }

funnel :: STM (Funnel a, Input a)
funnel = do
    tvar <- newTVar mempty
    let input = Input $ do
            i <- readTVar tvar
            recv i
    return (Funnel tvar, input)
{-# INLINABLE funnel #-}

widen :: Buffer a -> Funnel a -> IO (Output a)
widen buffer (Funnel tvar) = do
    (output, input) <- spawn buffer
    atomically $ modifyTVar' tvar (mappend input)
    return output
{-# INLINABLE widen #-}

kvanbere commented 10 years ago

Hi Gabriel,

That looks great. Are you sure that the use of mappend isn't going to gradually slow the system down over time? Otherwise, the new version looks faster (fewer transactions) and more straightforward.

kvanbere commented 10 years ago

One thing I picked up is that this fills up the channels in linear time O(n) (n : # subscribers) whereas a broadcasting TChan uses a linked list which appends to every subscriber at once in O(1) because the subscribers use a cursor on a single buffer rather than many separate buffers.

If there are 1000 subscribers, this is going to be relatively slow or even unusable for what many people (myself included) would want to use it for, such as passing messages around to many sockets in a server. In some circumstances, where sockets need to send messages in response to others, this would cause a single action to explode into an echoing O(n^2) complexity.


TChan implementation: http://hackage.haskell.org/package/stm-2.4.2/docs/src/Control-Concurrent-STM-TChan.html#newBroadcastTChan

Monoid instance for Output: https://github.com/Gabriel439/Haskell-Pipes-Concurrency-Library/blob/master/src/Pipes/Concurrent.hs#L101
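
A small sketch of the TChan side of this comparison, using only the stm package. A single `writeTChan` on a broadcast channel reaches every subscriber created with `dupTChan`, whatever their number:

```haskell
import Control.Concurrent.STM
import Control.Monad (replicateM)

-- Create n subscribers, perform one write, and read it back from each.
demo :: Int -> IO [String]
demo n = do
    chan <- newBroadcastTChanIO
    subs <- replicateM n (atomically (dupTChan chan))
    -- One write; its cost does not depend on n.
    atomically $ writeTChan chan "hello"
    mapM (atomically . readTChan) subs
```

`demo 1000` should return a list of 1000 copies of `"hello"`, even though the element was written only once.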

Gabriella439 commented 10 years ago

Yeah, you're right. Alright, then in that case we will go with your original implementation which will be more efficient and conservative with memory. If you can get it to build against master one last time then I will merge this in.

kvanbere commented 10 years ago

Your implementation would still work and likely be faster than using generic broadcasting STM channels, but you'd need to tweak it to use a linked list of TVars with cursors. If you're not interested in doing something like this, I'll get my branch ready.

I was talking on IRC the other day and this is an exciting flagship feature. Let's get it done right ;)

Gabriella439 commented 10 years ago

So the issue with my approach is that there is a separate buffer for each subscriber and the broadcaster has no buffer at all. This means that you can't do any better than O(N) broadcasts using my approach, because you have to stash a separate copy of the outgoing element in each subscriber's buffer. The only way to get O(1) broadcasts is to keep the buffer with the broadcaster instead of the subscribers, but that's what the TChan-based approach already does. The main thing that needs fixing is getting the garbage collection behavior correct so that the broadcaster knows when to stop broadcasting (i.e. no more references to either the Broadcast or all downstream Inputs). I'm pretty sure this can be implemented, though, so give me some time to study the TChan-based approach more closely.
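
To illustrate the "linked list of TVars with cursors" idea raised earlier in the thread, here is a minimal sketch built on the stm package alone. All names (`Writer`, `Cursor`, `publish`, `next`) are hypothetical. The structure mirrors the general technique behind TChan and is not code from this PR:

```haskell
import Control.Concurrent.STM

-- A cell is either an unfilled hole or a value plus a link to the next cell.
data Item a = Hole | Cons a (Cell a)
type Cell a = TVar (Item a)

-- The writer only remembers the current hole at the end of the list.
newtype Writer a = Writer (TVar (Cell a))

-- Each subscriber holds its own cursor into the one shared list.
newtype Cursor a = Cursor (TVar (Cell a))

newWriter :: STM (Writer a)
newWriter = do
    hole <- newTVar Hole
    Writer <$> newTVar hole

-- O(1) regardless of subscriber count: fill the hole and create a new one.
publish :: Writer a -> a -> STM ()
publish (Writer end) a = do
    hole    <- readTVar end
    newHole <- newTVar Hole
    writeTVar hole (Cons a newHole)
    writeTVar end newHole

-- A new cursor starts at the current hole, so it sees only later elements.
subscribe :: Writer a -> STM (Cursor a)
subscribe (Writer end) = Cursor <$> (readTVar end >>= newTVar)

-- Retries until the cell under the cursor has been filled.
next :: Cursor a -> STM a
next (Cursor cur) = do
    cell <- readTVar cur
    item <- readTVar cell
    case item of
        Hole        -> retry
        Cons a rest -> writeTVar cur rest >> return a

demo :: IO ([Int], [Int])
demo = do
    w  <- atomically newWriter
    c1 <- atomically (subscribe w)
    atomically (publish w 1)
    c2 <- atomically (subscribe w)
    atomically (publish w 2)
    xs <- atomically ((\a b -> [a, b]) <$> next c1 <*> next c1)
    ys <- atomically ((: []) <$> next c2)
    return (xs, ys)
```

`demo` should return `([1,2],[2])`: the first cursor sees both elements and the second sees only the one published after it subscribed. The writer keeps a reference only to the trailing hole, so elements that no cursor can still reach become garbage.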

kvanbere commented 10 years ago

bump

Gabriella439 commented 10 years ago

I haven't forgotten about this. I just got back from vacation and I'm recovering from heavy jet lag before starting my new job, so I'm laying low for a few days.

kvanbere commented 10 years ago

Just reminding you we should figure out what we want to do with the code lying around.

k0001 commented 10 years ago

I wonder what the status of this is. Would it be of any help if I took @kvanberendonck's changes and attempted to implement @Gabriel439's latest suggestion about getting garbage collection right?

Gabriella439 commented 10 years ago

Yeah, if you can that would be a great help. I'm slowly catching up with my libraries so I should be able to help out soon, too.

k0001 commented 10 years ago

Great. I'll try doing that then.

kvanbere commented 10 years ago

(Poke)

I would like to give this another shot from scratch if @k0001 isn't going to. Could we make a small list of requirements?

Gabriella439 commented 10 years ago

Sorry about that! I got distracted by the mvc work. I will work on this.

k0001 commented 10 years ago

@kvanberendonck @Gabriel439 I'm sorry. Frankly, I haven't spent any time on this since my last comment, I've been working on other things.

Gabriella439 commented 10 years ago

I have to delay this until after this weekend because I need to revise some figures for a paper this weekend.

kvanbere commented 10 years ago

Good stuff. If there's anything I can do to help, I finished my last exam today so just drop a line.

Gabriella439 commented 10 years ago

Besides re-merging your work, the other thing I wanted to do was to see if I could get the termination signaling to work so that things shut down on time.

tonyday567 commented 10 years ago

I needed some sort of broadcast ability in an MVC project and played around with Gabe's specification. It worked pretty well for my use case, which doesn't involve many clients, and was very straightforward using the funnel and widen functionality. Here's a TCP server for instance:

import           Control.Applicative
import           Control.Monad
import           Control.Concurrent (threadDelay)
import           MVC
import           MVC.Prelude as MVC
import qualified Pipes.Network.TCP as TCP
import Data.ByteString (ByteString)
import Control.Concurrent.Async
import Control.Concurrent.STM

broadcastServer :: (String, String) -> Managed (View ByteString, Controller ByteString)
broadcastServer (h,p) = join $ managed $ \k -> do
    -- make a Broadcast channel
    (outB, refB, refF, inF) <- atomically $ do
        (outB', refB') <- broadcast
        (refF', inF') <- funnel
        return (outB', refB', refF', inF')
    let server = TCP.listen (TCP.Host h) p $ \(sock,_) ->
            forever $
            TCP.acceptFork sock $ \(sock',_) -> do
                inClient <- subscribe Unbounded refB
                outClient <- widen Unbounded refF
                aIn <- async $ runEffect $
                       for (fromInput inClient) (lift . TCP.send sock')
                aOut <- async $ runEffect $
                        TCP.fromSocket sock' 4096 >-> toOutput outClient
                void $ waitAnyCancel [aIn, aOut]
    let v = asSink (void . atomically . MVC.send outB)
        c = asInput inF
    withAsync server $ \_ -> k ((,) <$> pure v <*> (pure c <> keepOpenController))

keepOpenController :: Managed (Controller a)
keepOpenController = MVC.producer Single $ do
    lift $ threadDelay (365 * 24 * 60 * 60 * 10^6)
    return ()

rainbyte commented 4 years ago

Are there still any problems with this PR? Can it be merged after being rebased?

kvanbere commented 4 years ago

Sorry, it’s been so long that I don’t even remember.

rainbyte commented 4 years ago

I will inspect it soon. Thanks