lpeterse / haskell-socket

A Haskell binding to the POSIX sockets API
MIT License

unsafeSocketWaitRead consumes quite a lot of memory when used with timeout #27

Closed SX91 closed 7 years ago

SX91 commented 7 years ago

I've written a small test program that illustrates heap growth in unsafeSocketWaitRead at close to 100% CPU load (https://github.com/SX91/haskell-socket-test). Run it with test-socket 300 (300 microseconds, to make the heap grow faster).

[Image: heap profiling]

The timeout (task cancellation) itself causes heap usage and CPU load to grow steadily.

lpeterse commented 7 years ago

Hi and thanks for the detailed bug description and the test case.

What operating system are you using? The waiting code is platform dependent. The Windows part loops several times introducing a delay with exponential back-off in each iteration. I might have messed up some tail recursion.

SX91 commented 7 years ago

Hi! Linux, Fedora 24 with latest updates. GHC 7.10.3 (LTS-6.25).

SX91 commented 7 years ago

Maybe you have to bracket unsafeRead/WriteSTM and execute the unregister IO action it comes with?

UPD: Replacing threadWaitReadSTM with threadWaitRead fixes the problem.
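
The bracketing idea above can be sketched as follows. This is only an illustration, not the library's code: waitReadable is a hypothetical helper name, and the demo assumes the unix package is available for createPipe. It pairs threadWaitReadSTM with its deregister action so that a timeout cannot leave a registration behind.

```haskell
import Control.Exception (bracket)
import GHC.Conc (atomically, threadWaitReadSTM)
import System.Posix.IO (closeFd, createPipe)
import System.Posix.Types (Fd)
import System.Timeout (timeout)

-- | Hypothetical helper: block until the descriptor is readable and
-- always run the deregister action, even if the wait is cancelled.
waitReadable :: Fd -> IO ()
waitReadable fd =
  bracket
    (threadWaitReadSTM fd) -- register interest (does not block)
    snd                    -- second component: the deregister action
    (atomically . fst)     -- first component: the blocking STM wait

main :: IO ()
main = do
  (readEnd, writeEnd) <- createPipe
  -- Nothing is ever written to the pipe, so every wait is cancelled
  -- by the 300 microsecond timeout, as in the test program.
  results <- mapM (const (timeout 300 (waitReadable readEnd))) [1 .. 1000 :: Int]
  print (all (== Nothing) results)
  closeFd readEnd
  closeFd writeEnd
```

Using bracket rather than bracketOnError here means the deregister action also runs after a successful wait, which should be harmless for a one-shot registration.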

SX91 commented 7 years ago

threadWaitReadSTM :: Fd -> IO (Sync.STM (), IO ())
threadWaitReadSTM fd 
#ifndef mingw32_HOST_OS
  | threaded  = Event.threadWaitReadSTM fd
#endif
  | otherwise = do
      m <- Sync.newTVarIO False
      _ <- Sync.forkIO $ do
        threadWaitRead fd
        Sync.atomically $ Sync.writeTVar m True
      let waitAction = do b <- Sync.readTVar m
                          if b then return () else retry
      let killAction = return ()
      return (waitAction, killAction)

https://hackage.haskell.org/package/base-4.9.0.0/docs/src/GHC.Conc.IO.html#threadWaitReadSTM

Just wow! It creates a TVar, forks a new green thread, and defines killAction as return ()!? So that's the whole reason we have a leak.

lpeterse commented 7 years ago

Mhh, that is interesting.

Notably, this is only a problem with the non-threaded runtime. Your fix contains a race condition that I had hoped to circumvent in the original version (I'm no longer sure I succeeded for all combinations of platform and runtime environment): interest in the file descriptor must be registered while holding the lock on the socket, whereas the waiting itself must not hold the lock (withMVar, see below). The whole issue might also be related to issue #25. I'd like to properly understand and fix both together.

tryWaitRetryLoop :: Socket f t p -> (Fd -> Int-> IO (IO ())) -> (Fd -> Ptr CInt -> IO CInt) -> IO CInt
tryWaitRetryLoop (Socket mfd) getWaitAction action = loop 0
  where
    loop iteration = do
      ewr <- withMVar mfd $ \fd-> alloca $ \errPtr-> do
          when (fd < 0) (throwIO eBadFileDescriptor)
          i <- action fd errPtr
          if (i < 0) then do
            err <- SocketException <$> peek errPtr
            unless (err == eWouldBlock || err == eAgain) (throwIO err)
            Left <$> getWaitAction fd iteration  -- This call is non-blocking!
          else
            return (Right i)
      case ewr of
        Left  wait   -> do
          wait -- This call is blocking!
          loop $! iteration + 1
        Right result -> do
          return result

SX91 commented 7 years ago

That's pretty bad, as you can't really fix unsafeSocketWait{Read,Write} without modifying base.

I consider the threadWaitReadSTM implementation buggy. I can't see any reason for killAction to be return () instead of killThread threadID.
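
A minimal sketch of what such a fallback could look like when the kill action actually terminates the helper thread. The name threadWaitReadSTMFixed is hypothetical, this is not the code in base, and the demo assumes the unix package for createPipe and fdWrite.

```haskell
import GHC.Conc
  ( STM, atomically, forkIO, killThread, newTVarIO, readTVar, retry
  , threadWaitRead, writeTVar )
import System.Posix.IO (closeFd, createPipe, fdWrite)
import System.Posix.Types (Fd)

-- | Hypothetical variant of the fallback: the returned kill action
-- terminates the helper thread instead of doing nothing.
threadWaitReadSTMFixed :: Fd -> IO (STM (), IO ())
threadWaitReadSTMFixed fd = do
  ready <- newTVarIO False
  tid <- forkIO $ do
    threadWaitRead fd                 -- blocks only the helper thread
    atomically (writeTVar ready True)
  let waitAction = do
        b <- readTVar ready
        if b then return () else retry
      killAction = killThread tid     -- no-op if the helper already finished
  return (waitAction, killAction)

main :: IO ()
main = do
  (readEnd, writeEnd) <- createPipe
  (wait, kill) <- threadWaitReadSTMFixed readEnd
  _ <- fdWrite writeEnd "x"           -- make the read end readable
  atomically wait
  kill
  closeFd readEnd
  closeFd writeEnd
  putStrLn "descriptor became readable"
```

A caller that abandons the wait (for example on a timeout) would run kill, so neither the helper thread nor its TVar stays alive.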

lpeterse commented 7 years ago

Ah, it's leaking TVars (both with the threaded and non-threaded runtime):

Without -threaded: [image: socket-leak]

With -threaded: [image: socket-leak-threaded]

lpeterse commented 7 years ago

The clue leads here:

https://hackage.haskell.org/package/base-4.9.0.0/docs/src/GHC.Event.Thread.html#threadWaitSTM

lpeterse commented 7 years ago

You're right: My code ignores the second value returned by threadWaitReadSTM and it shouldn't.

The documentation says:

The second returned value is an IO action that can be used to deregister interest in the file descriptor.

It is an obligation rather than an option. The reason this didn't cause problems earlier is that as soon as the socket receives an event, the registrations are released automatically because they are OneShot. Creating more and more registrations without any socket events happening (as in the test case) is rather unusual, but it nonetheless reveals a serious bug.

Still, the fallback implementation of threadWaitReadSTM (the one used with the single-threaded RTS) seems broken, as you pointed out: it's leaking threads. Changing the killAction to killThread threadID might cause another undesired race, though. I'll think about this a little further and file a bug report against the base library once I'm sure I haven't overlooked anything. UPD: The bug has already been filed and fixed, and the proposed killThread is a valid solution (see https://ghc.haskell.org/trac/ghc/ticket/12852).

lpeterse commented 7 years ago

So, I got more familiar with GHC's internals and how the event manager interacts with the waiting mechanism. This is the implementation of threadWait, which uses an MVar to actually block (it's a very neat idea indeed):

threadWait :: Event -> Fd -> IO ()
threadWait evt fd = mask_ $ do
  m <- newEmptyMVar
  mgr <- getSystemEventManager_
  reg <- registerFd mgr (\_ e -> putMVar m e) fd evt M.OneShot
  evt' <- takeMVar m `onException` unregisterFd_ mgr reg -- This one is blocking.
  if evt' `eventIs` evtClose
    then ioError $ errnoToIOError "threadWait" eBADF Nothing Nothing
    else return ()

The STM mechanism always seemed a bit like overkill to me, but it allowed the separation between registration and waiting. Now that it turns out to be broken, it might be worth reconsidering it all and proposing an extended threadWait implementation. I wonder how all the code out there manages not to cause horrible race conditions using these primitives ...?! I will do some research on this.

lpeterse commented 7 years ago

Okay, here is what I think it should look like:

threadWait' :: Exception e => Event -> ((Fd -> IO FdKey) -> IO FdKey) -> e -> IO ()
threadWait' evt withFd e = do
  mgr  <- getSystemEventManager_
  mevt <- newEmptyMVar  -- must start empty so that takeMVar blocks
  bracketOnError
    ( withFd $ \fd->
        registerFd mgr (\_ evt' -> putMVar mevt evt') fd evt M.OneShot
    )
    ( void . unregisterFd_ mgr )  -- only runs if the wait is interrupted
    ( const $ do
        evt' <- takeMVar mevt
        unless (evt' `eventIs` evt) (throwIO e)
    )

threadWait :: Event -> Fd -> IO ()
threadWait evt fd = threadWait' evt (\f-> f fd) (errnoToIOError "threadWait" eBADF Nothing Nothing)

qnikst commented 7 years ago

@lpeterse, what are the possible race conditions in threadWait that you are talking about?

qnikst commented 7 years ago

Meanwhile, threadWaitSTM now returns a correct killAction, but I see no point in using the STM approach and then just blocking. The idea of the STM variant is that you check the condition inside a more complex STM transaction; otherwise it just introduces unnecessary complexity and overhead.

lpeterse commented 7 years ago

The race condition I have in mind is not in threadWait itself. It occurs when threadWait is used to wait on a file descriptor held in an MVar, assuming the user closes the descriptor before terminating all threads that may still be performing reads and writes.

Here is an example using threadWaitRead that exhibits the race. It reads the file descriptor from the MVar and then waits after releasing the lock:

exampleReceive :: MVar Fd -> IO ByteString
exampleReceive mfd = do
  fd <- readMVar mfd
  threadWaitRead fd     -- (1)
  withMVar mfd $ \fd-> unsafeReceive fd 4096  

My socket library locks the file descriptor for all operations, especially for closing. At (1) in the pseudo-code above, the socket could have been closed in the meantime (that is not too bad, as it would just cause an exception), but (extremely unlikely, yet not impossible) the descriptor could also already have been reassigned and now designate a completely different socket.

Maybe the whole approach of allowing the user to close the socket without first terminating all depending threads is wrong, but on the other hand operations like closeFdWith imply this is a valid scenario.

lpeterse commented 7 years ago

Yesterday evening, I implemented the proposed threadWait that holds the lock for registration, but not for the waiting itself. This makes the STM mechanism obsolete. It works quite well, although some things needed for a clean solution (eventIs, evtClose etc) are not exported from the internal GHC module. Oh, and it does not work with the single threaded RTS, of course.

https://github.com/lpeterse/haskell-socket/blob/ea245b22e3d8f35cc33a8185d3280d3607e7389c/platform/linux/src/System/Socket/Internal/Platform.hsc#L18
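
For readers who want to experiment without the internal module, here is a rough sketch of the same idea restricted to what GHC.Event exports publicly. It is not the code from the linked commit: waitReadLocked is a hypothetical name, the close-event check is omitted because eventIs and evtClose are not exported, and the demo assumes the unix package and the threaded RTS.

```haskell
import Control.Concurrent (rtsSupportsBoundThreads)
import Control.Concurrent.MVar
  (MVar, newEmptyMVar, newMVar, putMVar, takeMVar, withMVar)
import Control.Exception (bracketOnError)
import Control.Monad (void)
import GHC.Event
  (Lifetime (OneShot), evtRead, getSystemEventManager, registerFd, unregisterFd)
import System.Posix.IO (closeFd, createPipe, fdWrite)
import System.Posix.Types (Fd)

-- | Hypothetical helper: register interest while holding the lock on
-- the descriptor, then wait for the event without holding the lock.
waitReadLocked :: MVar Fd -> IO ()
waitReadLocked mfd = do
  mmgr <- getSystemEventManager
  case mmgr of
    Nothing -> ioError (userError "waitReadLocked: no event manager (non-threaded RTS?)")
    Just mgr -> do
      fired <- newEmptyMVar
      bracketOnError
        -- registration happens under the lock and does not block
        (withMVar mfd $ \fd ->
           registerFd mgr (\_ evt -> putMVar fired evt) fd evtRead OneShot)
        -- only runs if the wait below is interrupted (e.g. by a timeout)
        (unregisterFd mgr)
        -- the blocking part, outside the lock
        (const (void (takeMVar fired)))

main :: IO ()
main
  | not rtsSupportsBoundThreads = putStrLn "skipped: needs the threaded RTS"
  | otherwise = do
      (readEnd, writeEnd) <- createPipe
      mfd <- newMVar readEnd
      _ <- fdWrite writeEnd "x"
      waitReadLocked mfd
      putStrLn "descriptor became readable"
      closeFd readEnd
      closeFd writeEnd
```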

qnikst commented 7 years ago

Possibly I'm oversimplifying the problem, so let me clarify. The problem appears if the fd is closed after it is read from the MVar but before threadWaitRead is called, and threadWaitRead does not provide a way to make that safe? threadWaitReadSTM does, as one can call something like:

join $ withMVar mfd $ \fd -> do
     (wait, kill) <- threadWaitReadSTM fd
     return (atomically wait `finally` kill)

but threadWaitReadSTM has a bug, at least until 8.0, that leaks memory if data never arrives.

As a result, you need your own threadWaitRead (or a change to GHC's) that is similar to the GHC one but lets you solve your problem?

UPD corrected code a bit.

lpeterse commented 7 years ago

Yes, that's essentially it.

I have put some thought into it, but I simply can't come up with a solution using the standard threadWaitRead. It either leads to the described race condition, or I cannot release the lock until the wait terminates.

The STM solution with the finally kill does the trick, though I suspect the overhead is hard to justify.

lpeterse commented 7 years ago

For the record, just referencing the corresponding GHC ticket I was not yet aware of: https://ghc.haskell.org/trac/ghc/ticket/12852 and https://phabricator.haskell.org/D2729

Thanks to @qnikst

lpeterse commented 7 years ago

So here are the next TODOs:

SX91 commented 7 years ago

That's not easy. As you can see in base's Conc.hs, it uses the threaded foreign import, which is local to the module, as well as some internal base modules, etc.

qnikst commented 7 years ago

As for threaded, that's not a problem: it's a synonym for rtsSupportsBoundThreads, which is exported from Control.Concurrent. The same FFI call can also be constructed:

libraries/base/GHC/Event/Thread.hs
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
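
As a small illustration of the exported flag, the following sketch branches on rtsSupportsBoundThreads from Control.Concurrent without any FFI declaration. The describeRts name and its messages are made up for this example.

```haskell
import Control.Concurrent (rtsSupportsBoundThreads)

-- | Hypothetical helper: report which waiting strategy applies.
describeRts :: String
describeRts
  | rtsSupportsBoundThreads = "threaded RTS: the event manager is available"
  | otherwise               = "non-threaded RTS: fall back to a forked waiter thread"

main :: IO ()
main = putStrLn describeRts
```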

eventIs looks a bit more complex.

lpeterse commented 7 years ago

I think it might be possible to derive a standalone instance of Enum to recover the contained Int value. It's not exactly pretty, though...

@qnikst Do you think I should propose the modified threadWaitX for inclusion in base, or is this too specific a use case, so that the STM version would be considered appropriate enough?

P.S. I'm currently working on the first TODO to get at least the multi-threaded RTS case fixed (Windows is not affected at all as it uses another mechanism anyway).

qnikst commented 7 years ago

I think the standard threadWaitRead can work, at the cost of some overhead. Here is a slightly simplified version; one has to think carefully about async exceptions, which may arrive here and there:

threadWaitRead' :: MVar Fd -> IO ()
threadWaitRead' mfd = mask_ $ do
   box <- newEmptyMVar
   -- fork the waiter while holding the lock on the descriptor
   _ <- withMVar mfd $ \fd ->
      forkIO $ try (threadWaitRead fd) >>= putMVar box
   -- wait outside the lock and rethrow whatever the waiter caught
   either (throwIO :: SomeException -> IO ()) return =<< takeMVar box

In this case we should not have a race condition between events.

@lpeterse I'm not sure I'm the best person for that question, I'm not familiar enough with internals. It would be better to consult on freenode:#ghc about the question.

qnikst commented 7 years ago

Hm, it seems I'm not correct: we should not leave the withMVar block until threadWaitRead has actually been called, and that is not guaranteed here. But threadWaitReadSTM is affected by the same issue anyway, so possibly your proposal is the only stable way forward. I'll think a bit more about this.

lpeterse commented 7 years ago

Thanks so far. Any further ideas are always welcome :-)

lpeterse commented 7 years ago

I think I fixed it for both the single-threaded and the multi-threaded runtime. The test-socket program no longer leaks memory on my machine. I put up a 0.8.x.x branch for validation and testing.

Feel free to review and find bugs/races in the following relevant part:

waitRead :: Socket f t p -> Int -> IO ()
waitRead s _ = wait s threadWaitRead threadWaitReadSTM

waitWrite :: Socket f t p -> Int -> IO ()
waitWrite s _ = wait s threadWaitWrite threadWaitWriteSTM

wait :: Socket f t p -> (Fd -> IO ()) -> (Fd -> IO (STM (), IO ())) -> IO ()
wait (Socket mfd) threadWait threadWaitSTM
  | rtsSupportsBoundThreads = mapException
    ( const eBadFileDescriptor :: IOError -> SocketException )
    ( bracketOnError
        ( withMVar mfd $ \fd -> do
            when (fd < 0) (throwIO eBadFileDescriptor)
            threadWaitSTM fd
        ) snd ( atomically . fst )
    )
  | otherwise = do
      m <- newEmptyMVar
      bracketOnError
        ( withMVar mfd $ \fd-> do
            when (fd < 0) (throwIO eBadFileDescriptor)
            forkIO $ catch
              ( threadWait fd >> putMVar m True )
              ( \(SomeException _)-> putMVar m False )
        ) killThread
        ( const $ takeMVar m >>= flip unless (throwIO eBadFileDescriptor) )

lpeterse commented 7 years ago

I'll close this as fixed. It will be part of the 0.8.0.0 release.