
RFC: Unified APIs for distributed programming, durable state, fault-tolerance and live redeployments #141

Closed pchiusano closed 7 years ago

pchiusano commented 7 years ago

This document describes a set of core Unison functions for expressing fault-tolerant multi-node systems, including systems that must be dynamically updated and redeployed without downtime. You can jump to the full API (about 30 functions) or keep reading for a guided explanation of the whole thing.

Remarks:

Lastly, thank you to all who have contributed to this design or worked on earlier iterations!

Contents:

  1. The basics of multi-node computation: Remote and Remote.transfer
  2. Ephemeral Node local state: Channel values, Remote.send and Remote.receive
  3. Heartbeats, supervision, and lifecycle management
  4. Capabilities and dynamic loading and linking
  5. Durable state
  6. Node spawning and sandboxing
  7. The full API
  8. Appendix: History and context

The basics of multi-node computation: Remote and Remote.transfer

Let's start with the basics. We have a type, Remote a, which forms a monad:

-- Promote a pure value to `Remote`
Remote.pure : ∀ a . a -> Remote a

-- Sequencing of remote computations
Remote.bind : ∀ a b . (a -> Remote b) -> Remote a -> Remote b

We can write remote computations using a do block, which desugars in the usual way:

do Remote
  x = 23 -- just use single `=` rather than `let x =` as in Haskell
  y := pure 42 -- use `:=` rather than `<-` as in Haskell
  pure (x + y)

Note on syntax differences from Haskell: We use a single = rather than let x = ..., and we use := rather than <- for monadic bind.
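
For reference, here's a rough sketch of how that block desugars into the primitives above (using an informal lambda syntax):

-- the `do Remote` example above is roughly:
Remote.bind (y -> Remote.pure (23 + y)) (Remote.pure 42)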

Remote computations may proceed on multiple values of type Node. A Node is conceptually a "location where computation can occur". At runtime, a representation of Node might be a hostname + public key.

To move a Remote computation to a different node, we use Remote.transfer:

-- Transfer control of remainder of computation to target node
Remote.transfer : Node -> Remote Unit

ex1 : Node -> Node -> Remote Number
ex1 alice bob = do Remote
  xs = [92,3,145,9,2,64]
  Remote.transfer alice
  sorted-xs = sort Number.Order xs -- sorting occurs on `alice` node
  Remote.transfer bob
  pure (sum sorted-xs) -- summation occurs on `bob` node

Implementation note: if we desugar the do block, the continuation following a Remote.transfer is an arbitrary function Unit -> Remote a. This function is sent to the recipient Node over a forward-secret, mutually authenticated encrypted pipe (using one of the Noise protocols), any missing needed dependencies (as determined via Unison's "nameless" cryptographic hashing scheme) are synced and cached, and the computation then proceeds on the recipient. Note that the sender does not wait around for the recipient to "complete" the computation (which might be forever); it is "fire and forget". The sender transfers the computation and its job is done. Error handling and supervision will be handled separately.

Remote computations can be forked:

Remote.fork : ∀ a . Remote a -> Remote Unit

This starts the computation running asynchronously, purely for its effects.

And computations can fail, explicitly.

Remote.fail : ∀ a . Cause -> Remote a

-- this is TBD
type Cause = Error Text Node | Stopped | Expired | Unresponsive Node

Note on syntax: We use type rather than data to introduce the declaration of a new type.

Error handling and supervision are discussed later.

Ephemeral Node local state: Channel values, Remote.send and Remote.receive

We can create (typed) channels, and send and receive values on these channels:

-- Create a `Channel`; just a GUID at runtime
channel : ∀ a . Remote (Channel a)

-- Send a value to a `Channel` on the current node
Channel.send : ∀ a . Channel a -> a -> Remote Unit

-- Receive a value from a `Channel` on the current node,
-- with an expiration controlled by the supplied heartbeat
-- Note: when the outer `Remote` is bound, listener is registered
Channel.receive : ∀ a . Heartbeat -> Channel a -> Remote (Remote a)

-- Receive multiple values from a `Channel` on the current node,
-- with an expiration controlled by the supplied heartbeat
-- Note: when the outer `Remote` is bound, listener is registered
Channel.subscribe : ∀ a . Heartbeat -> Channel a -> Remote (Remote a)

A Channel is just a random, unique tag, and receive and subscribe just register a callback associated with that tag in the node runtime. They differ only in what happens when a value is received: receive deregisters this callback as soon as a value is produced, while subscribe keeps the callback around, and enqueues any values it receives. Both receive and subscribe will deregister once the associated Heartbeat is stopped or expires on its own, and using the resulting Remote a after this point will fail the computation.
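
As a small illustration (a sketch; assumes a Heartbeat value h is already in scope), a single-node round trip might look like:

do Remote
  c := channel
  recv := Channel.receive h c -- binding here registers the listener on `c`
  Channel.send c 42
  recv -- yields 42; the listener is then deregistered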

Primitives for creating and manipulating Heartbeat values are covered next.

Heartbeats, supervision, and lifecycle management

A Heartbeat is a value used to control the lifecycle of a computation (or, we'll see later, durable state), and is used as a handle for supervision. The simplest way to create a heartbeat is from a duration of time:

-- Create a `Heartbeat` that stops beating after a duration,
-- unless a `Heartbeat.reset` is performed
heartbeat : Duration -> Remote Heartbeat

Duration.seconds : Number -> Duration

After the given duration elapses, the heartbeat is said to be stopped, and anything linked to the heartbeat will be stopped as well. Here are some other functions on Heartbeat:

-- Stop the heartbeat; anything linked to the heartbeat will be terminated / cleaned up
Heartbeat.stop : Heartbeat -> Remote Unit

-- Kill the heartbeat with an explicit error; anything linked to the heartbeat will be terminated / cleaned up
Heartbeat.fail : Cause -> Remote Unit

-- Ensure that the `Heartbeat` has at least the provided duration
-- to live. If the heartbeat has greater than this duration still 
-- remaining, then this is a no-op.
Heartbeat.reset : Duration -> Heartbeat -> Remote Unit

-- The resulting heartbeat is live only if _both_ heartbeats are live
Heartbeat.both : Heartbeat -> Heartbeat -> Heartbeat

-- The resulting heartbeat is live if _either_ heartbeat is live
Heartbeat.either : Heartbeat -> Heartbeat -> Heartbeat

-- Be notified when a heartbeat completes
-- When outer `Remote` is bound, the supervisor is registered
Remote.supervise : Heartbeat -> Remote (Remote Cause)

-- Cancel the lexically scoped computation if/when the heartbeat stops,
-- and notify any supervisors of completion or errors
Remote.link : ∀ a . Heartbeat -> Remote a -> Remote a

We can think of a heartbeat as being in one of three states: live (still beating), stopped (cleanly, via Heartbeat.stop or because its duration expired without a reset), or failed (killed via Heartbeat.fail, with an associated Cause).

Remote.supervise waits until the heartbeat stops, and notifies us of the cause. This is useful for setting up supervision trees.

In general we use heartbeats rather than explicit cleanup actions to manage lifecycle of computations and durable data.

An example

In the following code, if the forked computation takes more than 10 seconds, the Channel.subscribe operation will be cancelled:

do Remote
  c := channel
  for-10s := heartbeat (Duration.seconds 10)
  read-c := Channel.subscribe for-10s c
  Remote.fork <| do Remote
    Remote.transfer alice
    r1 := some-huge-computation
    r2 := another-big-computation
    Channel.send c r1
    Channel.send c r2
  Remote.sequence [read-c, read-c]

In this case, it might be useful to also cancel the forked computation as soon as we know the other side will no longer be listening. We can do that with the primitive, Remote.link, given above. Putting it together, we can update the above code to cancel the remote computation if more than the 10 seconds elapse:

do Remote
  c := channel
  for-10s := heartbeat (Duration.seconds 10)
  read-c := Channel.subscribe for-10s c
  (Remote.fork `compose` Remote.link for-10s) <| do Remote
    Remote.transfer alice
    r1 := some-huge-computation
    r2 := another-big-computation
    Channel.send c r1
    Channel.send c r2
  Remote.sequence [read-c, read-c]

Since the forked computation is linked to the heartbeat, it will only continue as long as the heartbeat is live. In the event that it is killed, the read-c calls will also fail, and the callbacks associated with the channel will be deregistered.

Implementation notes

The above API is very flexible, but it implies some things about the Unison runtime. Consider this code (thanks to @runarorama for helping to flesh out some of these issues):

Remote.fork-linked : ∀ a . Heartbeat -> Remote a -> Remote Unit
Remote.fork-linked h = Remote.fork `compose` Remote.link h

-- h1 : Heartbeat
do Remote
  get-cause := Remote.supervise h1
  Remote.fork-linked h1 <| do Remote
    Remote.transfer alice
    r1 := some-huge-computation
    r2 := another-big-computation
    Channel.send c r1
    Channel.send c r2
  Remote.transfer bob
  cause := get-cause
  case cause of
    Error err node -> ...
    ...

Think about what happens if the alice node were to get hit by an asteroid while in the middle of some-huge-computation. At that time, the computation is in a scope where h1 is a linked heartbeat, so we'd hope that the supervising computation on bob which calls get-cause gets notified with either an Unresponsive alice (if the asteroid strikes before alice can get out a reply) or an Error "Asteroid strike!!" alice if the asteroid strike gets detected in time to send out a reply to any supervisors.

What's needed to make this happen? Here's a sketch:

Because the status map is a bit 'sticky', this protocol seems to work even if a computation is bouncing rapidly between multiple Unison nodes, and avoids the race condition of a heartbeat status request "chasing" a computation as it hops between nodes.

Concern: I have a vague concern that this heartbeat and supervision API is too expressive and ends up being unimplementable. Can anyone think of programs whose meaning is unclear or seem like nonsense? (In particular, what if Heartbeat values are shared and supervised in multiple places...?) There are likely ways to make the API a little less expressive to address these things. For instance, in Erlang, supervision is tied to process creation; you don't get a first class "supervision handle" that multiple processes can supervise, so the supervision graph is necessarily a tree. (I am not an Erlang expert, anyone who is feel free to pipe in) We could likely mimic that design here.

Capabilities and dynamic loading and linking

Nodes can load and link against values dynamically:

-- Introduce a new, globally unique capability, of a particular type
capability : ∀ a . Remote (Capability a)

-- Introduce a capability on the current node, so long as the heartbeat is live
Capability.provide : ∀ a . Heartbeat -> Capability a -> a -> Remote Unit

-- Update a capability on the current node, if it exists
Capability.update : ∀ a . Capability a -> (a -> a) -> Remote Unit

-- Ask for a capability dynamically
Capability.ask : ∀ a . Capability a -> Remote (Optional a)

It's probably not obvious, but I believe this is the key building block for doing live redeployments and upgrades of a running Unison system, since we can now bind against dependencies dynamically. I also suspect you can do interesting things like build 'IPFS in Unison', replace DNS, build one (or more) P2P computing fabrics for sharing compute resources (something like Golem), and more (I think!). Imagine:

peers : Capability (Vector (Node, BloomFilter))

-- Ask for the capability at the current node, or use bloom filter
-- optimized traversal of peers to find a node with requested capability
discover : ∀ a . Capability a -> Remote a

Capabilities are also used as the sole FFI for Unison. Nodes with access to external C libraries, a local database, or whatever, are just nodes with more provided capabilities. The unsafe linear algebra function written in C that you want to call from Unison would have an associated Capability (Matrix -> Remote Matrix) for using the function on whatever nodes have access to it. This post has a bit more detail.
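
To make the FFI use case a bit more concrete, here is a sketch (the names native-matrix-op, provide-native, and use-native are purely illustrative, and the Optional constructors are assumed to be Some/None): the node with the C library provides the capability, and callers bind against it dynamically.

-- hypothetical capability for a native matrix routine
native-matrix-op : Capability (Matrix -> Remote Matrix)

-- on the node that links the C library; the capability stays available while `h` is live
provide-native : Heartbeat -> (Matrix -> Remote Matrix) -> Remote Unit
provide-native h f = Capability.provide h native-matrix-op f

-- on any node; succeeds only where the capability has been provided
use-native : Matrix -> Remote (Optional Matrix)
use-native m = do Remote
  op := Capability.ask native-matrix-op
  case op of
    None -> pure None
    Some f -> Remote.map Some (f m)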

Durable state

There are a couple new things here:

-- Create a new key-value store, encrypted with the key.
-- The data will be deleted when the provided heartbeat stops.
Index.empty : ∀ k v . Heartbeat -> Key -> Remote (Index k v)

-- Insert a value, returning a `Clock` which has visibility of this update
Index.insert : ∀ k v . Clock -> k -> v -> Index k v -> Remote Clock

-- Lookup a value associated with a key
Index.lookup : ∀ k v . Clock -> k -> Index k v -> Remote (Optional v)

Key is just a symmetric encryption key; durable storage is always encrypted. We might generate keys via:

AES256.key : Remote Key
Blowfish.key : Remote Key
-- etc

The Clock controls visibility of updates, allowing for updates to happen asynchronously and/or on eventually consistent storage. Index.lookup waits until the 'committed' version of the storage is greater than or equal to the given clock value. Likewise, Index.insert waits until the 'committed' version is greater than or equal to the given clock before it performs the insert.

Note: The Index.insert and Index.lookup functions could take a Heartbeat to control how long to wait for the clock before proceeding with the operation, but I think this is overkill and it's fine to set this time globally (to, say, a few seconds).
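
Putting the pieces together, here is a sketch of a write followed by a read-your-writes lookup (assuming a Heartbeat value h is in scope):

do Remote
  key := AES256.key
  idx := Index.empty h key
  c1 := Index.insert Clock.zero "alice" 1 idx
  -- waits until the committed version of the storage is >= c1, so the insert is visible
  Index.lookup c1 "alice" idx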

The Clock API

Here's the API for working with clocks:

-- True if the first clock occurs at or after the second clock
Clock.>= : Clock -> Clock -> Boolean

-- ∀ c, c >=_Clock zero is true
Clock.zero : Clock

-- increment c >= c
Clock.increment : Clock -> Clock

-- Returns two clocks, c1, c2 that are incomparable
-- Inverted by merge
Clock.split : Clock -> (Clock, Clock)

-- merge c1 c2 >= c1 and merge c1 c2 >= c2
Clock.merge : Clock -> Clock -> Clock

A reasonable implementation might be based on interval tree clocks.
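
For example, two writers can proceed concurrently under incomparable halves of a split clock, and a reader that merges the resulting clocks waits until both writes are visible (a sketch, reusing the idx from the earlier Index example):

do Remote
  (ca, cb) = Clock.split Clock.zero -- ca and cb are incomparable
  c1 := Index.insert ca "a" 1 idx -- e.g. performed by one writer
  c2 := Index.insert cb "b" 2 idx -- e.g. performed by another writer
  -- merge c1 c2 >= c1 and >= c2, so this lookup sees both inserts
  Index.lookup (Clock.merge c1 c2) "a" idx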

Node spawning and sandboxing

Nodes can be created dynamically:

-- Create a new node 'in the same location' as the current node
Remote.spawn : Heartbeat -> Optional Key -> Sandbox -> Remote Node

-- TBD
type Sandbox =
  Sandbox CPU% Memory Storage (∀ a . Node -> Remote a -> Remote a)

Nodes are spawned with a sandbox, controlling who has what access to the node, and an optional symmetric key. If no key is provided, a new asymmetric keypair will be generated for the node, and the node reference will just include the public key. If a symmetric key is provided, the Node value itself will contain a reference to this key at runtime, and the node reference itself should be considered secret. Nodes with symmetric keys are useful for temporary nodes where the cost of generating asymmetric keys and doing asymmetric crypto is too high.
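
For example, a short-lived worker might be spawned like this (a sketch; restricted stands in for some Sandbox value, some-work is a placeholder computation, and None is assumed to be the empty Optional):

do Remote
  h := heartbeat (Duration.seconds 60)
  scratch := Remote.spawn h None restricted -- no symmetric key supplied, so a fresh keypair is generated
  Remote.transfer scratch
  some-work -- runs on the temporary node; torn down when `h` stops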

The full API

Here's the full set of primitive functions:

-- Promote a pure value to `Remote`
Remote.pure : ∀ a . a -> Remote a

-- Sequencing of remote computations
Remote.bind : ∀ a b . (a -> Remote b) -> Remote a -> Remote b

-- Transfer control of remainder of computation to target node
Remote.transfer : Node -> Remote Unit

-- Start running a remote computation asynchronously
Remote.fork : ∀ a . Remote a -> Remote Unit

-- Explicitly fail a computation
Remote.fail : ∀ a . Cause -> Remote a

-- this is TBD
type Cause = Error Text Node | Stopped | Expired | Unresponsive Node

-- Create a `Channel`; just a GUID at runtime
channel : ∀ a . Remote (Channel a)

-- Send a value to a `Channel` on the current node
Channel.send : ∀ a . Channel a -> a -> Remote Unit

-- Receive a value from a `Channel` on the current node,
-- with an expiration controlled by the supplied heartbeat
-- Note: when the outer `Remote` is bound, listener is registered
Channel.receive : ∀ a . Heartbeat -> Channel a -> Remote (Remote a)

-- Receive multiple values from a `Channel` on the current node,
-- with an expiration controlled by the supplied heartbeat
-- Note: when the outer `Remote` is bound, listener is registered
Channel.subscribe : ∀ a . Heartbeat -> Channel a -> Remote (Remote a)

-- Create a `Heartbeat` that stops beating after a duration,
-- unless a `Heartbeat.reset` is performed
heartbeat : Duration -> Remote Heartbeat

Duration.seconds : Number -> Duration

-- Stop the heartbeat; anything linked to the heartbeat will be terminated / cleaned up
Heartbeat.stop : Heartbeat -> Remote Unit

-- Kill the heartbeat with an explicit error; anything linked to the heartbeat will be terminated / cleaned up
Heartbeat.fail : Cause -> Remote Unit

-- Ensure that the `Heartbeat` has at least the provided duration
-- to live; a no-op if more time than that already remains
Heartbeat.reset : Duration -> Heartbeat -> Remote Unit

-- The resulting heartbeat is live only if _both_ heartbeats are live
Heartbeat.both : Heartbeat -> Heartbeat -> Heartbeat

-- The resulting heartbeat is live if _either_ heartbeat is live
Heartbeat.either : Heartbeat -> Heartbeat -> Heartbeat

-- Be notified when a heartbeat completes
-- When outer `Remote` is bound, the supervisor is registered
Remote.supervise : Heartbeat -> Remote (Remote Cause)

-- Cancel the lexically scoped computation if/when the heartbeat stops,
-- and notify any supervisors of completion or errors
Remote.link : ∀ a . Heartbeat -> Remote a -> Remote a

-- Introduce a new, globally unique capability, of a particular type
capability : ∀ a . Remote (Capability a)

-- Introduce a capability on the current node, so long as the heartbeat is live
Capability.provide : ∀ a . Heartbeat -> Capability a -> a -> Remote Unit

-- Update a capability on the current node, if it exists
Capability.update : ∀ a . Capability a -> (a -> a) -> Remote Unit

-- Ask for a capability dynamically
Capability.ask : ∀ a . Capability a -> Remote (Optional a)

-- Create a new key-value store, encrypted with the key.
-- The data will be deleted when the provided heartbeat stops.
Index.empty : ∀ k v . Heartbeat -> Key -> Remote (Index k v)

-- Insert a value, returning a `Clock` which has visibility of this update
Index.insert : ∀ k v . Clock -> k -> v -> Index k v -> Remote Clock

-- Lookup a value associated with a key
Index.lookup : ∀ k v . Clock -> k -> Index k v -> Remote (Optional v)

-- Symmetric key generation
AES256.key : Remote Key
Blowfish.key : Remote Key

-- True if the first clock occurs at or after the second clock
Clock.>= : Clock -> Clock -> Boolean

-- ∀ c, c >=_Clock zero is true
Clock.zero : Clock

-- increment c >= c
Clock.increment : Clock -> Clock

-- Returns two clocks, c1, c2 that are incomparable
-- Inverted by merge
Clock.split : Clock -> (Clock, Clock)

-- merge c1 c2 >= c1 and merge c1 c2 >= c2
Clock.merge : Clock -> Clock -> Clock

-- Create a new node 'in the same location' as the current node
Remote.spawn : Heartbeat -> Optional Key -> Sandbox -> Remote Node

-- TBD
type Sandbox =
  Sandbox CPU% Memory Storage (∀ a . Node -> Remote a -> Remote a)

Appendix: History and context

This post has an early writeup of how Unison's hashing scheme could be used to build a robust multi-node computation story. That eventually got an implementation, and as a demo I put together a simple multi-node search engine in Unison. That raised a couple issues and questions, some discussed in that post, some discussed in this post about microservices, and some that I have just been ruminating on. 🤔

The big questions were around:

runarorama commented 7 years ago

It's not totally clear how this type supports multiple values:

Channel.subscribe : ∀ a . Heartbeat -> Channel a -> Remote (Remote a)

Can you expand on that?

pchiusano commented 7 years ago

@runarorama sure - you bind the outer Remote, and at that point, you are listening on the channel. You can then bind the inner Remote repeatedly to get values:

do Remote
  one := Channel.subscribe forever chan
  Remote.fork <| do Remote ... something that writes to `chan`
  Remote.replicate 14 one -- read 14 values from the channel

I imagine the implementation of subscribe would create a queue and attach it to the channel as long as the heartbeat is live, and the inner Remote a would just dequeue from this queue.

runarorama commented 7 years ago

OK, I see. This looks really great, by the way. Thanks for writing this up.

atacratic commented 7 years ago

Looks great! I love how the Index API uses Clocks.

In your IoT post, Capability.ask took a Node argument. Could you say a bit about why that went? I'm also not quite sure about the Capability (Matrix -> Remote Matrix) example. How can you generically turn a Matrix -> Matrix available at one node into a Matrix -> Remote Matrix? Maybe you don't because the former never exists.

Would it be possible in principle for access to an Index to be a Capability, and the Index API not to mention Remote? Trying to work out if there is some non-orthogonality there. Or maybe Remote being used as 'we'll implement this in Haskell' rather than 'this is an essential part of the business of remoting'.

pchiusano commented 7 years ago

Thanks 😀

On a phone, but Capability.ask doesn't need the Node since it can just ask the local node. You could implement Capability.ask-at that takes a node and just does a transfer.
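
A sketch of that derived function (note that after the transfer the remainder of the computation keeps running on n, so a caller that wants to come back must transfer home explicitly):

Capability.ask-at : ∀ a . Node -> Capability a -> Remote (Optional a)
Capability.ask-at n c = do Remote
  Remote.transfer n
  Capability.ask c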

You have the right idea about Matrix -> Matrix. That cannot exist, as it would imply the ability to transport arbitrary foreign code between nodes. Any foreign function (even functions that could be pure) will return a Remote, so users of the function can contact the node which actually has the implementation.

You'll have to flesh out what you mean about Index not mentioning Remote. I think it must, by the same reasoning as above?

Another thing that is important for security - you cannot transport a function to another node that reads from the local file system. Instead you have to ask for that capability once the computation has been transferred, which gives a nice centralized simple way to do sandboxing (just allow only certain caps, all others return None).

atacratic commented 7 years ago

OK, cool, I see.

One thing I understand now is that anything non-transferable has to be wrapped in a Remote ().

I feel a slight skepticism of the fact that Remote is handling locality, FFI, and (with Index) state as well. But I guess that's an old issue - it's kind of like Unison's IO. I certainly don't have any other suggestions...

If you transfer a function returning Remote (a) to another node, is there anything you know in general about how that changes its effect? For example, if you transfer after creating your index but before applying Index.insert, is Insert going to phone home to update the index where you came from? Or fail since the index isn't present on the current node? (Or quietly re-transfer you back again?)

I think (from seeing DIndex in the codebase) that Index itself is not distributed, but rather is tied to a node?

pchiusano commented 7 years ago

If you transfer a function returning Remote (a) to another node, is there anything you know in general about how that changes its effect? For example, if you transfer after creating your index but before applying Index.insert, is Insert going to phone home to update the index where you came from? Or fail since the index isn't present on the current node? (Or quietly re-transfer you back again?)

That is a good question.

Durable storage isn't tied to any node. I realized that tying it to a node is too inflexible, and doesn't square well with nodes being disposable, temporary things, since you usually want to keep durable state around longer than a particular computation that acts on that state.

For individuals running a Unison node server, the physical storage will be tied to that server, so yes, that server will need to be contacted for durable mutable data created there. I don't think the primitive Index.insert/lookup functions should "quietly transfer you back", since you may want to execute some computation very close to the data. But maybe an Index.insert' does the "quiet transfer back" and it could be defined in terms of Index.insert.

For a service like unison.cloud or similar, I think it makes sense to use a shared cloud storage layer for all the nodes, or at least all the nodes in a region, or something like that. The Clock API allows for this sort of thing, and it also allows for async updates to a 'local' storage layer.

I feel a slight skepticism of the fact that Remote is handling locality, FFI, and (with Index) state as well. But I guess that's an old issue - it's kind of like Unison's IO. I certainly don't have any other suggestions...

Yeah. It is possible to make the Remote effect more fine-grained. I originally started out doing that (like I started with Remote, which was 'just' pure computation + Remote.transfer, and Remote! which added mutable state + IO), then decided it was overkill. If Unison gets effect typing + algebraic effects (which I quite like), then I might revisit this choice, since then there's no real penalty to making the effects more fine-grained.

atacratic commented 7 years ago

Makes sense!

Last question (probably): is it possible with these primitives to write a receive function of type Heartbeat -> Channel a -> Channel b -> Remote (Either a b)? A 'return me the first thing to arrive on either of these channels'?

pchiusano commented 7 years ago

Yes! See if you can figure it out. Let me know if you're stuck or just want me to give the answer. 😀

atacratic commented 7 years ago

Hmm... I came up with the following. Hopefully you'll tell me I've overcomplicated things!

-- Receive the first value sent on either of two channels, returning that value immediately.
-- Guarantees to consume only one value from only one of the channels.
receive-either : Heartbeat -> Channel a -> Channel b -> Remote (Either a b)
receive-either h ca cb = do Remote
                           cab := channel
                           read-ab := Channel.receive h cab
                           h' := Heartbeat.clone h
                           Remote.fork <| do Remote
                             read-a := Channel.receive h' ca 
                             Channel.send cab (Either.Left read-a)
                           Remote.fork <| do Remote
                             read-b := Channel.receive h' cb 
                             Channel.send cab (Either.Right read-b)
                           -- need to handle race condition in which a second 
                           -- value is injected into cab as heartbeat is being 
                           -- stopped?  Assume yes.
                           Remote.fork <| do Remote
                             cleanup-period := heartbeat $ Duration.seconds 1
                             read-ab := Channel.receive cleanup-period cab
                             either (\a => Channel.send ca a) (\b => Channel.send cb b) read-ab
                           -- but if channels are buffered then this might 
                           -- reorder the values in flight :-(
                           -- Now return the actual result...
                           read-ab <* Heartbeat.stop h'

(You'll see I had to invent Heartbeat.clone - otherwise what duration do I give my new heartbeat? I'm assuming I can't use h because stopping that would have effects on whatever else was linked to the heartbeat I was passed in.)

atacratic commented 7 years ago

Oh and actually I need to do my final Remote.fork on the right hand side of the final <* otherwise my cleanup might eat the value I'm trying to return...

pchiusano commented 7 years ago

@atacratic you don't need to worry about a race condition where the second result comes as the winner is being read. A Channel.receive is more like a read from an MVar. Once a Channel.receive obtains a single value, that is the value that will always and forever be returned for that particular receive (you can probably imagine an implementation that would ensure this).

receive-either (❤️) c1 c2 = -- emoticons are valid identifiers, ha
  -- subtle question: should loser be unsubscribed from channel
  race (do Remote { c1 := Channel.receive (❤️) c1; Remote.map Left c1 })
       (do Remote { c2 := Channel.receive (❤️) c2; Remote.map Right c2 })

race : forall a . Heartbeat -> Remote a -> Remote a -> Remote a
race (❤️) r1 r2 = do Remote
  cr := channel
  h := Heartbeat.forever
  r := Channel.receive (❤️) cr
  here := Remote.here
  Remote.fork-linked ((❤️) `Heartbeat.both` h) <| 
    do Remote { r1 := r1; Channel.send-to here cr r1; Heartbeat.stop h }
  Remote.fork-linked ((❤️) `Heartbeat.both` h) <| 
    do Remote { r2 := r2; Channel.send-to here cr r2; Heartbeat.stop h }
  r

I think this brings up some questions about desired behavior. Some random notes that might not make much sense to anyone else. Say I have:

Remote.link h <| blah

If blah bombs with an error, any Remote.supervise on h should be notified. And if h is stopped, blah should be killed. Okay, but... what about:

Remote.link h <| do Remote 
  c := Channel.receive h2 chan
  ...

Suppose h is stopped after the Channel.receive h2 begins. Now what should happen? Here are some options:

  1. Nothing - that receive stays active as long as h2 is live, even though nothing will ever listen for it. This is simple but maybe problematic - it could lead to memory leaks.
  2. The receive should be cancelled immediately. So as part of the act of killing a Remote due to a stopped heartbeat, we cancel any outstanding waiting receives. I don't think this is correct though, since a receive on a channel may be shared in two places in the code.
  3. The receive should be cancelled as needed by the GC, basically, when it's clear that nothing can be listening on a receive it can be cancelled. Would use weak references for this.

I think 3 is the right behavior. But once we add that, I think we could get rid of the Heartbeat parameter from the Channel.receive and Channel.subscribe primitives. Instead, you could just use Remote.link on top of a receive to get the old behavior.
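
In other words, the old behavior could presumably be recovered as a derived function (receive' is a hypothetical name; this assumes the proposed Channel.receive with no Heartbeat parameter):

receive' : ∀ a . Heartbeat -> Channel a -> Remote (Remote a)
receive' h c = do Remote
  r := Channel.receive c -- proposed signature: no Heartbeat argument
  pure (Remote.link h r) -- the inner read is cancelled if/when `h` stops, as before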

PS: minor formatting thing - I just use the Haskell syntax highlighting, it's close enough. And it's easier for me to read the code on mobile if you don't align at the =, just newline and indent one level like I did above. :)

pchiusano commented 7 years ago

@atacratic btw, thanks for taking a close look at this, this is really helpful I think!

atacratic commented 7 years ago

@pchiusano Thanks! Now I have a whole load of questions again... :)

I agree with your choice of 3.

pchiusano commented 7 years ago

I was going to go through the answers to your questions, but then I had another idea: what about just getting rid of Channel and introducing something like MVar (or even all of STM) as a primitive? The main point of Channel was just to provide ephemeral mutable state, but that could be provided very directly.

mvar : Remote (MVar a) -- note: I hate the name 'MVar'
-- usual interface

STM.tvar : a -> STM (TVar a)
STM.retry : STM a
STM.atomically : STM a -> Remote a
instance Monad STM

Then I was thinking this through... (again this might not make sense to anyone else). The TVar will at runtime be a (Node, GUID) pair, where the Unison runtime will keep a map of GUID to actual Haskell TVar. This way the TVar can be sent to other nodes, who can read/write the var by contacting the node that has the underlying TVar in its map. The trouble is deciding when we can remove anything from that map. Is it safe to use GC reachability for this?

It seems like this could be problematic if the computation hops to another node, who gets the sole reference to a TVar, the GC runs, then the computation hops back and tries to write the TVar and oops it's gone! Like I am concerned about the case where only other nodes have a reference to the TVar. Maybe the solution is just "don't do that"; if you want the var to stick around, you need to keep a reference to it live on the node where it was created.

Needs more thought...

pchiusano commented 7 years ago

... okay, I've concluded TVar is a bad idea, it's too powerful. You could end up having to execute a distributed transaction, since the TVar values being edited/read may all live on different nodes. This gets into can of worms and unsolved research territory, so let's avoid that. :)

I like the MVar idea, so I wanted to explore whether using GC reachability to determine when it's safe to collect the MVar is okay. Here's the API:

var : Remote (Var a) -- empty var
Var.put : a -> Var a -> Remote Unit
Var.take : Var a -> Remote a

Some notes:

Should we shoot for the same semantics as MVar, where the thread with the sole reference to an MVar that is blocked on a take/put will be thrown an exception? If we say yes, then this code doesn't work:

do Remote
  v := var
  Remote.fork <| do Remote
    Remote.transfer alice
    -- some stuff
    Var.put 42 v
  Var.take v

The trouble is that the Var.take will be the sole reference on the current node to that Var, so the Var could actually be GC'd right away. But the thread on alice will eventually phone home and write to the Var, so our take can succeed at that point. I see 3 options:

  1. Just add a Heartbeat argument to Var.take/put. The Var will be prevented from being GC'd during that period. This is very simple.
  2. Supervise the work on alice and keep a live reference to the Var as long as alice hasn't crashed? I don't think this works in general. Even if we have transferred a Var to another node, we have no idea whether that node will keep a reference to that Var or just discard it. Tracking this information accurately seems like a can of worms, though I think in principle it could be done.
  3. Don't try to shoot for the same semantics as MVar. So when a thread is blocked on a Var read, even if it is the last such thread with a strong reference to the Var, keep it around, in case some other node contacts us and writes to the Var. This seems like a good way to get zombie threads and memory leaks.

The first choice seems like the best option. I suspect you could even build something like 2. on top of it.
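
Under option 1, the signatures would presumably become something like:

var : Remote (Var a) -- empty var
Var.put : Heartbeat -> a -> Var a -> Remote Unit
Var.take : Heartbeat -> Var a -> Remote a
-- the Var is kept from being GC'd at least as long as the supplied heartbeat is live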

Okay, so to summarize:

I'm going let this marinate a bit, then I think I should produce a fresh draft incorporating this and any other changes. I'll probably commit something to a docs/ directory that can be versioned, rather than editing this issue, which doesn't have history.

@atacratic thanks again for the careful review and thoughtful questions. If you think of anything else let me know!

atacratic commented 7 years ago

I like the simplicity of the Var API.

Good not to try to solve distributed transactions or distributed garbage collection! I like the heartbeat approach to the latter.

I wonder if Capability is fundamental or if it can be built on top of Var?

Should 'get me a GUID' be on this API, i.e. guid : Remote (GUID)? Either that or access to some kind of node ID (+RNG?) so the user can build their own GUIDs.

How is an Index retrieved after node restart/replace? i.e. how does any unison code get hold of an Index a that points to the same underlying storage? Does the Index a value need to have been preserved continuously on a succession of active nodes? (And is that reasonable?)

I like this 'API map' I made...

 transfer        -- location
 fork            -- concurrency
 Var             -- state (ephemeral)
 fail, Heartbeat -- failure
 Index, Clock    -- state (persistent)
 Capability      -- ?
 (spawn)

Seems to relate each bit of the API to core concepts in the problem domain, more or less.

The only bit that troubles me is that state appears twice in quite different ways. Makes me wonder about replacing Index with a Persist a, where a is a pure unison value which might happen to be a huge key/value map. Since we have universal value serializability why not have universal value persistence too? Probably adds challenges to building a performant mutable index though.

Looking forward to marinated respin of the doc :-)

pchiusano commented 7 years ago

I wonder if Capability is fundamental or if it can be built on top of Var?

Hmmm... maybe? Capability serves two purposes. First, it allows for node-specific functionality. This was really the main usage I was envisioning at first. Like one node has a GPU or an FPGA attached, with special capabilities present.

Then I realized Capability was just a general way of declaring things at global scope that can be dynamically bound, which I suspect is needed to bootstrap a system. Your question about "how do I find out about an Index" is along these lines. But I'm a bit hazy on bootstrapping and need to work through some examples to understand it better. I just have a vague intuition that these primitives are sufficient. A good thought exercise: how would you bootstrap from 0 a running system with 10k nodes? You have to start with some piece of information that is known globally, static, and durable, which is then used to derive other information, and so on, until the system is live and at its steady state.

Let me know if you do any thinking about this... I can see how Var by itself could do the dynamic binding of dependencies, what about bootstrapping? (Maybe we should try doing an example of how bootstrapping would work and this will become more clear...)

Something I haven't talked about is that for Capabilities to be useful as an FFI, and for bootstrapping, there needs to be a way to declare them statically. So maybe syntax like:

capability posts = Index Date BlogPost

-- posts : Capability (Index Date BlogPost)

At parse time, we create a GUID for the Capability, hash that plus the type, and use that as the identity. Thus you can rename the capability if you want later, it's just metadata. All references to the cap are via this identity, not by name. Thus there's also no "editing" of a Capability - posts always and forever is the same. Editing it just conjures up a fresh GUID, and you use the normal refactoring support to migrate a codebase transitively to a different Capability.

The only bit that troubles me is that state appears twice in quite different ways. Makes me wonder about replacing Index with a Persist a, where a is a pure unison value which might happen to be...

This is a great idea!!! Except, going a step further, you don't need to have a single Persist (let's say Durable) value that holds an entire index... that is going to have bad memory usage. Your Durable usage could be more fine-grained, so you could implement all your indexing data structures in pure Unison as a regular library! (Think of a BTree: the root is a Durable (BTree a), where BTree a contains other Durable (BTree a) values.)
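
As a rough sketch of what that might look like as an ordinary Unison library (the Tree type is purely illustrative, with Durable standing in for whatever the persistence primitive ends up being):

type Tree k v =
  Leaf | Branch (Durable (Tree k v)) k v (Durable (Tree k v))

-- a lookup dereferences only the Durable nodes along its search path,
-- so the whole structure never has to be resident (or even local) at once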

I like anything that makes Unison dumber and moves more of the smarts into regular user code. This seems worth exploring.

Re: randomness or GUID generation, sure, that could be added. There will probably be a whole bunch of "boring" but practical functions which I didn't try to cover in this RFC.

Note there is Remote.here : Remote Node, which returns the current node and can be considered a globally unique identifier.

Okay, I'm going to let this sit some more...

atacratic commented 7 years ago

Environment

I wonder if the stateful elements of this should be made into an explicit environment Env which can be manipulated. Here's how I reckon that might play out.

-- Env is the repository for injected capabilities and the 'state of the world'.
-- It's the thing that Capability.ask queries and that Durable/Index mutate.
Env.get : Remote Env          -- get the current ambient environment
Env.set : Env -> Remote Unit  -- set the ambient environment
-- (maybe there should be Clocks involved in these)

...with which we can implement the following function:

-- run computation c with environment e (leaving ambient environment unchanged on return)
Env.run : ∀ a . (e : Env) -> (c : Remote a) -> Remote a

This acts as our runState, letting us run computations 'purely' even when they use the stateful parts of Remote.

Effectful primitives are injected at runtime as capabilities into the initial ambient Env. But (existing) capabilities can also be added to the environment within unison using the following.

-- create an environment containing a single capability
Env.create : ∀ a . Heartbeat -> Capability a -> a -> Env   -- replaces Capability.provide
-- merge two Envs: union by GUID; on intersection choose upper rather than lower
Env.above : (lower : Env) -> (upper : Env) -> Env  -- imagine layers
-- create, get, set, above together allow editing the ambient environment.
-- They replace Capability.provide/Capability.update.  The 'mutable cell' aspect of
-- Capability moves into Env.  Capability API is cut down to just 'ask'.

The idea of Env.above is to allow progressive layered binding to resources - think docker and union file systems.

Benefits of this approach?

What do you think? :smiley:

Capability bootstrapping

At parse time, we create a GUID for the Capability

I think it has to be: at parse/edit time, we help the programmer find the GUID of the existing capability (various types of search including google?!), or we let them create a new GUID if no good match exists, and publish/expose that new GUID (together with a description) into the cloud code store in the same way as we publish terms by hash. (Possibly related to the 'hashing of data declarations' in https://github.com/unisonweb/unison/issues/106.)

Motivating case: "what's the Capability for the fast sparse matrix multiply function in v4.2 of this fortran binary? I need to use the same Capability when coding as my colleague does when provisioning the unison cluster with the FFI stuff."

Would it make sense to say: effectful primitives are injected...

I'm not sure about defining a syntax for declaring capabilities within the unison language. I see it as something that happens externally (an action performed in the editor.) Clearly you need to decide what name to bind them to in program text when you still have a 'parse from text' model, but should the program text really need to declare them? I guess this is a similar problem to imports.

Could the codebase editor tool create the capabilities? And the program text just declare an import in some way, if that's needed?

(Is it even necessary to allow creation of Capabilities within unison terms? Would Durable make that unnecessary?)

Durable

All the above section seems to apply to Durable as well. It's also an object with an identity and a location. What does it even mean to create one within a unison term? How do you bind to the one you created last time? So I'm thinking all Durables should come with a Capability.

I wonder if the contents of a Durable is even mutable or if that's another thing that mutates in the Env. Seems like that would be taking the functional/State/ST approach to its logical conclusion. In that case Durable might disappear and leave us with just:

-- signature is same as Env.create but with a Key
Env.create-durable : ∀ a . Heartbeat -> Key -> Capability a -> a -> Env   

and obviously some helper functions to make mutating that bit of the environment convenient.

I also toyed with the following before starting thinking about Env - just a port of Index.

durable : ∀ a . Heartbeat -> Key -> Remote (Durable a)
Durable.put : ∀ a . Clock -> a -> Durable a -> Remote Clock
Durable.get : ∀ a . Clock -> Durable a -> Remote a

woparry commented 7 years ago

All the above section seems to apply to Durable as well. It's also an object with an identity and a location.

If Durable is to be used for database-like storage, it should not have a single-node location -- the purpose of such a value is to exist past the demise of its creator. (Maybe it's only available at certain locations rather than everywhere, though). This is the main difference between Durable a and Capability a -- a Capability is only available as long as the underlying node is available.

If this is taken in the Env direction, the environment could include capabilities for reading and writing Durables. You might want multiple such capabilities, to represent different durable backing stores (otherwise how do you bootstrap these and migrate between different ones?).

How do you bind to the one you created last time?

This seems like a job for a Capability to store a (very small) piece of persistent mutable state which acts as a pointer to a Durable (with CAS style atomic update semantics?). This allows you to represent the persistent distributed tree example with a simple pointer to the root node representing the "current" state.

Related to durables we might then have:

-- make a available past the demise of this node to anyone with the hash.
persist: Capability (a -> Durable a) 
-- the run-time representation of Durable is a  content-based hash of the value + the type.
retrieve: Capability (Durable a -> a) 

In particular, you might want to design a system where different nodes have these different capabilities (see eg. Datomic, a single-writer multiple-reader transactional system).

If Durables can be created with different capabilities, corresponding to different storage backends, does this imply that the identity of capability required to retrieve the value should be passed around with the durable so that any node receiving it can retrieve the underlying value?

atacratic commented 7 years ago

If Durable is to be used for database-like storage, it should not have a single-node location

Yes, that matches what Paul said in this comment. I'm imagining that the runtime system, possibly with help from the user, decides what attached storage to bring into each new computation's Env. Maybe for example all the computations on all the nodes on a given machine have access to the durables on its local disk.

Aside: I'm thinking about 'initial computations' - things invoked externally / by the user - as being the things that have an initial Env assembled for them. I'm imagining a UI in the runtime where you actually select the computation to run, the node to kick it off on, and you get to tweak the Env it gets given. The ambient Env has to change on Remote.transfer (e.g. as the set of available FFIs changes) so maybe the ambient Env is under the covers derived from a pair (Env {- stuff from the node -}, Env {- stuff following this computation around -}) via Env.above.

This is the main difference between Durable a and Capability a -- a Capability is only available as long as the underlying node is available

My impression is that the Capability a value is a global thing which can exist for a long time across many nodes and encoded into many program terms; but on any given node it may or may not resolve to an actual Remote a (and what if anything it resolves to can even change over time on each node.)

I guess access to a Durable might be interrupted by, say, some storage server going down (or some cluster losing quorum might interrupt read and/or write access.) Do we want to hide the possibility of write failure on the API, forcing the implementation to cache writes that then might be lost? When do we want to promise that updates are committed?

the environment could include capabilities for reading and writing Durables. You might want multiple such capabilities, to represent different durable backing stores

Ah you're thinking that the storage location (and I guess other settings like level of replication) should be configurable from Unison. I guess that must be right - maybe with a very good default, like 'replicated in memory across the unison cloud, migrating to disk after periods of inactivity.'

I guess a 'capability for reading and writing Durables' is basically a storage provider, and there could be arbitrarily many implementations/configurations of such providers. And the available providers are themselves resolved through Capabilities, which should be available in the Env.

Other thoughts...

I'm still holding on to the idea that identification of Durables should be done through the Capability mechanism, in the interests of localizing the 'dynamic resource binding' concern in Capability. Specification of the storage provider is something that should optionally be done on creation of the Durable, and which should be subject to change/migration by a human without breaking assumptions in the code. But is it reasonable that the Capability lookup system should have to consult all attached datastores in order to find the one containing a given Durable? Do we give Capability.ask a hint about which storage it might like to consult, or is this a clue that the idea is broken? I guess this difficulty is a part of trying to decouple the identity of a Durable from its location.

OK maybe a cunning trick would be for each storage to export a bloom filter of the GUIDs it contains. Then it's not so bad to consult several, and if you still didn't like it you could do union on the filters (bitwise OR!).

On mutability: I don't think my previous idea works, of saying that Durables are each immutable and that mutation happens by swapping out Durables in the Env. That might be nice if all backing data stores were composed only of persistent data structures (i.e. they can cheaply give you copies of old versions of the data you're still holding references to) but this clearly isn't a reasonable requirement.

So how about the following proposal for a Durable API...

-- A 'Storage' represents some set of Durables (each with their own Capability)
-- which we are able and permitted to access.
type Storage

-- The ambient Env is guaranteed to contain a Capability (Storage), rendered as 'Storage.default'
-- (although maybe it's not guaranteed to provide actual durability, e.g. when 
-- running pure as a test)

-- A storage contains a set of Capabilities (that happen to all be of the form 
-- Capability (Durable _)) with the set supporting a kind of merge function... like an Env!  
-- So let's say that this set is actually accessible via an Env, even though it's not used 
-- directly as the environment for any computation.
Storage.env : Storage -> Remote Env

-- The net result is that if Capability.ask can resolve your Capability (Durable a) then it means
-- it's confirmed that it exists on at least one storage.  If it exists on multiple then it 
-- resolves to the one from the Storage most recently attached (hmm...)

-- Add the contents of a Storage into the environment.  Not actually a primitive.
Env.attach-storage : Storage -> Remote Unit
Env.attach-storage s = do Remote { e := Env.get; se := Storage.env s; Env.set (Env.above e se) }
-- Note that since the contents of a Storage is constantly in flux (and viewed differently from
-- different nodes), so attaching a Storage is not a one-off change.

That last observation means that a given Remote Env can change over time. That's a much stronger statement than what I'd realised previously, namely that Env.get can return different values over time. It weakens the case for having Env at all, if it doesn't actually put us in functional paradise of our state being represented by a succession of pure values. An Env value only responds the same to Capability.ask over time to the extent that the attached storage does. But I think this is still a valuable guarantee in the case where the computation being run has exclusive access to the storage, which you would ensure was the case if you were running 'runState for Remote' and hoping to get repeatable answers.

So down to the actual Durable bit...

durable : ∀ a . Storage -> Heartbeat -> Key -> Remote (Durable a)
Durable.put : ∀ a . Clock -> Heartbeat -> a -> Durable a -> Remote (DurableResult Unit)
-- the Heartbeat argument to put/get/delete is 'how long to wait trying to commit 
-- this' - see DurableResult.
Durable.get : ∀ a . Clock -> Heartbeat -> Durable a -> Remote (DurableResult (Optional a))
-- Durable.get returns an Optional because I'm proposing a Durable.delete which might
-- happen after the Capability.ask resolves.
-- Delete the Durable now rather than waiting for its Heartbeat to expire.
Durable.delete : ∀ a . Clock -> Heartbeat -> Durable a -> Remote (DurableResult Unit)
-- how to find the Durable again in future:
Durable.capability : ∀ a . Durable a -> Capability (Durable a)

-- Storage might be down; cluster might lack quorum.  Reads and/or writes might be
-- blocked for a substantial time.  Unless we want to force any Storage impl to 
-- choose 'AP' out of CAP and require a merge function for partition recovery, 
-- but probably we don't?  So the Durable ops actually return a Remote (Remote _) 
-- to get you a clock by which time the update is visible (and durably committed?), 
-- together with whatever value you were hoping for.  The put/get/delete APIs all 
-- take a heartbeat controlling how long to wait.  If that expires then the callback is 
-- called with a None, although that doesn't actually guarantee the update is 
-- never committed (?)
-- (Not mad-keen on Remote (Remote _) - maybe take a continuation/callback 
-- in delete/put/get instead?)
type DurableResult a = Remote (Optional (Clock, a))

Issues:

pchiusano commented 7 years ago

So, looked through these comments. I'd like to take another pass at the document and think we should hold off on a more detailed discussion until I produce something.

But, in general, I prefer to just keep things simple. Here are a couple comments:

pchiusano commented 7 years ago

Produced new iteration of this, see #142.