Closed leonoel closed 1 year ago
m/replay different from m/stream and m/signal

How is `m/replay` different from `m/stream` and `m/signal`?
- `signal` is continuous: it memoizes the latest value and discards previous ones.
- `stream` is discrete: it memoizes each event for a single propagation frame and discards the past.
- `replay` is discrete: it memoizes all events from the start and replays the entire past to new subscribers.
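Schematically, the three operators could be used like this under the proposed API (a sketch; the source flow, names and semantics comments are illustrative, and `m/seed` merely stands in for an arbitrary discrete source):

```clojure
(require '[missionary.core :as m])

(def >src (m/seed [1 2 3]))      ;; some discrete flow of events

(def >shared  (m/stream >src))   ;; subscribers only see events emitted while subscribed
(def >history (m/replay >src))   ;; a late subscriber first receives the whole past, then live events
(def <latest  (m/signal >src))   ;; subscribers sample the single most recent value
```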
Can you provide a continuous time example under the proposed syntax?
The example from the readme could look like this:

```clojure
(def !input (atom 1))

(def main
  (let [<x (m/signal (m/watch !input))
        <y (m/signal (m/latest + <x <x))]
    (m/reduce (fn [_ y] (println y)) nil <y)))
```
Thanks, this will be a very useful addition. I've been using a function similar to `memo` in my application, and it is very useful to ensure a task only gets run once.
One question:
```clojure
(def res
  (memo
    (m/sp
      (m/? (m/sleep 1000))
      (throw (ex-info "ERROR" {})))))

(m/? res) ;; wait 1s, throw exception
(m/? res) ;; (1) ??
```
What happens at (1) - does it wait for 1s again and then throw the exception, or throw the exception instantly?
At first glance it would make sense to me that any crashed publisher memoizes the error and rethrows it immediately to all current and future subscriptions.
Released in b.31
The problem
Purely functional composition of effects is an effective solution to the supervision problem. Constraining effects to be represented as pure values provides information about the structure of the program: each concurrent process has a well-defined parent, which naturally enforces a strict supervision hierarchy at runtime. However, this model is unable to express the idea of sharing, i.e. when a process must dispatch a value to multiple consumers.
The problem of sharing is currently solved by `reactor` and its companions `stream!` and `signal!`, respectively for discrete and continuous flows. The current implementation successfully proved that functional effect systems can be used as a foundation for transactional propagation of events, even with arbitrarily dynamic graph topologies. However, it falls short on supervision: all publishers are supervised by their reactor, no matter which process subscribes to them. There is no hierarchy of publishers, so if any publisher crashes, the whole reactor crashes. If the reactor is cancelled, all of its publishers are cancelled. Any lifecycle management more complex than that must be implemented manually, defensively, and in imperative style.

The goal of this report is to discuss an alternative model reconciling the benefits of both worlds: the ability to express sharing, and correct supervision by default. The new approach should be general enough to match current semantics for discrete and continuous flows (resp. streams and signals), as well as potential new ones. The essence of the problem is the runtime supervision structure: if an effect has an identity that can be shared, then its process may have more than one supervisor, which implies the supervision tree becomes a supervision DAG.
A concrete example
We want to implement a simple static DAG. Each node computes a single value from some other nodes, with an additional delay.
An acceptable solution must:
1. compute the correct result,
2. run node `:b` only once, even though its result is used twice,
3. leave no node process running once the main process has terminated.

First, we can represent a single node with a task built as the sequential composition of the parallel join of its dependencies, followed by a sleep.
We can then implement a naive, purely functional solution:
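The two steps above could be sketched as follows; `node` is a hypothetical helper with the shape just described, and the DAG layout and delays are illustrative assumptions:

```clojure
(require '[missionary.core :as m])

;; Hypothetical helper: parallel join of the dependency tasks,
;; then an extra delay, then the node's own computation.
(defn node [f delay-ms & deps]
  (m/sp
    (let [inputs (m/? (apply m/join vector deps))]
      (m/? (m/sleep delay-ms))
      (apply f inputs))))

;; Illustrative DAG: :a feeds :b, and :b feeds both inputs of the final node.
(def a (m/sp (m/? (m/sleep 100)) 1))   ;; node :a, no dependencies
(def b (node inc 100 a))               ;; node :b
(def main (node + 100 b b))            ;; :b is a pure value, so it runs twice
```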
This task satisfies requirements 1 and 3, but not 2. The node `:b` is a pure value with no identity, so it is run twice instead of being shared and reused. To represent sharing, we have to break referential transparency and implement memoized publishing for node `:b`.

Futures are the commonly accepted solution to this problem nowadays. In modern functional effect systems they're generally called fibers, but they share the same basic properties: an identifier for a running process that can be cancelled, and whose result can be awaited.
We can now tweak our first attempt:
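One way this tweak could look (a sketch: a dataflow variable plays the role of the future holding `:b`'s result; error propagation and the `node` helper are simplified away, and delays are arbitrary):

```clojure
(require '[missionary.core :as m])

(def a (m/sp (m/? (m/sleep 100)) 1))
(def b (m/sp (let [x (m/? a)] (m/? (m/sleep 100)) (inc x))))

;; Start :b's process immediately; the dataflow variable memoizes
;; its result for any number of consumers.
(def >b (m/dfv))
(def cancel-b (b >b >b))   ;; running a task returns its canceller

(def main
  (m/sp
    (m/? (m/sleep 100))
    (apply + (m/? (m/join vector >b >b)))))
;; :b now runs once, but cancelling main's process no longer reaches :b's process
```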
The task now satisfies requirements 1 and 2, but 3 doesn't hold anymore. When the main process is cancelled, the cancellation is propagated to the awaiting of node `:b`, but not to its underlying process. The parent process can terminate while a child is still running, which is a waste of resources at best and a memory leak at worst. In other words, futures break supervision.

A correct solution could be something like this:
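A sketch of such a correct solution: the future-like process for `:b` is started inside `main`, and its canceller is invoked in a `finally` block so that `:b` can never outlive its parent (error propagation again simplified; names and delays are illustrative):

```clojure
(require '[missionary.core :as m])

(def a (m/sp (m/? (m/sleep 100)) 1))
(def b (m/sp (let [x (m/? a)] (m/? (m/sleep 100)) (inc x))))

(def main
  (m/sp
    (let [>b       (m/dfv)
          cancel-b (b >b >b)]              ;; start :b, keep its canceller
      (try
        (m/? (m/sp (m/? (m/sleep 100))
                   (apply + (m/? (m/join vector >b >b)))))
        (finally (cancel-b))))))           ;; :b is cancelled when main unwinds
```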
This is arguably more verbose and error-prone.
Lazy publishers
A lazy publisher is a stateful object associated with an effect (task or flow), exposing a memoized view of this effect. Running a lazy publisher as an effect registers a new subscription to this view; cancelling a subscription deregisters it.
Unlike eager publishers (e.g. futures, but also `signal!` and `stream!` in their current implementation), instantiating a lazy publisher is not an effectful operation. The lifecycle of a lazy publisher is driven by its subscribers, not by its creator. Therefore, creating a lazy publisher doesn't require any special context, and disposal can be safely delegated to the GC.

The effect of a lazy publisher runs as long as there is at least one active subscription. Its values are memoized and dispatched to all subscriptions. The effect process is started when a new subscription is registered and no subscription was registered before. When an active subscription is deregistered and no other active subscription remains, the current effect process is cancelled and the subscription termination is bound to it; otherwise, the subscription process fails immediately. When the effect process terminates spontaneously (i.e. without cancellation), all active subscriptions and all subsequent ones terminate immediately.
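With these semantics, the DAG example reduces to wrapping node `:b` in a task-level lazy publisher, here called `memo` as in this proposal (a sketch; names and delays are illustrative):

```clojure
(require '[missionary.core :as m])

(def a (m/sp (m/? (m/sleep 100)) 1))
(def b (memo (m/sp (let [x (m/? a)] (m/? (m/sleep 100)) (inc x)))))

(def main
  (m/sp
    (m/? (m/sleep 100))
    (apply + (m/? (m/join vector b b)))))
;; :b's process starts on the first subscription and runs once; when main is
;; cancelled, both subscriptions are cancelled, which cancels :b's process
```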
Assuming the existence of `memo`, a lazy publisher caching the result of a task, we get an elegant solution to the previous problem.

Prior art
- `clojure.core/delay` and core lazy sequences have lazy publisher semantics, but they're not a good fit for effects due to their inconsistent behavior in the face of failure and cancellation; also, their fundamentally synchronous nature mandates host support for thread blocking, which is not an option in ClojureScript.
- Eagerly memoized effects (e.g. futures) have semantics close to `clojure.core/delay` and lazy sequences respectively, but do not allow cancelling the underlying process when the result is not required anymore.
- Reference-counted publishers memoize like `stream!` and cancel the underlying process on last unsubscription. However, the dispatch mechanism is not transactional, so values may be lost in case of consecutive subscriptions.

Proposed changes
Purely functional operators work just fine, no major change is required.
- The transactional propagation engine currently implemented in `reactor` must be unified with a lazy publisher approach where the lifecycle of effects is driven by subscriptions.
- Tasks get a lazy publisher caching their result: `memo`.
- Discrete flows get a lazy publisher memoizing all events and replaying the past to new subscribers: `replay`.
- `stream!` and `signal!` are renamed to `stream` and `signal` respectively; they now return lazy publishers.
- `reactor` is removed, as instantiation of signals and streams doesn't require a context anymore.