hyperthunk closed this issue 7 years ago
As per GitHub's clever links, there is another use case for this in https://github.com/haskell-distributed/distributed-process-supervisor/issues/8.
@teh I think the issues being discussed at the bottom of https://github.com/haskell-distributed/distributed-process-supervisor/issues/8 are quite similar to your questions about interactions between warp handlers and managed processes in this library's API. The ability to write something like this seems pressing to me:
```haskell
demo :: STM.TChan InputType -> Process (ProcessDefinition State)
demo broadcastChan = do
  rc <- liftIO $ atomically $ dupTChan broadcastChan
  return $ defaultProcess {
    -- other things elided
    infoHandlers = [ handleExternal (readTChan rc) doStuff ]
  }
```
Better still would be the ability to wrap API handlers around an STM channel, such that we can write something like `apiHandlers = [ handleCast (handleExternal doStmStuff) myHandler ]`. This should, theoretically, be possible, since the various `handleX` functions already deal with abstractions around typed channels, and the `matchSTM` primitive has been available in distributed-process since around v0.5, allowing us to multiplex on matching arbitrary STM actions alongside typed channels and regular inputs to our mailbox.
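The multiplexing that `matchSTM` gives `receiveWait` can be mimicked in plain STM with `orElse`: block on several sources at once and take whichever fires first. A minimal sketch using only base and the stm package (the names here are illustrative, not the distributed-process API):

```haskell
import Control.Concurrent.STM

data Input = FromMailbox String | FromChannel Int
  deriving Show

-- Block until either source has a value, preferring the first;
-- this is roughly what receiveWait + matchSTM does internally.
awaitEither :: TQueue String -> TQueue Int -> STM Input
awaitEither mailbox chan =
      (FromMailbox <$> readTQueue mailbox)
  `orElse`
      (FromChannel <$> readTQueue chan)

main :: IO ()
main = do
  mailbox <- newTQueueIO
  chan    <- newTQueueIO
  atomically $ writeTQueue chan 42
  atomically (awaitEither mailbox chan) >>= print
```

Since only the second queue holds a value, the composite action selects it and prints `FromChannel 42`.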
@teh take a look at https://github.com/haskell-distributed/distributed-process-client-server/tree/tw/h-ext and let me know if that would meet your needs please? I think the 2-way STM stuff is totally concrete and the implementation is extremely simple (and based on idioms we use all over the place for handling typed channels).
Specifically, this test case exemplifies the approach:
```haskell
testExternalService :: TestResult Bool -> Process ()
testExternalService result = do
  inChan <- liftIO newTQueueIO
  replyChan <- liftIO newTQueueIO
  let procDef = statelessProcess {
        apiHandlers = [
          handleExternal
            (readTQueue inChan)
            (\s (m :: String) -> do
                liftIO $ atomically $ writeTQueue replyChan m
                continue s)
        ]
      }
  let txt = "hello 2-way stm foo"
  pid <- spawnLocal $ serve () (statelessInit Infinity) procDef
  echoTxt <- liftIO $ do
    -- firstly we write something that the server can receive
    atomically $ writeTQueue inChan txt
    -- then sit and wait for it to write something back to us
    atomically $ readTQueue replyChan
  stash result (echoTxt == txt)
  kill pid "done"
```
@hyperthunk thanks for keeping at it! (I guess you mean https://github.com/haskell-distributed/distributed-process-client-server/commit/e20135fb6a2ae4dd283fa341315990ae1323b088 specifically?)
This isn't quite what I am doing but pretty close!
1. I provide an IO callback that then runs a process via `liftIO`, though that may just be your example.
2. I could pass in `inChan` and `replyChan`?
3. By using a single `Process` to handle this I am limiting throughput. I can probably spawn a pool of processes and use work-stealing, but at that point I am again re-inventing something that the IO scheduler already does quite well.

The boundary between callback IO and CH is tricky, and `handleExternal` covers a lot of use cases. I will try to rewrite my code with it.
@teh, yeah I get that your usage is a bit different. Here are some thoughts...
In terms of (1/2), it's totally up to you how you structure things. I would not use a single channel to go back to the warp handler either, since that would also be a potential bottleneck.
For (3), my advice is this: do not fight warp, and do not fight the runtime. The point of this feature in client-server is that it allows you to read a value back in `IO` land using STM (and whichever flavour of STM you like; it's not limited to `TChan` or `TQueue` at all). So currently you spawn a process for each request (or more accurately, from what you've said, you spawn a process onto either each green thread or each capability/native thread). That's fine: you can keep doing that, and just use the `handleExternal` capability to simplify your code if you want to read the results back without using `runProcess`. Notably, `handleExternal` doesn't provide the monitoring infrastructure you're used to for error detection though, so what I would suggest is that you consider using it to implement a streaming input source.
I'll post a picture in a few mins..
Okay @teh, having spent some time poking around in warp, I think the best thing for what you're doing is to simply fork a new process and wait on the result using STM; tbh you could just use a `TMVar` and write the result of `call` to that. If you want to use `handleExternal` so that the server writes the reply directly, you can (using the branch I'm merging shortly), but I'm not sure how useful that really is. What would be better would be a means to issue `call` and have it wait on the STM result whilst monitoring the server. I can see a way to do that, and will implement it this afternoon and send an update.
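The fork-and-wait pattern suggested here can be sketched with only base and stm, leaving out the Cloud Haskell parts: a worker thread stands in for the spawned process issuing `call`, and the warp handler blocks on a `TMVar` for the reply (names and the echo logic are illustrative):

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM

-- Handle one request: fork a worker, then block until it writes the result.
handleRequest :: String -> IO String
handleRequest req = do
  resultVar <- newEmptyTMVarIO
  -- worker thread stands in for a spawned CH process whose `call`
  -- result would be written into the TMVar
  _ <- forkIO $ atomically $ putTMVar resultVar ("echo: " ++ req)
  -- the warp handler just blocks here until the reply arrives
  atomically $ takeTMVar resultVar

main :: IO ()
main = handleRequest "ping" >>= putStrLn
```

`takeTMVar` retries until the worker has put a value, so the handler wakes exactly when the reply is available.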
Thanks! That's the conclusion I came to as well, and that's what my code does. Sorry if I wasn't clearer about that. As long as the request is small and blocking, it's pretty simple. But I am looking forward to `handleExternal` for stuff like websockets. It's a very natural fit. Thanks for adding it!
@teh okay, I've just pushed an improved mechanism for doing synchronous round trips - it'll simplify the server code (making it easier to follow, more transferable during potential refactoring, etc) and allows you to use whatever STM constructs you like to synchronise client and server.
Here's the example code from the tests... I'm going to break it down (and we can steal the markup from this comment to set up a doc page around it later on the wiki/website...)
```haskell
data StmServer = StmServer { serverPid  :: ProcessId
                           , writerChan :: TQueue String
                           , readerChan :: TQueue String
                           }
```
We start out by defining a server handle, which is good practice for -client-server apps as per the docs/tutorials. We will use this to interact with the server process. Since we want to resolve it to a process (for monitoring) and, in our test case, to kill it (once we're done), we add the relevant instances from -extras to make that easy:
```haskell
instance Resolvable StmServer where
  resolve = return . Just . serverPid

instance Killable StmServer where
  killProc StmServer{..} = kill serverPid
  exitProc StmServer{..} = exit serverPid
```
The client part of the interaction uses a new function exposed through the `Client` module, `callSTM`, which takes an arbitrary `STM` action for writing and another one for reading, then executes them both, whilst monitoring the server to ensure we get a failure message if it crashes (so we don't block indefinitely). Currently we can't read and then write atomically (and I'm not sure we want to here), but there is a way to expose that if we want to.

The `callSTM` implementation is very simple, and relies on `awaitResponse` from -extras, which does the relevant monitoring for us...
```haskell
callSTM :: forall s a b . (Addressable s)
        => s
        -> (a -> STM ())
        -> STM b
        -> a
        -> Process (Either ExitReason b)
callSTM server writeAction readAction input = do
  liftIO $ atomically $ writeAction input
  awaitResponse server [ matchSTM readAction (return . Right) ]
```
Back to our code then: we implement the client side of our API using this function, and use the handle to (a) ensure we have the relevant `STM` data available to us, and (b) ensure nobody accidentally passes an invalid `ProcessId` or some such:
```haskell
echoStm :: StmServer -> String -> Process (Either ExitReason String)
echoStm StmServer{..} = callSTM serverPid
                                (writeTQueue writerChan)
                                (readTQueue readerChan)
```
Now for our server implementation. We create the `STM` actions which, as you can see from the client code, involve two `TQueue`s: one for writing requests and a second for replies. You could easily replace these with `TChan` or `TMVar` if you wished, though I'd be cautious about using blocking cells if I were you. Anyway, the client and server APIs simply deal with `STM a` and don't regulate this at all.
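To see why the choice of STM construct is a drop-in swap, here is the same request/reply plumbing done with a pair of `TMVar`s instead of `TQueue`s, using only base and stm (the "client" and "server" roles are inlined for illustration). Note the caution about blocking cells: `putTMVar` retries while the cell is full, so a slow reader backpressures writers immediately, whereas a `TQueue` would buffer.

```haskell
import Control.Concurrent.STM

main :: IO ()
main = do
  reqVar   <- newEmptyTMVarIO :: IO (TMVar String)
  replyVar <- newEmptyTMVarIO :: IO (TMVar String)
  -- "client" writes a request into the request cell
  atomically $ putTMVar reqVar "hello"
  -- "server" takes the request and writes back a reply
  atomically $ takeTMVar reqVar >>= putTMVar replyVar . ("reply: " ++)
  -- "client" reads the reply
  atomically (takeTMVar replyVar) >>= putStrLn
```

Only the write action and the read action change; nothing in the client/server API needs to know which construct sits behind them.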
Given our input and output channels, we wire them into the server using the new `handleCallExternal` API, which works very much like `handleCall` except that it takes two `STM` actions: one for reading and another for writing back the replies. Since these are expressed as `STM a` (roughly speaking), you can do whatever you like, just as with the client portion of the code. This is where wrapping up your server capability into an isolated module and exposing it only via a handle becomes important. Later on, when we start looking at `Task` and other APIs, we will build on (and build new) capabilities that abstract this kind of detail away from the application developer.
Here's our server code now:
```haskell
launchEchoServer :: Process StmServer
launchEchoServer = do
  (inQ, replyQ) <- liftIO $ do
    cIn <- newTQueueIO
    cOut <- newTQueueIO
    return (cIn, cOut)
  let procDef = statelessProcess {
        apiHandlers = [
          handleCallExternal
            (readTQueue inQ)
            (writeTQueue replyQ)
            (\st (msg :: String) -> reply msg st)
        ]
      }
  pid <- spawnLocal $ serve () (statelessInit Infinity) procDef
  return $ StmServer pid inQ replyQ
```
Those `STM` implementation details don't escape the lexical scope of the `launchEchoServer` function, which I feel is important here, to minimise leaking information that API consumers shouldn't have to care about.
Finally, the test case, which simply launches the server, calls it synchronously, and puts the reply/response into our result:
```haskell
testExternalCall :: TestResult Bool -> Process ()
testExternalCall result = do
  let txt = "hello stm-call foo"
  srv <- launchEchoServer
  echoStm srv txt >>= stash result . (== Right txt)
  killProc srv "done"
```
So, there you have it. I'll try and get this merged soon. I think `callSTM` is a neat way to encapsulate synchronous communication between non-CH clients and CH servers. For non-synchronised, and generally more nuanced cases, `handleExternal` should be general enough to support almost all other use-cases.
Oh, and I should point out that because the `handleCallExternal` API uses `CallHandler s a b` as its handling type, you can also handle inputs with `noReply state`; however, you'd need to stash the writer channel in your `State` when initialising the server and use that to write back later on. Doing that would provide excellent concurrency, since you can receive the HTTP request, fire off an asynchronous message/query to the underlying resource, then go back to listening, and when the resource reply is sent back to your server, write back to the STM reply queue. I wouldn't want a single server managing all warp requests like that (to do so would be to fight warp's threading model, which I don't think is a good idea), but even when handling a single request in this way, you immediately give up your slice of the CPU for other threads/requests to proceed, which is likely to yield good results.
A key thing you'll want to watch out for with all of this, though, is that if you're sending remote calls/messages then serialisation can incur quite a heavy cost. If you're communicating with other local processes, I strongly suggest using the `Unsafe` versions of `call` (providing you're sure you don't have any unevaluated thunks in the data you're passing), since these are orders of magnitude faster.
You are productive! :)
`callSTM` is a nice primitive. It also helps with cases where e.g. the HTTP IO handler times out and gets killed with an async exception, I think?
@teh, well it might do... The underlying `call` mechanism isn't quite the same as the one used for ordinary messaging. It is implemented like so:
```haskell
awaitResponse :: Addressable a
              => a
              -> [Match (Either ExitReason b)]
              -> Process (Either ExitReason b)
awaitResponse addr matches = do
  mPid <- resolve addr
  case mPid of
    Nothing -> return $ Left $ ExitOther "UnresolvedAddress"
    Just p  -> do
      mRef <- P.monitor p
      receiveWait ((matchRef mRef):matches)
  where
    matchRef :: MonitorRef -> Match (Either ExitReason b)
    matchRef r = matchIf (\(ProcessMonitorNotification r' _ _) -> r == r')
                         (\(ProcessMonitorNotification _ _ d) ->
                             return (Left (ExitOther (show d))))
```
Now `receiveWait` does nothing about handling asynchronous exceptions; however, if the server is bound to the creating/parent thread, then I think what you actually want is to ensure it is killed if that thread exits. This is rather nuanced, since managed processes are not designed to be long or short lived, but they aren't a natural fit for a single-shot query+response either.
In many ways, my suspicion for what you're doing is that you might be better off just using distributed-process-async. If the server process needs to maintain some state - e.g., a database connection, OS resources, etc - then it makes sense to model it as a managed process. If you just want to proxy other cloud haskell processes that are doing more complex things and are addressable by name etc, then a stateless managed process will do the trick, but there is a risk of accidental complexity when dealing with cleanup operations.
The usual way to clean up once a managed process is shutting down is to install a shutdown handler. This will run before the process exits, and can be used to ensure any acquired resources are released properly. Since you cannot easily link your warp thread to the managed process (which is running in CH land), a sensible approach would probably be to start the managed process with a `Timeout` value matching the warp HTTP timeout value +/- 10%. That way, if the warp thread that spawned the managed process dies before it can ask the managed-process server to stop, the server will automatically time itself out and clean up before it goes.
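The give-up-after-a-deadline behaviour can be sketched at the STM level with `System.Timeout.timeout` from base: block for a reply, but abandon the wait once the deadline passes (a stand-in for the managed process timing itself out; the 10ms figure here is arbitrary for the demo):

```haskell
import Control.Concurrent.STM
import System.Timeout (timeout)

main :: IO ()
main = do
  q <- newTQueueIO :: IO (TQueue String)
  -- block for the reply, but give up after ~10ms; in the real setup the
  -- deadline would track warp's HTTP timeout +/- 10%
  r <- timeout 10000 (atomically $ readTQueue q)
  print r
```

Since nothing is ever written to the queue, the blocked STM transaction is interrupted at the deadline and the program prints `Nothing`; cleanup code would run at that point.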
Something I keep meaning to ask you...
Oh and I should point out that cleanup is one of the reasons why you'd normally arrange your process hierarchy in a supervision tree, such that when a branch shuts down, all the server processes perform their cleanup properly before restarting (or shutting down the supervisor, depending on the strategy chosen).
Yes to both questions 1 and 2 (in my current code which is essentially still the one in https://github.com/haskell-distributed/distributed-process/issues/306#issuecomment-277825991 ).
Cleanup is an interesting question in general, though AFAICT I don't have anything like open file handles that needs cleaning up directly.
I do need to make sure that an async exception kills the `Process` I am spawning per request as well. If I have a pool of handlers (plus a supervisor) then this issue doesn't arise. These are all open questions for me, though. I'll write it up when I find out what works best.
Closing this issue, as the `handleExternal` stuff has now been merged and will be released shortly. So we can now compose STM actions inside a managed process definition, allowing clients and servers that reside on the same node to communicate over STM channels, queues, etc.