informatikr / hedis

A Redis client library for Haskell.
http://hackage.haskell.org/package/hedis
BSD 3-Clause "New" or "Revised" License

PubSub function that can return a value #172

Open JivanRoquet opened 3 years ago

JivanRoquet commented 3 years ago

First of all, many thanks for the work done on Hedis.

I have one question though. I could make good use of a pubSub function that is able to return a received message.

An example of usage might be (kind of pseudo-code):

myEndpoint :: ScottyM ()
myEndpoint =
    post "/hello/world" $ do
        payload :: MyData <- jsonData  -- `data` is a reserved word in Haskell
        response <- runRedis redisConn $ do
            result <- pubSub (subscribe ["channel"]) $ \msg -> do
                unsubscribe ["channel"]
                pure msg
            pure result

        -- the idea is to be able to use the message being received from here
        json $ doSomethingWith response

To the best of my knowledge, such a scenario currently requires making use of IORef. Is there any conceptual limitation that prevents an implementation working as suggested above, or would it be feasible in theory?

k-bx commented 3 years ago

Hi! You are correct, something like MVar would be needed (or IORef if you're not sending values between threads). I believe this is by design.
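
For reference, the MVar workaround could look roughly like the sketch below. It assumes the hedis `pubSub`, `subscribe`, `unsubscribe` and `msgMessage` functions from Database.Redis; `awaitOneMessage` is a hypothetical helper name, not something the library provides.

```haskell
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (void)
import Data.ByteString (ByteString)
import Database.Redis

-- Hypothetical helper (not part of hedis): block until one message
-- arrives on the given channel, then return its payload.
awaitOneMessage :: Connection -> ByteString -> IO ByteString
awaitOneMessage conn chan = do
  box <- newEmptyMVar
  runRedis conn $
    pubSub (subscribe [chan]) $ \msg -> do
      -- tryPutMVar never blocks, so a second message that slips in
      -- before the unsubscribe takes effect is simply ignored.
      void $ tryPutMVar box (msgMessage msg)
      -- Returning unsubscribe drops the only subscription, which
      -- makes pubSub return.
      pure (unsubscribe [chan])
  takeMVar box
```

With a connection obtained from `checkedConnect defaultConnectInfo`, a handler would call `awaitOneMessage conn "channel"` (with OverloadedStrings enabled). Note that this ties up one Redis connection per waiting request and misses anything published before the subscription is acknowledged.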

May I suggest, however, an API that would be a more natural fit if you intend to produce a slow streaming HTTP response: https://hackage.haskell.org/package/scotty-0.12/docs/Web-Scotty.html#v:stream

Another alternative is to use WebSockets or a similar technology designed to keep the connection open and "push" events to the client.

qnikst commented 1 year ago

Hello! First, let me thank you for the request. Unfortunately, I'm not sure we can provide an API in the library that fits the generic case. There are too many internal details that can be solved in different ways.

However, let me suggest an alternative approach that solves your problem more efficiently and gives you more control over what is going on (pseudo-code still):

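{-# LANGUAGE OverloadedStrings, ScopedTypeVariables #-}
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (concurrently_)
import Control.Concurrent.STM
import Control.Exception (SomeException, catch)
import Control.Monad (forever)
import Control.Monad.IO.Class (liftIO)
import Data.ByteString (ByteString)
import Database.Redis (connect, defaultConnectInfo, newPubSubController, pubSubForever)
import Web.Scotty (ScottyM, json, jsonData, post, scotty)

-- MyData and doSomethingWith are placeholders; supply your own.

-- Forward every message received from Redis to the broadcast channel.
myhandler :: TChan ByteString -> ByteString -> IO ()
myhandler ch msg = atomically $ writeTChan ch msg

onInitialComplete :: IO ()
onInitialComplete = putStrLn "Redis acknowledged that mychannel is now subscribed"

main :: IO ()
main = do
  conn <- connect defaultConnectInfo
  ch <- newBroadcastTChanIO
  pubSubCtrl <- newPubSubController [("mychannel", myhandler ch)] []
  -- Run the Redis listener and the web server side by side.
  concurrently_
    ( forever $
        pubSubForever conn pubSubCtrl onInitialComplete
          `catch` (\(e :: SomeException) -> do
            putStrLn $ "Got error: " ++ show e
            threadDelay $ 50*1000) -- TODO: use exponential backoff
    )
    (scotty 8080 $ myEndpoint ch)

myEndpoint :: TChan ByteString -> ScottyM ()
myEndpoint ch =
    post "/hello/world" $ do
        payload :: MyData <- jsonData
        -- A broadcast channel cannot be read directly; dupTChan gives a readable copy.
        activeChan <- liftIO . atomically $ dupTChan ch
        response <- liftIO . atomically $ readTChan activeChan
        json $ doSomethingWith response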

What is happening here:

  1. We create a thread that receives every message published on the Redis channel and writes it to a broadcast channel.
  2. If there are no readers of the broadcast channel, messages just disappear without using memory. If there are readers, the message will be received by all of them.
  3. When the endpoint is called, it "registers" a reader and receives the next incoming message. The reader is dropped once activeChan goes out of scope and is garbage-collected.
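
The broadcast-channel behaviour described in the points above can be checked in isolation, without Redis or Scotty. The following is a minimal sketch using only the stm package; `demo` is a hypothetical name chosen for illustration.

```haskell
import Control.Concurrent.STM

-- Hypothetical demo: returns what two readers see, plus whether the
-- first reader's channel is empty afterwards.
demo :: IO (String, String, Bool)
demo = do
  ch <- newBroadcastTChanIO
  -- No reader is registered yet, so this message is dropped.
  atomically $ writeTChan ch "lost"
  -- Each dupTChan registers an independent reader that starts
  -- at the current end of the channel.
  r1 <- atomically $ dupTChan ch
  r2 <- atomically $ dupTChan ch
  atomically $ writeTChan ch "hello"
  m1 <- atomically $ readTChan r1
  m2 <- atomically $ readTChan r2
  -- The earlier "lost" message was never queued for r1.
  drained <- atomically $ isEmptyTChan r1
  pure (m1, m2, drained)
```

Running `demo >>= print` in GHCi prints `("hello","hello",True)`: both readers get the message written after they registered, and neither sees the one written before.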

Would this approach work for you?