jberryman / unagi-chan

A haskell library implementing fast and scalable concurrent queues for x86, with a Chan-like API
BSD 3-Clause "New" or "Revised" License

Bounded Queue with lossy Readers #19

Open mfine opened 7 years ago

mfine commented 7 years ago

Currently when the Bounded Queue is full, writes block - is it possible to introduce behavior that lets writes complete successfully when the queue is full? Our use case is a bounded queue of ephemeral, timely data - we do not want to throttle the writer and prefer to have readers that lose messages.

I have not dug into the internals of the library, but noticed some of this kind of behavior with the dupChan:

slower readers of duplicated OutChan may fall arbitrarily behind.

is it possible to bring this behavior in for even a single reader?

I asked about this issue on haskell-cafe, and we ended up with an implementation following the suggested strategy, but we would still love to move off our homegrown stuff and on to something hopefully more performant. As mentioned on the email thread, Pipes.Concurrent kind of has this with its Newest buffer, though I was unsure of the implementation approach:

        Newest n  -> do
            q <- S.newTBQueueIO n
            let write x = S.writeTBQueue q x <|> (S.tryReadTBQueue q *> write x)
            return (write, S.readTBQueue q)

(at the time, we wanted to be able to have multiple readers from our channels).
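For reference, the drop-oldest write in the excerpt above can be pulled out into a standalone function that needs only the stm package. This is a minimal sketch, not the Pipes.Concurrent source: the name writeNewest is made up for illustration, and orElse is used in place of <|> (the two behave the same in STM).

```haskell
import Control.Concurrent.STM

-- | Hypothetical helper: write to a bounded queue without ever blocking.
-- If the queue is full, discard the oldest element and try again, all
-- within the same transaction.
writeNewest :: TBQueue a -> a -> STM ()
writeNewest q x =
  writeTBQueue q x
    `orElse` (tryReadTBQueue q >> writeNewest q x)
```

Writing 1 through 5 into a queue of capacity 3 with this function should leave 3, 4 and 5 for the reader. Note that the writer and the reader both touch the same queue state here, so the writer contends with readers whenever the queue is full.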

I'm happy to help out with the implementation. Thanks!

jberryman commented 7 years ago

Cool, I think I understand. It's tempting to try to organize this around a pre-allocated ring buffer:

As usual you need to take into account counter wrap-around (e.g. lap probably must never wrap around), and things can be made to go a little faster if you don't need multiple or blocking readers.
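One way to picture the lap bookkeeping is to derive both the lap and the slot index from a single, monotonically increasing counter. The sketch below is only an illustration under that assumption; the names lapAndIndex and missed are hypothetical and do not come from unagi-chan. It also assumes a reader's counter never exceeds the writer's.

```haskell
import Data.Word (Word64)

-- | Split a monotonically increasing counter into (lap, slot index)
-- for a ring buffer with the given number of slots.
lapAndIndex :: Word64 -> Word64 -> (Word64, Int)
lapAndIndex size counter = (lap, fromIntegral ix)
  where
    (lap, ix) = counter `divMod` size

-- | Number of elements a reader has lost to overwriting, given the
-- writer's counter (elements written so far) and the reader's counter
-- (next element it wants). Assumes readCounter <= writeCounter.
missed :: Word64 -> Word64 -> Word64 -> Word64
missed size writeCounter readCounter
  | behind > size = behind - size
  | otherwise     = 0
  where
    behind = writeCounter - readCounter
```

With a 64-bit counter the lap value itself will not wrap around in practice, which sidesteps the wrap-around concern at the cost of a division per access.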

That's one approach I can see at least. Feel free to dig into the code, if you have time. I'm happy to try to help with advice or with understanding the code, but probably won't be able to work on this much myself in the near future.

Thanks for reporting!

mfine commented 7 years ago

Sounds great and thanks for the response! I had played around with a sliding buffer before, and think it makes a lot of sense. If a reader is keeping track of its lap and index into the buffer, it seems like supporting multiple readers would come for free. There could also be a couple of options for lapped readers:

Would have to ensure all the counters stay within bounds and respect the constraints.

Excited about this functionality - snowed in with work at the moment, but looking forward to working on it.

jberryman commented 7 years ago

Haven't thought about your description, but I realized my hand-wavy description was inadequate: if the writer laps the reader, and then shortly after the reader laps the writer, then of course we're no longer FIFO (we've just read new then old). So the reader would need to look at the lap and, if it was decreasing, CAS the read counter to reset it to index 0 or something gross like that. That means effectively dropping nondeterministically between 0 and all existing elements in the queue. (actually I think that's your first bullet above)

I guess my advice would be unless you can get some nicer properties (like not needing to allocate new arrays), just try to implement what you need by adding a new function that conditionally atomically increments the read counter by n on a write (effectively dropping n elements).
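A rough sketch of what such a function could look like, using a plain IORef as the read counter. This is an assumption for illustration only: dropOldestIfFull is a hypothetical name, and the library's internal counters are not represented this way.

```haskell
import Data.IORef

-- | Hypothetical helper: called by the writer before a write. If the
-- queue is full, atomically advance the shared read counter by n,
-- which effectively drops the n oldest elements.
dropOldestIfFull
  :: Int        -- capacity of the queue
  -> Int        -- n, how many elements to drop when full
  -> IORef Int  -- shared read counter (elements consumed or dropped)
  -> Int        -- writer's counter (elements written so far)
  -> IO Int     -- number of elements actually dropped
dropOldestIfFull capacity n readCounter written =
  atomicModifyIORef' readCounter $ \r ->
    if written - r >= capacity
      then (r + n, n)  -- full: skip the n oldest elements
      else (r, 0)      -- room left: leave the reader alone
```

Dropping a small fixed n per write keeps the loss bounded and predictable, in contrast to resetting the reader to index 0.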

mfine commented 7 years ago

Nice timing - I made a prototype of this idea last night with TArrays:

-- | Circular STM buffer for bounded channels.
--
module TBuf
  ( TBuf
  , newTBuf
  , dupTBuf
  , writeTBuf
  , readTBuf
  , sinkTBuf
  , sourceTBuf
  ) where

import           Control.Concurrent.STM
import           Control.Monad           (forever, when)
import           Control.Monad.IO.Class  (MonadIO, liftIO)
import           Data.Array.MArray
import           Data.Conduit
import qualified Data.Conduit.List       as CL

-- | Internal data type for item in buffer, including current epoch.
--
data TRec a = TNil | TRec Word a

-- | Opaque data type containing size of buffer, current index and epoch, and array.
--
data TBuf a = TBuf Word
              {-# UNPACK #-} !(TVar Word)
              {-# UNPACK #-} !(TVar Word)
              {-# UNPACK #-} !(TArray Word (TRec a))

-- | Create a new buffer of size n.
--
newTBuf :: Word -> STM (TBuf a)
newTBuf n = do
  ti <- newTVar 0
  te <- newTVar 0
  ta <- newArray (0, n - 1) TNil
  return (TBuf n ti te ta)

-- | Duplicate a buffer for reading from.
--
dupTBuf :: TBuf a -> STM (TBuf a)
dupTBuf (TBuf n ti te ta) = do
  i <- readTVar ti
  e <- readTVar te
  ti' <- newTVar i
  te' <- newTVar e
  return (TBuf n ti' te' ta)

-- | Write to buffer, lapping if necessary.
--
writeTBuf :: TBuf a -> a -> STM ()
writeTBuf (TBuf n ti te ta) d = do
  i <- readTVar ti
  e <- readTVar te
  -- write the data and increment the index and epoch if needed.
  writeArray ta i (TRec e d)
  writeTVar ti ((i + 1) `mod` n)
  when (i + 1 >= n) $
    writeTVar te (e + 1)

-- | Read from buffer, getting lapped if necessary.
--
readTBuf :: TBuf a -> STM a
readTBuf (TBuf n ti te ta) = do
  i <- readTVar ti
  e <- readTVar te
  loop i e
  where
    loop i e = do
      r <- readArray ta i
      case r of
        TNil ->
          retry
        TRec e' d
          -- the epoch ahead is lower - retry until it's not
          -- (this will not handle epoch wrap-around)
          | e > e' ->
            retry
          -- the epoch ahead is greater - we've been lapped.
          -- reset and move on to the new lap from the beginning.
          | e < e' -> do
            writeTVar ti 0
            writeTVar te e'
            loop 0 e'
          -- the epoch is the same - we're on the same lap.
          -- return the data and increment the index and
          -- epoch if needed.
          | otherwise -> do
            writeTVar ti ((i + 1) `mod` n)
            when (i + 1 >= n) $
              writeTVar te (e + 1)
            return d

-- | Conduit sink for writing to TBuf.
--
sinkTBuf :: MonadIO m => TBuf a -> Sink a m ()
sinkTBuf buf = CL.mapM_ $ liftIO . atomically . writeTBuf buf

-- | Conduit source for reading from TBuf.
--
sourceTBuf :: MonadIO m => TBuf a -> Source m a
sourceTBuf buf = forever $ liftIO (atomically $ readTBuf buf) >>= yield

I put the writer's current lap into each entry when it writes, to reduce contention between the reader and writer. Dropping elements when the reader moved back down to zero did end up being gross! My application had a lot of problems with the sporadic large drops.