informatikr / hedis

A Redis client library for Haskell.
http://hackage.haskell.org/package/hedis
BSD 3-Clause "New" or "Revised" License

Pubsub and connection handling #39

Open RamotionRussell opened 8 years ago

RamotionRussell commented 8 years ago

I'm having issues with pubsub -- I can subscribe to channels and receive data just fine for short-term intervals, but after a while, stuff seems to just... stop arriving. Are subscriptions properly handled in the connection pool? Because it seems like the subscribe command is just executed on a connection in the pool and forgotten, and if that connection is lost and replaced with another in the pool, the subscription is also lost.

k-bx commented 8 years ago

@RamotionRussell it would be helpful if you shared a reproducible piece of code that we can discuss.

levinotik commented 8 years ago

I'm looking into this issue as well. Still digging around as the issues may very well be in our application and not hedis, but I do have situations where subs seem to stop working. Here's the relevant code:

lookupOrCreateChannel :: Connection -> Server -> ChannelSlug -> IO Channel
lookupOrCreateChannel conn server@Server{..} name = do
  channel <- atomically $ lookupChannel server name
  let pubSubChan = chanId2Bs name
  case channel of
    Nothing -> do
      newServerChan <- atomically $ do
        chan <- newChannel name
        modifyTVar serverChannels . M.insert name $ chan
        return chan
      -- Subscribe on a background thread. If runRedis throws (for example
      -- because the connection is lost), this thread ends and nothing
      -- resubscribes.
      void $ forkIO $ runRedis conn (pubSub (subscribe [pubSubChan]) (messageCallback server))
      return newServerChan
    Just chan -> return chan

messageCallback :: Server -> Redis.Message -> IO PubSub
messageCallback server@Server{..} msg = 
  atomically $ do
    channel <- lookupChannel server (ChannelSlug $ decodeUtf8 $ msgChannel msg)
    case channel of
      Just chan -> do
        let broadcastMsg = Aeson.eitherDecode (LC8.fromStrict $ msgMessage msg) :: Either String RtmEvent
        case broadcastMsg of
          Right bMsg -> writeTChan (channelBroadcastChan chan) bMsg
          Left err   -> return ()
      Nothing -> return ()
    return mempty

qrilka commented 8 years ago

@RamotionRussell @levinotik could you share some more details on this issue? I tried to find possible problems, and the only plausible cause I found is half-open sockets. To prevent those we need to either send PING from time to time or set the SO_KEEPALIVE socket option.
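
For reference, here is a minimal sketch of the SO_KEEPALIVE half of that suggestion, using the `network` package directly. It only sets the option on a plain TCP socket and reads it back; how to reach the socket that hedis manages is not covered here, and `enableKeepAlive` is a hypothetical helper name, not part of hedis.

```haskell
import Network.Socket

-- Hypothetical helper (not part of hedis): turn on TCP keepalive probes
-- so the OS can eventually detect a half-open connection.
enableKeepAlive :: Socket -> IO ()
enableKeepAlive sock = setSocketOption sock KeepAlive 1

main :: IO ()
main = do
  sock <- socket AF_INET Stream defaultProtocol
  enableKeepAlive sock
  -- Read the option back; a non-zero value means keepalive is enabled.
  enabled <- getSocketOption sock KeepAlive
  putStrLn ("keepalive enabled: " ++ show (enabled /= 0))
  close sock
```

Note that with default OS settings the first keepalive probe may only be sent after a long idle period (two hours by default on Linux), so this option alone may be slow to notice a dead connection.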

qnikst commented 1 year ago

Hello, were there any updates for this situation?

One possible problem that we faced occurred when we used a proxy to handle the sentinel protocol for us. We used HAProxy, which has its own connection timeout. Because it does not know that the connection is in pubSub mode (Redis knows this and does not break the connection on timeout in that case), it happily disconnects the client connection.
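
If something between the client and Redis can drop the connection like this, the subscribing thread has to notice and subscribe again. Below is a rough sketch of that idea using only `base`. `superviseSubscriber`, `flakySubscriber` and `demo` are hypothetical names made up for illustration, and the restart limit exists only so that the example terminates; this is not how hedis itself handles reconnection.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (SomeException, try)
import Control.Monad (when)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical helper: run a blocking subscriber action and start it again
-- (after a delay) whenever it throws or returns, up to maxRestarts times.
superviseSubscriber :: Int -> Int -> IO () -> IO ()
superviseSubscriber maxRestarts delayMicros subscriber = go 0
  where
    go :: Int -> IO ()
    go restarts = do
      -- Note: SomeException also covers asynchronous exceptions; a real
      -- supervisor would probably want to rethrow those.
      result <- try subscriber :: IO (Either SomeException ())
      case result of
        Left err -> putStrLn ("subscriber failed: " ++ show err)
        Right () -> putStrLn "subscriber returned"
      when (restarts < maxRestarts) $ do
        threadDelay delayMicros
        go (restarts + 1)

-- Stand-in for the real subscriber: counts its runs and fails every time,
-- as a dropped connection would.
flakySubscriber :: IORef Int -> IO ()
flakySubscriber runs = do
  modifyIORef' runs (+ 1)
  ioError (userError "connection lost")

-- Runs the stand-in once plus two restarts and reports the number of runs.
demo :: IO Int
demo = do
  runs <- newIORef 0
  superviseSubscriber 2 1000 (flakySubscriber runs)
  readIORef runs

main :: IO ()
main = demo >>= print
```

With the code posted earlier in this thread, the supervised action would presumably be the `runRedis conn (pubSub ...)` call, restarted without an upper bound and with a longer delay or backoff.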