haskell-distributed / distributed-process

Cloud Haskell core libraries
http://haskell-distributed.github.io

Live Server Upgrades #106

Open hyperthunk opened 11 years ago

hyperthunk commented 11 years ago

So... Cloud Haskell isn't likely to support the same kind of hot code upgrade as Erlang any time soon, and for good reason. That model simply doesn't make sense for a strongly typed language like Haskell, and there's nothing in the RTS to support it either - one of our aims is to avoid changing the RTS and to keep CH as a library.

After looking at http://hackage.haskell.org/packages/archive/plugins/1.5.3.0/doc/html/System-Plugins-Load.html, I can see that it's possible to dynamically load new modules, but there's really nothing in that approach that fits the concept of upgrading an existing (i.e., already loaded) module cleanly.

I do, however, think that we can approximate many of the benefits of Erlang's rolling upgrades without actually doing any runtime code changes. At a very high level, my idea is that we provide a mechanism for seamlessly migrating process state from one node to another, and enable many of the benefits of rolling upgrades by transparently moving managed processes from one node to another, where the latter node is running a different image to the former/original node.

First let's look at the mechanical aspects of this, just to see if it's plausible. Then we'll consider the pros and cons. From an implementation standpoint, I think we can achieve this via several steps:

  1. define an API for processes to manage their intrinsic ongoing state
  2. make it possible to migrate Serializable data to a node running another binary/image
  3. provide an API for processes to handle data transformations when migrating to the upgraded node
  4. figure out how to synchronise all of this properly

Let's talk about (1) for now. Firstly, it's important to realise that for (1) and (2) I'm not proposing that the v1 node sends a Closure (Process a) to the v2 node. That clearly wouldn't work, because the nodes are not running the same image and the closure environment (viz function pointers/free variables) won't be the same on the destination node! Instead, what I'm proposing is that certain kinds of special process, which are willing to manage their ongoing state in a specific way, could be allowed to relocate that state to another node, provided that it is Serializable. How this state is decoded on the other side is a combination of built-in Cloud Haskell support and user-defined transformations. We'll cover the latter part shortly(ish).

Consider, for example, the generic process implementation that we've started working on for -platform. In that module, we use the state monad to track the server state, which is Serializable, and make this available to the user-defined implementation code. The GenServer module does something similar.

type Process s = ST.StateT s BaseProcess.Process

-- [snip for brevity]

getState :: Process s s
getState = ST.get

putState :: s -> Process s ()
putState = ST.put

modifyState :: (s -> s) -> Process s ()
modifyState = ST.modify

-- [snip for brevity]

loop :: Behaviour s -> Timeout -> Process s TerminateReason
loop s t = do
    s' <- processReceive (dispatchers s) t
    nextAction s s'
    where nextAction :: Behaviour s -> ProcessAction ->
                            Process s TerminateReason
          nextAction b ProcessContinue = loop b t
          nextAction b (ProcessTimeout t') = loop b t'
          nextAction _ (ProcessStop r) = return (TerminateReason r)

processReceive :: [Dispatcher s] -> Timeout -> Process s ProcessAction
processReceive ds timeout = do
    s <- getState
    let ms = map (matchMessage s) ds
    -- TODO: should we drain the message queue to avoid selective receive here?
    case timeout of
        Infinity -> do
            (s', r) <- ST.lift $ BaseProcess.receiveWait ms
            putState s'
            return r
        Timeout t -> do
            result <- ST.lift $ BaseProcess.receiveTimeout (intervalToMs t) ms
            case result of
                Just (s', r) -> do
                  putState s'
                  return r
                Nothing -> do
                  return $ ProcessStop "timed out"

Now let us imagine that a new exception type is defined which can be thrown to a process (lightweight thread) - let's call it ForceUpgradeException. When this exception is delivered asynchronously to a process using throwTo, it will of course terminate the process. Any process, however, can install an exception handler to deal with this. A special process, in my proposed model, is a process (like GenProcess) that installs an exception handler for ForceUpgradeException and, in the exception handler, calls a Cloud Haskell API for migrating the process state to another node. Let's imagine that there is a primitive we can call to relocate the process state to the node we're failing over to - I realise that we're not actually failing, so this is really a take-over.

-- assuming that we have something like
data ForceUpgradeException = ForceUpgradeException !NodeId
    deriving (Typeable, Show)
instance Exception ForceUpgradeException

relocate :: (Serializable a) => NodeId -> a
relocate nid state = mask $ sendCtrlMsg Nothing $ RelocateSignal nid state

The GenProcess code can easily add support for this, because it can mandate that the state is Serializable - we can therefore automate the relocation logic ourselves, using try and handling ForceUpgradeException by calling relocate targetNode state'.
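
To sketch what I mean - and it really is only a sketch - the names managedLoop and step are made up, relocate is left as a placeholder that takes the state as an argument (unlike the signature above), and the loop is written as a plain state-threading function rather than the StateT wrapper, assuming the MonadCatch instance for Process from the exceptions package:

{-# LANGUAGE DeriveDataTypeable #-}
import qualified Control.Monad.Catch as Catch
import Control.Distributed.Process
import Control.Distributed.Process.Serializable (Serializable)
import Data.Typeable (Typeable)

-- The exception from above
data ForceUpgradeException = ForceUpgradeException !NodeId
    deriving (Typeable, Show)
instance Catch.Exception ForceUpgradeException

-- Placeholder for the proposed primitive, with the state as an argument
relocate :: Serializable a => NodeId -> a -> Process ()
relocate _nid _state = undefined

-- One upgrade-aware server loop: run the user's step function and, if a
-- ForceUpgradeException arrives, ship the last known-good state to the
-- take-over node instead of simply dying.
managedLoop :: Serializable s => (s -> Process s) -> s -> Process ()
managedLoop step st = do
    next <- Catch.catch (fmap Right (step st))
                        (\(ForceUpgradeException nid) -> do
                            relocate nid st
                            return (Left nid))
    case next of
        Right st' -> managedLoop step st'
        Left _    -> return ()  -- hand-over complete; this incarnation stops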

Now looking at relocate - going via the local node controller may be a good idea, as we can immediately track the fact that this process has been moved to another node (more specifically, to another running haskell program). We'll consider some of the implications of this later on too.

So before we get into the muddy details of (2) I want to clarify that we're saying....

  1. When upgrades are started, a ForceUpgradeException will be thrown (asynchronously) to the target process
  2. If a ForceUpgradeException is caught, then there is an API that can be used to move the process state to the other node as part of the take-over
  3. The relocate API is useless (or will end up terminating the calling process?) if called otherwise

A corollary issue we ought to consider is that of maintaining state invariants whilst a process is transitioning from one state to another. Consider a process that, for example, is in the middle of writing to a file. The implementer may wish to handle normal state changes by reading the contents of the file and writing some changes out before replying (or continuing), but this will break in the face of asynchronous exceptions: if the relocate call tries to read the file and serialise its contents for reconstitution on the take-over/v2 node, the file's contents may have become inconsistent because reads/writes were terminated unceremoniously.

The obvious solution to this is to call mask before entering any such critical path, and a generic process management API such as GenProcess should provide explicit, declarative support for doing this.
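
For illustration, a minimal sketch of that masking idiom - criticalFileUpdate is a made-up name, and it assumes the MonadMask and MonadIO instances for Process plus strict ByteString IO so nothing lazy escapes the masked region:

import qualified Control.Monad.Catch as Catch
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString as BS
import Control.Distributed.Process (Process)

-- Keep the read-modify-write of the file atomic with respect to asynchronous
-- exceptions such as ForceUpgradeException, so that relocate never observes
-- a half-written file.
criticalFileUpdate :: FilePath -> (BS.ByteString -> BS.ByteString) -> Process ()
criticalFileUpdate path f = Catch.mask_ $ liftIO $ do
    contents <- BS.readFile path
    BS.writeFile path (f contents)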


Migrating Serializable state to another node isn't impossibly hard. There are two core problems that need to be solved - 'addressability' and decoding. The latter problem is very simple to outline - we cannot use Typeable to identify the type and thence Binary to decode the incoming data: we might not have an instance for it on the target node, or if we do, it may have changed between versions and decoding might fail, both of which would prove catastrophic. In either case, the Fingerprint that Typeable gives us may or may not provide enough information to identify the type, but CH currently sends a pointer to it, which is unlikely to be useful on a target node running a different image.

This use of a pointer is, of course, a very sensible optimisation. We wouldn't want to do away with that in the normal case, nor is it clear to me whether or not there is any other way to handle type identification on remote nodes without user intervention.

Handling Decoding

My proposal for handling this latter difficulty is very simple, and probably quite unpopular - make it the user's problem, at least to some extent. If the definition hasn't changed then they know this and can use an API call in Primitives.hs to decodeFromVersion or some such, which would just defer to binary's decode. We would basically have to ignore the fingerprint in this situation.

If the expected state has changed, then the user needs to deal with this themselves. If the type which the server loop is working with has changed, then they should simply write a transform from a -> b and use myTransform . decodeFromVersion to handle it. If there are changes to the data types involved, then the user should handle these themselves, and we can support both idioms simply by making sure that relocated process state is given to the destination (upgraded) node as a ByteString. The user can write code using ByteString and Binary to grab the various chunks of the original (prior) state and transform it however they wish.
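
As a rough sketch of that idiom - the state types, the upgradeState transform and decodeFromVersion itself are all invented here, and decodeFromVersion is assumed to be nothing more than Data.Binary's decode applied to the relocated ByteString:

import Data.Binary (Binary(..), decode)
import qualified Data.ByteString.Lazy as BSL

-- The state shape compiled into the v1 image
data StateV1 = StateV1 { counterV1 :: Int }

instance Binary StateV1 where
  put (StateV1 n) = put n
  get = StateV1 <$> get

-- The state shape in the v2 image: a field has been added
data StateV2 = StateV2 { counterV2 :: Int, label :: String }

-- User-written transform from the old representation to the new one
upgradeState :: StateV1 -> StateV2
upgradeState (StateV1 n) = StateV2 n "migrated"

-- For unchanged types this is all 'decodeFromVersion' would need to be
decodeFromVersion :: Binary a => BSL.ByteString -> a
decodeFromVersion = decode

-- The 'myTransform . decodeFromVersion' idiom from the text
recoverState :: BSL.ByteString -> StateV2
recoverState = upgradeState . decodeFromVersion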

This might sound complicated, but it's only a bit more complicated than what you've got to do in Erlang in practice. Erlang can do magical upgrades because modules can change on the fly, but more often than not record/tuple definitions have changed, and the managed process APIs in OTP such as gen_server actually provide an explicit callback for code upgrades, just so that the developer can write these kinds of transformations themselves. Admittedly they've not got to bother with encoding/decoding, but I think that's a reasonable price to pay for a beautiful type system.

Before we can move on to 'addressability' and points (3) and (4), we have another set of issues to handle. When relocating a process, we have mandated that the relocate API call is provided with Serializable data representing the process' current view of its state. As mentioned previously, we cannot magically take the free variables and transmit them, as we know next to nothing about the image that the v2 node is running. It would be very unfortunate, however, if the process running on node-v1 had some quantity of messages in its mailbox and these disappeared when the process was relocated to node-v2! As much as I'm willing to move away from Erlang's model for release upgrades, I'm not willing to lose my process' message queue!

Implementation Challenges

This is where it gets a bit more complicated....

All the messages in a process' mailbox are Serializable and so can be copied to node-v2 in theory. We do, however, have the same problem here as we did with the process state migration, viz what to do if there are messages in the mailbox which will not be recognised by code running on node-v2 due to API changes and so on. I suspect the only solution here is to ensure that the user has provided a means for decoding them properly. If we go back to the principle we had chosen for handling the process state translation, we gave the user a ByteString and made them decode it themselves. This isn't going to scale for the contents of the whole message queue, but we might be able to get away with this to some extent.

GHC precomputes the MD5 hash for the TyCon, so I'm guessing that we will know if a type is not recognisable when we try to examine its fingerprint. Quite how this works in practice is a little hazy for me - I've not delved into the GHC sources enough to fully understand it - I suspect that the location of the type's pointer in the data section of the image might be used along with the MD5, so we may need a little runtime support in order to identify that? Maybe not though....

Clearly functions like decodeFingerprint in Cloud Haskell are going to fail regardless, because they basically take the fingerprint binary that is created with encodeFingerprint and pass it to toForeignPtr - because encodeFingerprint simply grabs a pointer to the type def (TyCon?), this will not work when the layout of the image (viz the code and data sections) has changed; even if the Fingerprint was for the same type, this wouldn't work for identifying the type when running another image. And if not, is that the end of the world? I think not, as we can still serialise the data to a ByteString to send it, and therefore we can come up with some means of identifying the types on another node running a different version of the image. It might be that the user needs to tag all their remote-able data types with a Word8 or some such, so that we can figure out how to convert them via a lookup table. Or perhaps just using the MD5 is sufficient?
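
As an aside on the MD5 question: in more recent base versions the MD5-based fingerprint is reachable as plain data (two Word64s), so it could at least be shipped by value rather than by pointer - whether identical fingerprints across two different images are a strong enough identity is exactly the open question above. A sketch:

import Data.Proxy (Proxy(..))
import Data.Typeable (Typeable, typeRep, typeRepFingerprint)
import Data.Word (Word64)
import GHC.Fingerprint (Fingerprint(..))

-- The type's MD5-based fingerprint as two plain Word64s, suitable for
-- sending over the wire and comparing on the receiving node.
fingerprintOf :: Typeable a => Proxy a -> (Word64, Word64)
fingerprintOf p = case typeRepFingerprint (typeRep p) of
                    Fingerprint hi lo -> (hi, lo)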

I'm not 100% clear on how to do this, but it doesn't seem impossible anyway. In fact, one advantage of working via some kind of lookup table would be that we could handle transformations and straight decodes identically. The developer writing the upgrade will need to decide what to do with messages of types that no longer exist, as well as types for which the TyCon has fundamentally changed anyway. The code that handles relocate nid state instructions in the node controller would basically also require a table mapping (Serializable a) => a -> ByteString and the user would then annotate the output of Data.Binary.encode somehow; on the other side, there would be another table from (Serializable a) => ByteString -> Message a wherein the user would figure out the mapping between types themselves. If it was necessary to go down that path, I'm sure we could provide some build/compile-time support for generating the lookup/mapping tables.
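
A sketch of the tag-table idea, with every name made up: the v1 side prefixes each encoded payload with a user-chosen Word8, and the v2 side keeps a user-supplied table from tag to a decode-and-transform function:

import Data.Binary (Binary, encode)
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Map.Strict as Map
import Data.Word (Word8)

-- v1 side: tag each encoded message with a stable, user-chosen identifier
tagged :: Binary a => Word8 -> a -> BSL.ByteString
tagged tag x = BSL.cons tag (encode x)

-- v2 side: the user supplies a handler per tag; the handler decodes the old
-- payload and transforms it into whatever the new code expects
type UpgradeTable r = Map.Map Word8 (BSL.ByteString -> r)

translate :: UpgradeTable r -> BSL.ByteString -> Maybe r
translate table bs = do
    (tag, payload) <- BSL.uncons bs
    handler        <- Map.lookup tag table
    return (handler payload)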

Of course for now, I'm leaving all the lovely details of how to handle that to the reader! ;)

There is also the problem of moving the process' typed channels to the new node. We would need to re-establish the ReceivePort for each of these in the new node controller, so a fair amount of internal node controller code will be required to support this properly. In addition to the challenge of handling the types and their transformations onto the remote node, there is a not insignificant amount of work required to handle draining the message queue and transactional channels cleanly, due to the complex nature of the CQueue implementation. Whilst this is tricky to get right, because the writes go via the node controller, we should be able to avoid getting deadlocked whilst doing so.

Again, I'm going to leave this as another item for further discussion.

Handling 'Addressability'

After worrying about how to transfer typed messages from one node to another, when both nodes are running different executables, this ought to be a breeze.

Processes are 'addressable' via their ProcessId, which encapsulates the NodeId for the node on which they reside, as well as a node-local identifier for the process' thread. If processes are to be re-locatable, then we must ensure that wherever the ProcessId has escaped to, we inform the actor possessing it of this change. This isn't actually too big a problem. When a process decides to relocate to another node, we must send a control message to all other nodes in the system informing them of this fact. All process message passing interactions happen via the local node controller, and it is there that we should deal with this fact. Because a ProcessId is Serializable and can be sent to arbitrary receivers, it would be necessary to hold a map from the old process identifiers to the new ones if we allowed processes to arbitrarily call relocate. If we have to keep these mappings in all the node controllers throughout the system's lifetime, we're going to leak space like nobody's business. I propose, therefore, that we keep the existing semantics, which are that once node-v1 goes away, messages sent to the original ProcessId pointing to that node will be silently dropped. This is a bit nasty compared to Erlang's upgrade mechanism, but I don't think we can scale otherwise - I'm certainly open to suggestions about how to better manage this.

At a minimum, however, a process migration should generate a monitor signal, so I propose that we add new DiedReason constructors indicating that a process (or node) has moved.

data DiedReason =
     -- | Normal termination
     DiedNormal
     -- | The process was relocated; the new ProcessId is given (proposed)
  |  DiedProcessRelocated !ProcessId
     -- | The node was taken over by the given node (proposed)
  |  DiedNodeRelocated !NodeId
  |  -- snip...

This at least allows remote processes to monitor the fact that the process/node they were interacting with has moved, so that code which holds on to one or more ProcessId items - or NodeId for that matter - can update itself when the move is complete.
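
To illustrate how client code might consume that notification - remembering that DiedProcessRelocated is only a proposal, so this won't compile against today's API, and awaitRelocation is a made-up name:

import Control.Distributed.Process

-- A client that holds a ProcessId for some server: when the (proposed)
-- relocation notification arrives it swaps in the new ProcessId, otherwise
-- it keeps the old one.
awaitRelocation :: ProcessId -> Process ProcessId
awaitRelocation pid = do
    ref <- monitor pid
    receiveWait
      [ matchIf (\(ProcessMonitorNotification ref' _ _) -> ref' == ref)
                (\(ProcessMonitorNotification _ _ reason) ->
                    case reason of
                      DiedProcessRelocated newPid -> return newPid
                      _                           -> return pid)
      ]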

Talking of monitors, when migrating a process to node-v2, we will need to re-establish all monitors and links, and any node controller that receives the relocation-in-progress signal should do the same. Nodes which are disconnected when the take-over happens will basically be stuck, and attempting to track all the peers we've seen over time so as to handle this sounds unworkable to me.

Dealing with Synchronisation

While the -v1 node controller is merrily attempting to migrate a process' state and mailbox to another node, and dealing with incoming and outgoing links and monitors (!), it's highly likely that other nodes are attempting to interact with node-v1. Ignoring connection requests and other infrastructure level calls from Network.Transport is almost certainly the right thing to do, but what should we do about messages that other processes have asked us to deliver? And how do we ensure that all the processes in an application's supervision trees are upgraded seamlessly? Should we enforce some specific ordering, or should the user be able to specify this, or both? If a process is linked to another process that we wish to relocate - what semantics do we require? And to what extent do we need/want to attempt to synchronise with other nodes whilst this is going on?

And.... Should we communicate something about this at the Network.Transport level? I would expect this to have a significant impact there, not least because we'd want to automatically establish a connection to node-v2 if we were previously connected to node-v1. And of course the semantics of doing that are flaky for a distributed platform based on asynchronous messaging, as evidenced by the fact that reconnect is explicit. We won't even talk about what Erlang does.

What occurs to me about this is that we do have to force the user to get involved in the migration. Not only is this normal even for Erlang release upgrades (when data types change, for example), it is reasonable IMO and will still be simpler in practice than having to build some complex beast that handles down time. The architectural complexity would be primarily in CH, but the user would have to do some work to make the migration happen.

Another thing that we must consider here is that some kinds of process will not be able to magically migrate their state. A process which has, for example, opened a socket and started listening (or sending) will not be able to do this. The same would be true for a distributed Erlang application that was failing over to another node, though code upgrades would not cause that problem there. In practice, this is also not a huge difficulty, as a load balancer in front of the application isn't a hugely complex thing to have to configure. There is a bit of complexity leakage here, in that a process which is maintaining some kind of session is going to have to make sure it can re-establish that state on node-v2. Again, this should be do-able, because the load balancing mechanism should hide the fail-over from the client and the state migration can simply rely on the relocate and migration/translation mechanism. Of course, it's entirely up to the application developer whether they want to handle session state in this way, but the point is that we can support doing so when required.

So......

Thoughts/Ideas on a postcard please!!! :)

simonpj commented 11 years ago

Some quick thoughts.

First, it's an ambitious project. I rather think there may be other things that need doing first. But perhaps you are motivated by a particular application.

Second, the type of relocate looks very strange. Shouldn't it take the state as input (to be serialised) not return it? And what code should be run at the other end? You need some way to specify which function to call in the new image.

Third, if the old process (OldP) has state of type OldP.T, the new process (NewP) has to parse that state somehow. Imagine that we serialised it as a string with Show. Then OldP's 'show' would generate the string, and NewP's 'read' would parse it. They had better agree!

One possibility is to insist that OldP's state type and the type that NewP parses are identical. (After parsing, NewP might then translate that old state into a new richer state, but that's a separate matter.) What does "identical" mean? One simple possibility would be "the same all the way down", which is what the MD5 fingerprint does.

And yet it may be too much to insist that the types are identical. An alternative would be to use a self-describing format, much as happens for web services. I'm out of my depth here, but I'd look at JSON and Protocol Buffers and that kind of stuff. No point in re-inventing the wheel.

hyperthunk commented 11 years ago

Hi Simon - thanks for the feedback!

First, it's an ambitious project. I rather think there may be other things that need doing first.

I quite agree. This is very low priority for me, but I thought that others (such as Pankaj) might be interested, and I like to capture the conversation around these ideas on the issue tracker.

Second, the type of relocate looks very strange

That's just me typing too late at night.

And what code should be run at the other end? You need some way to specify which function to call in the new image.

Yes indeed. I was thinking there would be a structured API for this, but you're right that this information needs to come from the 'user' as it were. Over email, Jeff suggested that the code calling relocate should provide a Closure (Process ()), which makes sense.

And yet it may be too much to insist that the types are identical

Indeed this is one of the big challenges. Again over email, Jeff suggested solving this first:

"It might be nice to use a system to ensure that types with different fingerprints can be automatically converted. This is an issue that comes up a lot (also in sending messages between different versions, in the case of partial upgrades) and might be worth solving first." - Jeff Epstein

I might open a separate bug for that at some point, but I want to see if someone bites and takes an interest in owning this first. :)

basvandijk commented 11 years ago

Have you looked at safecopy? It solves a very related problem.

hyperthunk commented 11 years ago

@basvandijk that's completely awesome - I didn't know it existed. It looks very similar to what we need - we'd have to adapt that approach to work with lazy bytestrings, and there are one or two other differences (the automatic derivation would need to be done a bit differently, and so on) - but yes, this looks like an ideal approach. Thanks for pointing it out!
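
For reference, the safecopy idiom looks roughly like this (against strict bytestrings via cereal, which is one of the differences mentioned above; the Counter types are invented for illustration):

{-# LANGUAGE TemplateHaskell, TypeFamilies #-}
import Data.ByteString (ByteString)
import Data.SafeCopy
import Data.Serialize (runGet, runPut)

-- The shape compiled into the v1 image
data Counter_v0 = Counter_v0 Int

-- The shape in the v2 image
data Counter = Counter Int String

deriveSafeCopy 1 'base      ''Counter_v0
deriveSafeCopy 2 'extension ''Counter

-- How an old value becomes a new one when an old payload shows up
instance Migrate Counter where
  type MigrateFrom Counter = Counter_v0
  migrate (Counter_v0 n) = Counter n "migrated"

-- Bytes written by the v1 image with 'safePut' on Counter_v0 decode directly
-- to the new Counter in the v2 image, applying 'migrate' along the way.
encodeOld :: Counter_v0 -> ByteString
encodeOld = runPut . safePut

decodeUpgraded :: ByteString -> Either String Counter
decodeUpgraded = runGet safeGet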

LaurentRDC commented 2 weeks ago

I have been thinking about this problem as well.

I have heard great things about Akka, an actor-model framework for the Java Virtual Machine. Akka documentation has this to say about rolling upgrades:

There are two parts of Akka that need careful consideration when performing a rolling update.

  • Compatibility of remote message protocols. Old nodes may send messages to new nodes and vice versa.
  • (something irrelevant)

Note that there's nothing special about state being copied -- indeed, state could be sent via a message from one old node to a new node.

For Cloud Haskell, it seems like the restrictions should be strictly the same: as long as the two Cloud Haskell clusters (one old, one new) can share messages between each other, we're golden.

To me, this moves the challenge of rolling upgrades from Cloud Haskell to versioned serialization. I can imagine writing helper functions to provide versioned put and get for Binary instances. But what about Typeable?
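
As a sketch of what those versioned helpers could look like (all names invented; this only tags payloads with a version byte on top of Binary and leaves the Typeable question open):

import Data.Binary (Binary(..), Get, Put)
import Data.Word (Word8)

-- Write a version byte ahead of the payload
putVersioned :: Word8 -> Put -> Put
putVersioned v body = put v >> body

-- Read the version byte and dispatch to a per-version parser
getVersioned :: (Word8 -> Get a) -> Get a
getVersioned byVersion = get >>= byVersion

-- Example: v2 of this type added a timeout field, but v1 payloads from old
-- nodes can still be read by filling in a default value.
data Config = Config { retries :: Int, timeoutMs :: Int }

instance Binary Config where
  put (Config r t) = putVersioned 2 (put r >> put t)
  get = getVersioned $ \v -> case v of
          1 -> (\r -> Config r 5000) <$> get
          2 -> Config <$> get <*> get
          _ -> fail "Config: unknown version"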