michalrus closed this issue 11 months ago.
The extracted code (with ClassyPrelude as implicit Prelude):
```haskell
import qualified Control.Exception as Unsafe
import qualified Data.Aeson as J
import GHC.Conc (myThreadId)
import qualified Network.Wai as W
import qualified Network.Wai.Handler.WebSockets as W
import qualified Network.WebSockets as W
import qualified Servant.Server.Internal.ServantErr as W (responseServantErr)

data KillWaiThread =
  KillWaiThread
  deriving (Show)

instance Exception KillWaiThread

-- | The whole point of this module is to have application-level pings for
-- greater control in the application, i.e. to solve
-- <https://github.com/jaspervdj/websockets/issues/159>.
pingingWsApp :: (W.Connection -> IO ()) -> W.Application
pingingWsApp action __unused1 __unused2 = do
  onTcpPong <- newTVarIO (pure ())
  W.websocketsOr
    W.defaultConnectionOptions
      { W.connectionOnPong = join (readTVarIO onTcpPong)
      }
    (wsApp onTcpPong)
    backup
    __unused1
    __unused2
  where
    backup _ respond' = respond' . W.responseServantErr $ toServantErr NotAWebSocket
    -----------------------------------------------
    wsApp onTcpPong pendingConn =
      ignoreKillWaiThread . ignoreClosedTCP $ do
        conn <- W.acceptRequest pendingConn
        waiThreadId <- myThreadId
        pingTimeoutKiller :: TVar (Maybe (Async ())) <- newTVarIO Nothing
        flip finally (traverse_ cancel =<< readTVarIO pingTimeoutKiller) $ do
          atomically . writeTVar onTcpPong $ do
            newKiller <-
              async $ do
                threadDelay' (2 * connectionTimeout)
                sendDisconnect conn ("Ping timeout." :: Text)
                -- This is ugly, but what else can we do with the API provided
                -- by 'Network.WebSockets'? We use 'Unsafe.throwTo' because we
                -- don't want the exception wrapped: we'll catch it later in
                -- 'ignoreKillWaiThread'.
                Unsafe.throwTo waiThreadId KillWaiThread
            previousKiller <- atomically $ swapTVar pingTimeoutKiller (Just newKiller)
            cancel `traverse_` previousKiller
          W.forkPingThread conn (round connectionTimeout)
          action conn
    -----------------------------------------------
    ignoreClosedTCP = handle (\(_ :: W.ConnectionException) -> pure ()) -- don't log these, no point
    -----------------------------------------------
    ignoreKillWaiThread = Unsafe.handle (\(_ :: KillWaiThread) -> pure ()) -- don't log these, no point
    -----------------------------------------------
    sendDisconnect conn why = do
      sendJson conn . J.object $ ["tag" J..= ("Error" :: Text), "description" J..= why]
      W.sendClose conn ("{}" :: ByteString)
```
When adapting the code above, it’s important to get masked and unmasked exceptions right. Also, because of the API of `connectionOnPong`, we can’t easily reuse this code for client connections. Hmm.
@michalrus thanks for the code above! Do you happen to know if any progress has been made since this was posted, or is your solution still the best way to do this?
It looks solid -- I agree that the API is not ideal, and I'm happy to change parts of it if there's a way that offers existing users a clean migration path. I think it would be a bit cleaner to have a single killer thread that shares a reference to a timeout, where every pong that is received prolongs this timeout -- that way it's not necessary to cancel and restart the killer threads. That is the way it's done in the snap backend of websockets, but of course snap also provides a different API than warp.
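For concreteness, here is a minimal sketch of that single-killer-thread design (the names and the 30-second grace period are illustrative, not library API): every pong pushes a shared deadline forward, and one watchdog thread per connection fires the kill action only if the deadline passes without being extended.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM
import Data.Time.Clock (UTCTime, addUTCTime, getCurrentTime)

-- Called on every received pong: push the shared deadline forward instead
-- of cancelling and restarting a killer Async.
prolongDeadline :: TVar UTCTime -> IO ()
prolongDeadline deadline = do
  now <- getCurrentTime
  atomically $ writeTVar deadline (addUTCTime 30 now)  -- 30s grace period

-- The single killer thread: poll the deadline, and run the kill action once
-- it has passed without being extended.
watchdog :: TVar UTCTime -> IO () -> IO ()
watchdog deadline kill = loop
  where
    loop = do
      threadDelay 1000000  -- check once per second
      now <- getCurrentTime
      expired <- (now >) <$> readTVarIO deadline
      if expired then kill else loop
```

A connection would register `prolongDeadline deadline` as its pong callback and run `watchdog deadline kill` alongside the application thread (e.g. via `race`).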
@jaspervdj thanks!
Question: is there some reason why the ping thread itself doesn't wait for pongs and cause the connection to be closed when a pong doesn't return for a long time? This seems like a natural job for the ping thread (in addition to keeping the connection alive through proxies and stuff).
That was my expectation anyway for how the ping thread would behave -- it was an unpleasant discovery that this library doesn't actually have a mechanism to detect disconnects by itself, and that you have to build something crazy out of TVars like the above. I think it would cause less astonishment for users to let the ping thread do this out of the box (with an option to turn it on and off).
I'm currently working on another way to manage pongs, using a `Chan` and a single thread; I'll post again if I have any other ideas about the API...
Okay, I have a couple of thoughts about the current API (particularly when writing a server):

1) It seems that `connectionOnPong` is actually a global setting -- the same action is invoked in response to a pong for every connection (at least when you run the server using something like `runServerWith`, or `websocketsOr` from WAI). This kind of makes it a non-starter to track ping timeouts on a per-connection basis using this callback.

2) In order to work around this, I am forced to stop using `receiveDataMessage` and instead use `receive` directly, so that I can receive the control messages myself. However, I can't quite replicate the behavior of `receiveDataMessage`, because it accesses the `connectionSentClose` field of `Connection`, which is not publicly exposed; see here. So, I have to make a fork of this repo and expose that constructor in order to do this. I think in this case `receiveDataMessage` is aiming for convenience, but it's actually doing users a disservice, because it prevents us from seeing the control messages.

Suggestion: what if you provided a way to set the `connectionOnPong` callback on a per-connection basis, perhaps as part of the `acceptRequest` function? (One hypothetical shape is sketched below.)
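For illustration only, here is one hypothetical shape for that per-connection hook; neither the name nor the signature exists in the websockets API today:

```haskell
import qualified Network.WebSockets as WS

-- Hypothetical variant of 'acceptRequest' that installs a pong callback for
-- this connection only, rather than inheriting the server-wide one.
acceptRequestWithOnPong
  :: WS.PendingConnection
  -> IO ()  -- ^ run whenever a Pong arrives on *this* connection
  -> IO WS.Connection
acceptRequestWithOnPong pending _onPong =
  -- Not implementable from outside the library today; the body just falls
  -- back to the real 'acceptRequest' so the sketch compiles.
  WS.acceptRequest pending
```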
Hmm, this is a bit surprising to me. `connectionOnPong` should absolutely be a per-connection thing; if it isn't, it's quite useless, as you pointed out. I don't use/maintain the WAI backend of websockets in any large projects myself, and it seems like this could be a misunderstanding that arose when that library was written.
Once that is fixed, I think it should be doable to build an application-level ping timeout on top of `withPingThread` (which was added recently) that users can easily wrap their application in -- and I'd be happy merging that into either this library or helping out with getting it into the WAI backend of websockets (depending on what parts it uses).
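As a rough sketch of what such a wrapper could look like, assuming the per-connection pong hook gets fixed (the `installOnPong` argument below is a stand-in for that hook, not a real library function):

```haskell
import Control.Concurrent (myThreadId, throwTo)
import Control.Exception (Exception)
import Control.Monad (unless)
import Data.IORef
import qualified Network.WebSockets as WS

data PingTimeout = PingTimeout deriving (Show)
instance Exception PingTimeout

-- Wrap an application so that a missed pong kills it.
withPingTimeout
  :: WS.Connection
  -> (IO () -> IO ())  -- ^ install this connection's onPong action (assumed)
  -> IO ()             -- ^ the actual application
  -> IO ()
withPingTimeout conn installOnPong app = do
  gotPong <- newIORef True  -- pretend a pong arrived before the first ping
  installOnPong (writeIORef gotPong True)
  appThread <- myThreadId
  -- 'withPingThread' sends a ping every 30 seconds and runs the given
  -- action after each ping; we use that action as the timeout check.
  WS.withPingThread conn 30 (checkPong gotPong appThread) app
  where
    -- If no pong arrived since the previous ping, consider the peer gone.
    checkPong gotPong appThread = do
      seen <- atomicModifyIORef' gotPong (\s -> (False, s))
      unless seen $ throwTo appThread PingTimeout
```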
Maybe I'm misunderstanding, but the `runServerWith` function provided by this library seems to have the same problem -- it accepts a `ConnectionOptions` argument that seems to get used for every connection; see here. It seems like the `websocketsOr` function from the WAI integration is following the lead of this function?
FWIW, having just been through the process of writing my own ping/pong system, I don't think `withPingThread` is quite the right building block to use. I think the ping thread needs to internally understand what a pong is, so it can notice when one takes too long to come. In case it helps, here's a sketch of how I did it:
```haskell
runApp pending = do
  conn <- WS.acceptRequest pending
  withPinger conn $ \pongChan ->
    -- Run main application loop
    forever $ WS.receive conn >>= \case
      WS.ControlMessage (WS.Pong _) -> writeChan pongChan ()
      -- ... other cases, including DataMessage

withPinger conn action = do
  pongChan <- newChan
  mainAsync <- async $ action pongChan
  pingerAsync <- async $ runPinger conn pongChan
  waitEitherCatch mainAsync pingerAsync >>= \case
    -- If the application async died for any reason, kill the pinger async
    Left _ -> cancel pingerAsync
    -- The pinger thread should never throw an exception. If it does, kill the app thread.
    Right (Left _err) -> cancel mainAsync
    -- The pinger thread exited due to a pong timeout. Tell the app thread about it.
    Right (Right ()) -> cancelWith mainAsync PongTimeout

runPinger conn pongChan = fix $ \loop -> do
  WS.sendPing conn (mempty :: B.ByteString)
  threadDelay pingWaitTime
  -- See if we got a pong in that time
  timeout 1000000 (readChan pongChan) >>= \case
    Just () -> loop
    Nothing -> return ()
```
@thomasjm That makes sense. I think `runServerWith` needs to follow the lead of websockets-snap, basically cloning the options & updating `connectionOnPong` when a new connection comes in. I think we can use `withPingThread` in combination with a killer for that, but I'd need to have a stab at it to be sure.
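Since `ConnectionOptions` is an ordinary record, the per-connection clone is just a record update; a small sketch (the `IORef Bool` stands in for whatever per-connection state the killer needs):

```haskell
import Data.IORef
import qualified Network.WebSockets as WS

-- Clone the shared server options for one connection, overriding
-- connectionOnPong with an action that only touches this connection's state.
optionsFor :: IORef Bool -> WS.ConnectionOptions -> WS.ConnectionOptions
optionsFor gotPong shared =
  shared { WS.connectionOnPong = writeIORef gotPong True }
```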
@jaspervdj Please let me know if there are specific parts of this that my team could help with. We're using the library pretty heavily in production and have run into quite a few issues with connection state. (Possibly we could sponsor some of the work if that would help.)
FWIW, we're just using `runServer`; we tried using wai-websockets but couldn't figure out how to reasonably disable the slowloris timeout. Haven't tried websockets-snap (yet).
Hi @nbouscal -- sorry for dragging this out. I am currently vacationing in Mexico so I don't have time to look at this properly, but I did have some downtime on a bus this morning, so I put together #199 with a draft of what I think I want it to look like. Note that because of the circumstances, I have only checked that this compiles; I haven't tried to run it at all. Could you help me out by reviewing and sense-checking that PR? Thanks for your patience!
No worries, hope you’re enjoying your vacation! I’ll take a look this weekend. Thanks!
There's a similar implementation out there at https://github.com/digitallyinduced/ihp/blob/dbb4ec64fe7b460fd80041e0b7a2867d90529d78/IHP/WebSocket.hs#L124
I've written an implementation that reuses `withPingThread`: https://github.com/cachix/cachix/pull/414
Looking at the current implementation of ping-pong handling, I suggest the following changes:

- `requireServerPong`: it should also install `withPingThread` and be enabled by default.

I've found a way to implement ping-pong generically for any connection, and it also simplifies the code!
Please take a look at https://github.com/jaspervdj/websockets/pull/239
I have a working solution with the current API, but it feels sooooo hacky. :smiley_cat:

- `connectionOnPong` doesn’t get its `Connection` (why?), so we’re binding `onPong <- newTVarIO (pure ())`.
- To `connectionOnPong`, we’re passing `join (readTVarIO onPong)`.
- For each `Connection`, we’re putting an action into this `onPong`, which:
  - spawns a new `Async`, which:
    - `threadDelay`s `(2 * tcpTimeout)`,
    - then does `throwTo waiThread PingTimeout` (this is awful);
  - and cancels the previous `Async`.
- There’s a `finally` that cancels that `Async` as well, so that after a clean disconnection we don’t run the code meant for the ping timeout.

This way, on average after `1.5 * tcpTimeout` (*), a dropped connection is detected and killed, and the whole system can learn about it, run appropriate clean-up code etc.

(*) Even if we were scheduling this killer `Async` right after sending a Ping (which seems more in line with the naming, ping timeout), it’d still happen after `1.5 * tcpTimeout`, because on average the Ping message would be sent `0.5 * tcpTimeout` after the connection got dropped.

When we don’t do that, dropping the connection in `iptables` does absolutely nothing; it just stays there as open, even after Warp’s `setTimeout` timeout passes! I don’t understand why, though.

How to do this properly? Because my way surely doesn’t feel appropriate (although it works well).