cda-group / arc

Programming Language for Continuous Deep Analytics
https://cda-group.github.io/arc/

RFC: Finite State Machines #264

Closed segeljakt closed 1 year ago

segeljakt commented 3 years ago

After some discussion with Seif, I think there is an opportunity to simplify tasks in arc-script. Tasks are currently a bit weird since you can define methods, enums, type aliases, imports, and extern functions in their bodies. I think it would be more concise if we simplified tasks into async functions. Like a regular function, a task's body would be nothing more than a list of statements optionally terminated by an expression. Tasks would no longer contain members (items), as these could be defined as regular functions outside of the task. What we refer to as "state" would be nothing more than local variables which are stored inside a finite state machine (FSM).

If you currently want to express an Identity task, you would write it as follows:

task Identity(): ~i32 -> ~i32 {
    on event => emit event;
}

This assumes that the on statement is executed continuously without termination. The semantics become unclear if you start to nest the on statements:

task AddTwo(): ~i32 -> ~i32 {
    on a => on b => emit a + b;
}

I think it is more logical if on executes once, and is placed inside a loop such as:

task AddTwo(): ~i32 -> ~i32 {
    loop {
        on a => on b => emit a + b;
    }
}

The body of a task is compiled into a finite state machine (FSM). The FSM consists of two things: 1) a type which enumerates all possible states, and 2) a state transition function. A state contains all the variables which are in scope (i.e., on the stack) at the point just before a state transition is made. For AddTwo above:

enum State { S0, S1(i32) }
fun on_event(state: State, event: i32): State {
    match state {
        State::S0 => State::S1(event),
        State::S1(a) => { emit a + event; State::S0 }
    }
}
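
To make the sketch above concrete, here is a runnable Rust rendering of the same two-state FSM. This is a hedged sketch: `emit` is modelled as an optional output value returned next to the new state, and the driver loop stands in for the surrounding runtime, which is not specified here.

```rust
// Hedged sketch: `emit` is modelled as an optional output value returned
// alongside the next state; the driver loop below stands in for the runtime.
enum State {
    S0,       // waiting for the first event `a`
    S1(i32),  // got `a`, waiting for `b`
}

fn on_event(state: State, event: i32) -> (State, Option<i32>) {
    match state {
        State::S0 => (State::S1(event), None),        // remember `a`, emit nothing
        State::S1(a) => (State::S0, Some(a + event)), // emit `a + b`, reset
    }
}

fn main() {
    let mut state = State::S0;
    let mut outputs = Vec::new();
    for event in [1, 2, 10, 20] {
        let (next, emitted) = on_event(state, event);
        state = next;
        outputs.extend(emitted);
    }
    assert_eq!(outputs, vec![3, 30]); // (1 + 2) and (10 + 20)
    println!("{:?}", outputs);
}
```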

The challenge is how you compile control-flow constructs into FSM-code. We support the following constructs:

frej commented 3 years ago

Erlang doesn't have explicit looping constructs nor continue or break. Loops are written using tail-recursive functions. FSMs are either written by hand as a set of mutually tail-recursive functions or using one of the FSM behaviours in the stdlib.

Preemption is implemented using a reduction counter which is decremented for each call (in principle), when it hits zero, the process is scheduled out.

The only case where you will find a loop (at the BEAM level) which does not pass through call instructions is in the implementation of selective receives where a loop will traverse the message queue looking for a matching message. If no matching messages are found, the process will be scheduled out.

Due to the preemption the Erlang compiler does not need to transform user code to FSMs.
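
For intuition, the "loops as tail-recursive functions" style frej describes can be approximated in Rust, which does not guarantee tail-call elimination, by a trampoline: each state function returns the next state instead of calling it. This is only an illustrative sketch, not anything proposed in the thread.

```rust
// Illustrative only: Erlang-style "loop via tail calls" rendered as a Rust
// trampoline, since Rust does not guarantee tail-call elimination.
enum Step {
    Continue(u64, u64), // (accumulator, n): a "tail call" with new arguments
    Done(u64),
}

// One "tail-recursive" step of a countdown product (factorial).
fn step(acc: u64, n: u64) -> Step {
    if n == 0 { Step::Done(acc) } else { Step::Continue(acc * n, n - 1) }
}

// The trampoline: a constant-stack loop standing in for tail-call elimination.
fn run(mut acc: u64, mut n: u64) -> u64 {
    loop {
        match step(acc, n) {
            Step::Continue(a, m) => { acc = a; n = m; }
            Step::Done(result) => return result,
        }
    }
}

fn main() {
    assert_eq!(run(1, 5), 120); // 5! = 120
}
```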

segeljakt commented 3 years ago

This sounds a lot like what Kompact is doing. Every Kompact component implements an execute function which lets the component handle up to max_events messages on its ports, using round-robin priority by default (skip is used to resume the round-robin where it left off). &mut self can then selectively dequeue events from the component's ports.

impl ComponentDefinition for MyComponent {
    fn execute(&mut self, max_events: usize, skip: usize) -> ExecuteResult {
        // ...
    }
}

Is this similar to Erlang or am I "ute och cyklar" (out cycling, i.e. completely off track)? πŸ˜…

Rust's async seems able to transform imperative code into FSM. I wonder how you handle continue and break πŸ€”

Do we need to transform the code into basic-blocks and then create a state for each block?

frej commented 3 years ago

There is an added complication here. Kompact, and in turn Arcon, expects an event handler to run to completion and not preempt (correct me if I'm wrong). That means that it is a "quick" event even if the handler contains loops and other iteration constructs. If you diverge from that model for the Arc-script event handlers, will that not preclude running Arc-script on Arcon?

Distinct from the issue of Arcon compatibility: if we go for an FSM-approach, the brute-force way to do it, as you say, is to turn each basic block into a function which transforms a state. If our target language has something along the lines of call/cc, that could be used to implement preemption and resumption with minimal transformations.

A call/cc-approach would also allow MLIR to do a good job during optimization, as the preemption point could be treated as "just another function call". Transforming the code into basic blocks masquerading as functions will, in contrast, be very hard to optimize.

In conclusion, abandoning the model where an event-handler runs to completion should not be done lightly, as it will limit our choice of runtime.

segeljakt commented 3 years ago

Currently the event handler of Kompact can block until an async operation completes (In Rust, everything which is async gets compiled into an FSM)

However, the event handler cannot use this to block until the next event arrives, because the dequeue operation of ports is not yet async

For this reason I think we need to do the FSM transformation by ourselves. Once we have the FSM, which is just a state datatype and a state transition function, we could run it inside an Arcon event handler.

What did you mean by call/cc? If I understand you, there are different ways of implementing the state transition function? One way I see is to implement it as a function (State, Event) -> State which has a match-expression (or switch) over the input State, where each case corresponds to a basic block in the code which occurs after a blocking statement. If you have an on-statement, then the basic block could naively begin with a match over the input event type. There would be no gotos between blocks. Instead each block just returns the next state.

segeljakt commented 3 years ago

Oh, is call/cc CPS? Edit: I see, it is a special kind of function in Scheme: http://www.madore.org/~david/computers/callcc.html

frej commented 3 years ago

A shorter description of call/cc without the philosophy

My reason for mentioning call/cc is that if we had it, it would be trivial to suspend execution at an arbitrary point, squirrel away the continuation somewhere, dispatch other events, and then resume execution from where we left off. All this in a way that would let the MLIR-optimizer treat it just as another function call.

Nested receives while running under Arcon could, with call/cc, be implemented by having an event handler per event type which dispatches to the right continuation (put there in the "squirrel away" stage above), or queues the event if there isn't an enabled handler.

I had hoped that you or another Rustisist :) would say: "Oh we can implement that using async and futures..."

segeljakt commented 3 years ago

call/cc seems powerful enough to express any control-flow construct. I don't see an obvious way to implement it using Rust's async. If we were to implement it, would it obsolete loops, ifs, exceptions, etc.? From reading about it, however, it appears that you need a garbage collector for the stack, or am I wrong? πŸ’­

One important thing about FSMs which I forgot is that we need to be able to persist the state occasionally in order to be able to roll back. If we rely on Rust's async then I foresee it being more difficult to control how the state machine operates.
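
This is one reason to own the FSM transformation: if the state is a plain enum, persisting it could amount to cloning (or serializing) it at checkpoint barriers. A minimal sketch, with the `Task`/`snapshot`/`rollback` API invented for illustration:

```rust
// Minimal sketch: rollback by snapshotting the FSM state enum.
// The Task/snapshot/rollback API is invented for illustration.
#[derive(Clone, Debug, PartialEq)]
enum State {
    S0,
    S1(i32),
}

struct Task {
    state: State,
    checkpoint: Option<State>,
}

impl Task {
    fn snapshot(&mut self) {
        // In a real system this would be serialized to durable storage.
        self.checkpoint = Some(self.state.clone());
    }

    fn rollback(&mut self) {
        if let Some(saved) = self.checkpoint.clone() {
            self.state = saved;
        }
    }
}

fn main() {
    let mut task = Task { state: State::S1(42), checkpoint: None };
    task.snapshot();
    task.state = State::S0; // make progress past the checkpoint...
    task.rollback();        // ...then a failure forces a rollback
    assert_eq!(task.state, State::S1(42));
}
```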

segeljakt commented 3 years ago

I discussed with the Rust community about how Rust handles async. If you have a function like this:

async fn foo(mut x: usize) -> usize {
    loop {
        x += bar(x).await;
        if x > 100 {
            break;
        }
    }
    x
}

It would be compiled into roughly the FSM:

enum State {
    S0 { x: usize },
    S1 { x: usize, bar_fut: impl Future<Output = usize> }, // schematic: not valid field syntax
    S2,
}
impl Future for State {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // The loop makes an S0 -> S1 transition immediately poll the new
        // future, instead of returning Pending without a registered waker.
        loop {
            match self {
                State::S0 { x } => {
                    let bar_fut = bar(x);
                    *self = State::S1 { x, bar_fut };
                }
                State::S1 { x, bar_fut } => match bar_fut.poll(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(bar_fut_return) => {
                        let x = x + bar_fut_return;
                        if x > 100 {
                            *self = State::S2;
                            return Poll::Ready(x);
                        } else {
                            *self = State::S0 { x };
                        }
                    }
                },
                State::S2 => unreachable!("poll called after already ready"),
            }
        }
    }
}
frej commented 3 years ago

If we were to implement it, then would it obsolete loops, ifs, exceptions, etc?

Probably not; just because it can implement all constructs doesn't mean that it does so efficiently.

appears like you need to have a garbage collector for the stack, or am I wrong?

That seems very likely.

segeljakt commented 3 years ago

From some offline discussion, this is an algorithm Frej proposed (translated from Swedish):

To handle preemption:

Assuming we make the FSM translation from an SSA representation, where everything is unnested and a preemption point is marked by a function call which returns the new event being awaited, this should be a possible algorithm:

  1. Create an enum which represents the FSM's state, named S.
  2. Create a function which represents the FSM's state transition function, named f.
  3. Gather all variables which are live after each point of preemption and, for each such point, create a new variant (named S0, S1, ..., Sn) which stores them in S.
  4. Replace the function call at the point of preemption with a construction of the new enum variant, and return it all the way up the function call stack.
  5. Wrap the code after the point of preemption inside a function (named f0, f1, ..., fn) that takes the enum variant and the newly arrived event (substituting the variables in the code with the parameters).
  6. Put a match expression in the body of f with cases that cover all states Si and call the respective function fi.

To handle control-flow:

  3.1. Gather all variables which are live before and after each loop statement (whose body contains blocking code) and correspondingly create two new variants, named S_entry and S_exit, which store them in S.
  4.1. Replace the loop with a construction of S_entry and pass it to f.
    4.1.1. Replace any continue inside the loop body with a construction of S_entry and pass it to f.
    4.1.2. Replace any break inside the loop body with a construction of S_exit and pass it to f.
    4.1.3. Replace the last statement of the loop body with a construction of S_exit and pass it to f.

Example:

var x = 0;
loop {
    if x > 5 { break; };
    x += 1;
    on event => x += event;
}

Becomes:

enum S { S0 { x:i32 }, S1 { x:i32 }, S2 { x:i32 }, S3 }

use S::{S0, S1, S2, S3};

fn f(state: S, event: Option<i32>) -> S {
    match (state, event) {
        (S0 { mut x }, None)        => f0(x),
        (S1 { mut x }, None)        => f1(x),
        (S2 { mut x }, Some(event)) => f2(x, event),
        _                           => unreachable!(),
    }
}

fn f0(mut x: i32) -> S {
    return f(S1 { x }, None);
}

fn f1(mut x: i32) -> S {
    if x > 5 {
        return S3;
    }
    x += 1;
    return S2 { x };
}

fn f2(mut x: i32, event: i32) -> S {
    x += event;
    return f(S1 { x }, None);
}
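
For completeness, the generated code above can be driven to termination like this. The FSM is repeated so the block is self-contained, and the driver loop, plus the rule that only S2 consumes an event, are assumptions about the surrounding runtime:

```rust
// Driver sketch for the FSM above: S0 and S1 step eagerly (no event),
// S2 waits for an event, S3 is final. The driver itself is an assumption.
#[derive(Debug, PartialEq)]
enum S { S0 { x: i32 }, S1 { x: i32 }, S2 { x: i32 }, S3 }

fn f(state: S, event: Option<i32>) -> S {
    match (state, event) {
        (S::S0 { x }, None) => f0(x),
        (S::S1 { x }, None) => f1(x),
        (S::S2 { x }, Some(event)) => f2(x, event),
        _ => unreachable!(),
    }
}

fn f0(x: i32) -> S { f(S::S1 { x }, None) }

fn f1(mut x: i32) -> S {
    if x > 5 { return S::S3; }
    x += 1;
    S::S2 { x }
}

fn f2(mut x: i32, event: i32) -> S {
    x += event;
    f(S::S1 { x }, None)
}

fn main() {
    let mut state = f(S::S0 { x: 0 }, None);
    let mut events_consumed = 0;
    // Feed zero-valued events until the machine reaches its final state.
    while matches!(state, S::S2 { .. }) {
        state = f(state, Some(0));
        events_consumed += 1;
    }
    assert_eq!(state, S::S3);
    assert_eq!(events_consumed, 6); // x counts 1..=6 before x > 5 breaks the loop
}
```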
frej commented 3 years ago

If we let f<n> return the next handler function, we will avoid the match in f.

segeljakt commented 3 years ago

Is this correct? One observation is that this version needs to unwrap the state in each f<n> instead of in f:

struct Cont<State, Event> {
    state: State,
    next: fn(State, Option<Event>) -> Self,
}

type Event = i32;

enum State { S0 { x: i32 }, S1 { x: i32 }, S2 { x: i32 }, S3, }

use State::{S0, S1, S2, S3};

impl Cont<State, Event> {
    fn f0(state: State, _: Option<Event>) -> Self {
        if let State::S0 { x } = state {
            Self {
                state: S1 { x },
                next: Self::f1,
            }
        } else {
            unreachable!()
        }
    }

    fn f1(state: State, _: Option<Event>) -> Self {
        if let State::S1 { mut x } = state {
            if x > 5 {
                return Self {
                    state: S3,
                    next: Self::f3,
                };
            }
            x += 1; // keep the increment from the source program
            Self {
                state: S2 { x },
                next: Self::f2,
            }
        } else {
            unreachable!()
        }
    }

    fn f2(state: State, event: Option<Event>) -> Self {
        if let (State::S2 { mut x }, Some(event)) = (state, event) {
            x += event;
            Self {
                state: S1 { x },
                next: Self::f1,
            }
        } else {
            unreachable!()
        }
    }

    fn f3(_: State, _: Option<Event>) -> Self {
        unreachable!()
    }
}
frej commented 3 years ago

Is this correct?

Yes, this is what I was proposing.

One observation is that this version needs to unwrap the state in each f<n> instead of in f

That's not optimal, but probably better than a full match. If this were Scheme I would just let the handler return a lambda taking the event as an argument and let the state be captured in the lambda. Can you do something like that in Rust?

segeljakt commented 3 years ago

It's possible if you box the closure. This might introduce more overhead when the state is small (but could maybe be beneficial if the state is big? πŸ€”). For the previous strategy, we could also use https://doc.rust-lang.org/std/hint/fn.unreachable_unchecked.html if we do not want any overhead. This is also something we could use in the unwrap!() macro if we trust that our codegen works.

struct Cont(Box<dyn Fn(Option<Event>) -> Self>);

type Event = i32;

struct S0 { x: i32 }
struct S1 { x: i32 }
struct S2 { x: i32 }
struct S3 { }

impl Cont {
    fn f0(S0 { x }: S0, _: Option<Event>) -> Self {
        Self(Box::new(move |event| Self::f1(S1 { x }, event)))
    }

    fn f1(S1 { mut x }: S1, _: Option<Event>) -> Self {
        if x > 5 {
            return Self(Box::new(move |event| Self::f3(S3, event)));
        }
        x += 1; // keep the increment from the source program
        Self(Box::new(move |event| Self::f2(S2 { x }, event)))
    }

    fn f2(S2 { mut x }: S2, event: Option<Event>) -> Self {
        if let Some(event) = event {
            x += event;
            Self(Box::new(move |event| Self::f1(S1 { x }, event)))
        } else {
            unreachable!()
        }
    }

    fn f3(S3 { }: S3, _: Option<Event>) -> Self {
        unreachable!()
    }
}
segeljakt commented 3 years ago

I have faith in rustc doing the correct optimisations. Ferris will protect us πŸ¦€

segeljakt commented 3 years ago

One question still is how the infrastructure around this will work. I believe Arcorn will need to know the start- and end-state to know how to start the state machine and when to end it. In the first version it is probably ok to assume the state machine cannot terminate. If the machine can terminate, then we should maybe add support for futures.

segeljakt commented 3 years ago

About the match vs. closure dilemma, I think we can postpone the decision on what the best solution should be for now and focus on getting things operational. If I manage to not screw up the implementation then it should be easy to switch from one to the other πŸ˜…

segeljakt commented 3 years ago

One thing which came to my mind: we only need to compile loops into FSMs if they contain blocking code. For example in:

task Test(): ~i32 -> ~i32 {
    var x = 0;
    loop { if x < 4 { x += 1; } else { break; } };
    loop { if x < 8 { on a => x += a; } else { break; } };
}

The first loop can be left as it is.

segeljakt commented 3 years ago

Another observation: it could be helpful to arrange things so that the first state is empty. This means we can remove the on_start function.

frej commented 3 years ago

it could be helpful to arrange things so that the first state is empty.

Doesn't that just move the problem around? If you need initialization of the internal state you will need to arrange for a synthetic event that takes you from the uninitialized state to the first "real" state.

segeljakt commented 3 years ago

Hehe, true, hmm. The on_start function is currently input-less and only used for side-effects. I first thought we could set up the first real state in the task constructor, but now that I think about it, we probably need to set it up in on_start: the component is not "online" in the task constructor, so we cannot send events. Sending events is non-blocking, so that should not result in a point of preemption.

I wonder if states should contain the input parameters of the task. Alternatively, the input parameters could be stored outside of the states and be visible to them all.

frej commented 3 years ago

Is there something that stops us from initializing the FSM state in the task constructor?

segeljakt commented 3 years ago

If we for instance have this code:

task RollingReduce(init: i32): ~i32 -> ~i32 {
    var agg = init;
    emit agg;
    loop {
        on event => {
            agg += event;
            emit agg;
        }
    }
}

We will get four states (we can construct S0 in the task constructor).

struct RollingReduceTask {
    state: State,
}
enum State {
    S0 { init: i32 }, // State before executing the body of the task (contains only input parameters)
    S1 { init: i32, agg: i32 }, // Loop entry
    S2 { init: i32, agg: i32, event: i32 }, // After on event
    S3 { init: i32, agg: i32 }, // after loop
}

Another option is to stuff the parameters into the task itself:

struct RollingReduceTask {
    state: State,
    init: i32,
}
enum State {
    S0 {  }, // Empty starting state
    S1 { agg: i32 }, // State before entering loop
    S2 { agg: i32, event: i32 }, // State after receiving event
    S3 { agg: i32 }, // State after exiting loop
}

Even if S0 seems useless I think it is needed since it may not be possible to construct S1 in the task constructor. To construct S1 we need to execute an emit instruction. The emit instruction in turn depends on the component being fully initialised by the Kompact-system. Components are initialised after having executed the task constructor.
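
As a concreteness check, the second layout could look as follows in Rust. This is a hedged sketch: `emit` is modelled as pushing to an output vector, the `step` driver protocol is invented, and S3 is never reached because this task never exits its loop.

```rust
// Sketch of the "parameters on the task struct" layout. `emit` is modelled
// as pushing to `out`; the step/driver protocol is invented for illustration.
#[derive(Clone, Copy)]
enum State {
    S0,                          // empty starting state
    S1 { agg: i32 },             // loop entry
    S2 { agg: i32, event: i32 }, // after receiving an event
    S3 { agg: i32 },             // after the loop (never reached here)
}

struct RollingReduceTask {
    state: State,
    init: i32, // task parameter, visible to all states
    out: Vec<i32>,
}

impl RollingReduceTask {
    fn new(init: i32) -> Self {
        Self { state: State::S0, init, out: Vec::new() }
    }

    // S0 and S2 step eagerly; only S1 consumes the event.
    fn step(&mut self, event: Option<i32>) {
        self.state = match self.state {
            State::S0 => {
                let agg = self.init;
                self.out.push(agg); // emit agg
                State::S1 { agg }
            }
            State::S1 { agg } => State::S2 { agg, event: event.expect("S1 awaits an event") },
            State::S2 { agg, event } => {
                let agg = agg + event;
                self.out.push(agg); // emit agg
                State::S1 { agg }
            }
            State::S3 { .. } => unreachable!("this task never exits its loop"),
        };
    }
}

fn main() {
    let mut task = RollingReduceTask::new(10);
    task.step(None); // S0 -> S1: emits init
    for e in [1, 2, 3] {
        task.step(Some(e)); // S1 -> S2: receive the event
        task.step(None);    // S2 -> S1: fold and emit
    }
    assert_eq!(task.out, vec![10, 11, 13, 16]);
}
```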

segeljakt commented 3 years ago

Another observation, we must insert points of preemption in more places than blocking code I think. Consider this:

task Billion(): () -> ~i32 {
    var x: i32 = 0;
    loop {
        if x < 1_000_000_000 {
            x += 1;
            emit x;
        } else {
            break;
        }
    }
}

If we don't preempt it's gonna lock down the Kompact-system. A solution is to preempt at the start of every iteration, independently of what is inside the loop. We could also unroll some iterations as an optimisation to make things run faster.

segeljakt commented 3 years ago

If we can make state transitions look like function calls then maybe MLIR could inline to effectively unroll for us.

frej commented 3 years ago

We could do like Erlang and have a reduction counter which only allows a fixed number of function calls before preemption. If we extend that to loops, we could explore having MLIR passes analyze the boundaries of the loops and perhaps unroll and/or elide explicit counter operations.

segeljakt commented 3 years ago

Reduction counters will work very well with Kompact, maybe like this pseudocode πŸ€”

impl ComponentDefinition for MyComponent {
    fn execute(&mut self, max_events: usize, skip: usize) -> ExecuteResult {
        let mut counter = 0;
        while counter < max_events && self.can_make_transition() {
            self.make_transition();
            counter += 1;
        }
        return ExecuteResult::new(counter, skip);
    }
}

Does Erlang insert a preemption point for every function call?

frej commented 3 years ago

Reduction counters will work very well with Kompact, maybe like this pseudocode

Count down instead of up: a dec sets the zero flag when you're done, while comparing against a limit requires an immediate or loading the constant into a register, and then you still have to update the counter. This is an optimization the compiler is not likely to do.

Does Erlang insert a preemption point for every function call?

The compiler assumes that preemption can happen when a function is called and while suspending to wait for a message to arrive, so that limits what it is allowed to do with respect to uninitialized terms on the heap etc at these points.

If you run the traditional BEAM interpreter, it maintains the reduction counter as part of the call instruction. If you use the jit, each function will start with:

# i_test_yield
L379:
    dec r14 ; The reduction counter has its own dedicated register
    short jg <function-body>
    lea rdx, qword ptr [L379]
    call <schedule_me>
<function-body>:
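
The count-down scheme could look like this at the Rust level. The `drive`/`step` interface is invented for illustration; the win frej describes only shows up in the generated machine code.

```rust
// Hedged sketch of a count-down reduction budget. `step` returns true while
// the FSM still has work to do; the whole interface is invented here.
fn drive(mut budget: u32, mut step: impl FnMut() -> bool) -> bool {
    // Returns true if the FSM finished within the budget,
    // false if it must be scheduled out with work remaining.
    while budget > 0 {
        if !step() {
            return true; // no more work: finished
        }
        budget -= 1; // counting down compiles to a dec + zero-flag test
    }
    false // budget exhausted: preempt
}

fn main() {
    let mut work = 3;
    assert!(drive(10, || { work -= 1; work > 0 })); // finishes in time

    let mut long_work = 100;
    assert!(!drive(10, || { long_work -= 1; long_work > 0 })); // preempted
}
```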
segeljakt commented 3 years ago

Count down instead of up: a dec sets the zero flag when you're done, while comparing against a limit requires an immediate or loading the constant into a register, and then you still have to update the counter. This is an optimization the compiler is not likely to do.

Compiler hacks

segeljakt commented 3 years ago

I got some comments from Paris in an offline discussion:

An observation here is that the FSM conversion allows you to write operators as if they were functions. Tasks are, like in Ray, just async functions. It makes me wonder what could be done about actors. Actors are a kind of task which can be referenced. The main issue with actors is how you handle fault tolerance. After reading the Ray paper (and the Lineage Stash paper), it seems like Ray only has (or had) fault tolerance for dynamic tasks. A dynamic task is a task spawned by another task. Ray's model could be useful if we want to support this feature.

frej commented 3 years ago

[Adding a reference to @senorcarbone so he can follow the discussion]

Remember that the motivation for the whole FSM conversion was that we wanted to be able to write code like this (in Erlang syntax):

receive
  {something, X} ->
    ...,
    receive
      {something_else, Z} ->
         ....
    end
end

and run it on a runtime which, like Arcon, feeds you one event at a time. It then snowballed into thinking about allowing long-running computations to be preempted.

From Paris's input it is clear that apparently most systems can live without that functionality and it is accepted as normal by their users. So the question reduces to: is it worth the extra complication, and is it a feature users don't know they are missing? Do we know of any applications where users roll their own FSM conversions, or is this a solution looking for a problem?

segeljakt commented 3 years ago

Paris wrote:

Expression-wise FSMs are nice to have in our system, so please include Max in this and try to implement it together if you think this is a feature we want in the release. To me the main cool addition compared to vanilla streaming is the ability to chain operations dependent on task state. This makes it sooo much nicer to write all sorts of trigger functions and complex behavior.

When programming in Flink you usually try to structure your programs around the pre-defined operators in the standard library. It is tough to roll your own operators since you basically need to implement the state machine by hand. Nested event handlers make this easier. I think, though, that the big win of the FSM conversion is that users no longer have to think in terms of event-based operators. Instead they can think of their programs as sequentially executing functions.

frej commented 3 years ago

I think, though, that the big win of the FSM conversion is that users no longer have to think in terms of event-based operators. Instead they can think of their programs as sequentially executing functions.

That sounds like a good enough motivation for the FSMs.

segeljakt commented 3 years ago

From Seif's book (page 273), here is how you implement a "full adder" which adds three one-bit numbers x, y, and z, giving a two-bit result with carry c and sum s. For example, if x = 1, y = 1, z = 0, then x + y + z results in c = 1 and s = 0.


task And: ~i32 -> ~i32 {
    loop { on x => on y => emit x*y }
}

task Or: ~i32 -> ~i32 {
    loop { on x => on y => emit x+y - x*y }
}

task Nand: ~i32 -> ~i32 {
    loop { on x => on y => emit 1 - x*y }
}

task Nor: ~i32 -> ~i32 {
    loop { on x => on y => emit 1 - x - y + x*y }
}

task Xor: ~i32 -> ~i32 {
    loop { on x => on y => emit x+y - 2*x*y }
}

fun full_adder(x: ~i32, y: ~i32, z: ~i32): (~i32, ~i32) {
    val k = And(x, y);
    val l = And(y, z);
    val m = And(x, z);
    val c = Or(k, Or(l, m));
    val s = Xor(z, Xor(x, y));
    (c, s)
}

This shows how nested event handlers can be used for synchronisation between the two input channels. I will come up with more examples showing off more data-science-oriented use cases.

frej commented 3 years ago

What is the context in Seif's book (my copy is in Kista), is it a logic simulator or does it demonstrate streams?

For a logic simulator this isn't a very good implementation. With combinatorial logic like this you would expect, if you just look at the output of the and gate k, that a sequence of input events x=1, y=1, y=0, y=1, y=0 produces the outputs 1, 0, 1, 0 (one output per change on y), but this implementation gives only the first 1.

To do this right you will need a receive that can concurrently wait for events on multiple inputs.

segeljakt commented 3 years ago

What is the context in Seif's book (my copy is in Kista), is it a logic simulator or does it demonstrate streams?

This is the implementation; it is only to demonstrate streams, so maybe not the most optimised.

fun {GateMaker F}
    fun {$ Xs Ys}
        fun {GateLoop Xs Ys}
            case Xs#Ys of (X|Xr)#(Y|Yr) then
                {F X Y}|{GateLoop Xr Yr}
            end
        end
    in
        thread {GateLoop Xs Ys} end
    end
end

AndG  = {GateMaker fun {$ X Y} X*Y end}
OrG   = {GateMaker fun {$ X Y} X+Y-X*Y end}
NandG = {GateMaker fun {$ X Y} 1-X*Y end}
NorG  = {GateMaker fun {$ X Y} 1-X-Y+X*Y end}
XorG  = {GateMaker fun {$ X Y} X+Y-2*X*Y end}

proc {FullAdder X Y Z ?C ?S}
    K L M
in
    K = {AndG X Y}
    L = {AndG Y Z}
    M = {AndG X Z}
    C = {OrG K {OrG L M}}
    S = {XorG Z {XorG X Y}}
end
segeljakt commented 3 years ago

One thing Vlad clarified to me: there is a difference between preemptive and cooperative multitasking. I think we have cooperative multitasking.

"The term preemptive multitasking is used to distinguish a multitasking operating system, which permits preemption of tasks, from a cooperative multitasking system wherein processes or tasks must be explicitly programmed to yield when they do not need system resources." - https://en.wikipedia.org/wiki/Preemption_(computing)

senorcarbone commented 3 years ago

All this is cool and I like it, but it is a whole new direction that somehow needs to align with the rest of the project. I also like quantum computing but.. ☺️.

You have to assess the need for this with proper examples (not textbook expressions but modern ML pipelines people actually use). Then we have to see what needs to be done in the code generation and Arcon runtime together with Frej and Max. If the novelty and usefulness justifies it we go for it.

Some ideas: in streaming, this type of expressiveness I believe would simplify asynchronous calls to third-party systems, composition of custom stream sources, and state-triggered expressions that simply invoke when a condition is matched. Mind that a state trigger can compose output events which can in turn invoke state triggers of other operators. This is a good start to simplify the expression of ANY mechanism that uses punctuations, i.e. watermarking, aggregation trees, special windows, regexp evaluation, progress tracking, bulk iterations and many more... Another thing that makes sense is to scope these states by key. E.g. a state can be a countdown number that triggers an action when it reaches zero for a key.

Overall we can start by writing down examples of such scope and see how logic can be substituted.

segeljakt commented 3 years ago

You have to assess the need for this with proper examples (not textbook expressions but modern ML pipelines people actually use). Then we have to see what needs to be done in the code generation and Arcon runtime together with Frej and Max. If the novelty and usefulness justifies it we go for it.

I agree that we need more examples; this is something I will look into! On the other hand, I think we now have a clear conceptual understanding of what this feature allows us to do. Simultaneously, it's important that we do not limit the language to our imagination.

I do not think we need to get Arcon or even Arc-MLIR involved, since this is only a syntactic abstraction which gets desugared into what we had before. In addition, I also think we must not mix the design of Arc-Script with the design of Arcon. There will inevitably be some impedance mismatch between the two, but this problem should be solved by the compiler rather than by adapting the language to the runtime or vice versa.

Some ideas: in streaming, this type of expressiveness I believe would simplify asynchronous calls to third-party systems, composition of custom stream sources, and state-triggered expressions that simply invoke when a condition is matched. Mind that a state trigger can compose output events which can in turn invoke state triggers of other operators. This is a good start to simplify the expression of ANY mechanism that uses punctuations, i.e. watermarking, aggregation trees, special windows, regexp evaluation, progress tracking, bulk iterations and many more... Another thing that makes sense is to scope these states by key. E.g. a state can be a countdown number that triggers an action when it reaches zero for a key.

These are some good ideas. I think this issue has gotten a bit out of control so I will open new issues for these. The one I am most interested in is about state-triggered expressions.

segeljakt commented 3 years ago

I will set a reminder for myself to open the issues when you are back in office.

frej commented 3 years ago

This is the implementation, it is only to demonstrate streams, maybe not the most optimised ... fun {GateLoop Xs Ys} case Xs#Ys of (X|Xr)#(Y|Yr) then {F X Y}|{GateLoop Xr Yr} end end ...

So it assumes that you will post a new value for each input in each time step.

frej commented 3 years ago

One thing Vlad clarified to me, there is a difference between preemptive and cooperative multitasking. I think we have cooperative multitasking.

That's what we have, preemption (yield) is handled by code inserted by the compiler.

segeljakt commented 3 years ago

I thought preemptive multitasking was when the runtime can interrupt the task at any point

frej commented 3 years ago

I thought preemptive multitasking was when the runtime can interrupt the task at any point

Correct, we have cooperative multitasking, i.e. the runtime won't interrupt us. But from the point of view of the arc-script programmer (if we have the FSM transforms) it looks like we have preemptive multitasking, as the programmer will not have to insert yields. I use the term "preemption" (i.e. not the same as preemptive multitasking) for the mechanism that allows the user's program to suspend execution and restart from that point in the middle of what looks like an unbroken sequence of computations at the arc-script source code level.

segeljakt commented 3 years ago

it makes sense now πŸ‘

segeljakt commented 3 years ago

I started implementing FSM support now and realised something: the lowering algorithm must be extended to handle scopes. An example for illustration:

task X(): ~i32 -> ~i32 {
    val a: i32 = foo();
    if a > 0 {
        on event => emit event + a;
    } else {
        bar();
    }
    baz();
}

This creates four states:

enum State {
    S0({}),
    S1({a:i32}),
    S2({a:i32}),
    S3({}),
}

fun X0({}) {
    val a = foo();
    if a > 0 {
        return S1({a});
    } else {
        return S2({a});
    }
}

fun X1({a}, event) {
    emit event + a;
    return S3({});
}

fun X2({a}, event) {
    bar();
    return S3({});
}

fun X3({}, event) {
    baz();
}

A state transition is needed to exit an asynchronous block. A block is asynchronous if it contains an event handler sub-expression.
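
For reference, the four generated states could be written out by hand in Rust roughly as follows. This is only a sketch: the names, the `Done` variant, and the stubbed `foo`/`bar`/`baz`/`emit` are my own inventions; in the real runtime they would come from the task's environment.

```rust
// Hypothetical hand-written Rust version of the FSM generated for task X.
#[derive(Debug, PartialEq)]
enum State {
    S0,            // before `val a = foo()`
    S1 { a: i32 }, // then-branch: blocked, waiting for an event
    S2 { a: i32 }, // else-branch
    S3,            // join point before `baz()`
    Done,          // task finished (invented terminal variant)
}

fn foo() -> i32 { 1 }
fn bar() {}
fn baz() {}
fn emit(v: i32) { println!("emit {v}"); }

// One FSM step: consume the current state (plus the incoming event,
// which only S1 actually uses) and return the next state.
fn step(state: State, event: i32) -> State {
    match state {
        State::S0 => {
            let a = foo();
            if a > 0 { State::S1 { a } } else { State::S2 { a } }
        }
        State::S1 { a } => { emit(event + a); State::S3 }
        State::S2 { a: _a } => { bar(); State::S3 }
        State::S3 => { baz(); State::Done }
        State::Done => State::Done,
    }
}

fn main() {
    let mut st = State::S0;
    for ev in [10, 20] {
        st = step(st, ev);
        println!("{st:?}");
    }
}
```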

Some additional notes:

segeljakt commented 3 years ago

I am trying to sketch the algorithm. Ideally, we would do the transformation during SSA conversion.

If we have a block:

val a = (on event x => x+1) + (on event y => y+1);
return a * 2;

The idea is to process expressions per statement, bottom-up. If an expression is a non-blocking operation, we convert it into SSA form. If it is a blocking operation, we create an SSA operation in the function of the current state which returns the next state (a data structure capturing all variables in scope), and a function for the next state. Any SSAs created after this point are placed in the function of the next state (until we create a function for the next-next state). Return is replaced with a transition to a special state SR.

fun X0() {
    val a = (on event x => x+1) + (on event y => y+1);
    return a * 2;
}

------

fun X0() {
    return S1{};
}

fun X1(x) {
    val a = x+1 + (on event y => y+1);
    return a * 2;
}

------

fun X0() {
    return S1{};
}

fun X1(x) {
    val x0 = x+1;
    val a = x0 + (on event y => y+1);
    return a * 2;
}

------

fun X0() {
    return S1{};
}

fun X1(x) {
    val x0 = x+1;
    return S2{x0};
}

fun X2({x0}, y) {
    val a = x0 + y+1;
    return a * 2;
}

------

fun X0() {
    return S1{};
}

fun X1(x) {
    val x0 = x+1;
    return S2{x0};
}

fun X2({x0}, y) {
    val x1 = y+1;
    val a = x0 + x1;
    return a * 2;
}

------

fun X0() {
    return S1{};
}

fun X1(x) {
    val x0 = x+1;
    return S2{x0};
}

fun X2({x0}, y) {
    val x1 = y+1;
    val x2 = x0 + x1;
    return x2 * 2;
}

------

fun X0() {
    return S1{};
}

fun X1(x) {
    val x0 = x+1;
    return S2{x0};
}

fun X2({x0}, y) {
    val x1 = y+1;
    val x2 = x0 + x1;
    val x3 = x2 * 2;
    return x3;
}

------

fun X0() {
    return S1{};
}

fun X1(x) {
    val x0 = x+1;
    return S2{x0};
}

fun X2({x0}, y) {
    val x1 = y+1;
    val x2 = x0 + x1;
    val x3 = x2 * 2;
    return SR{x3};
}
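
The end result of the stepping above can be rendered as an executable Rust FSM (a sketch under the same assumptions as before: names are invented, and SR is modeled as a terminal variant holding the return value):

```rust
#[derive(Debug, PartialEq)]
enum State {
    S1,             // waiting for the first event (x)
    S2 { x0: i32 }, // waiting for the second event (y)
    SR { x3: i32 }, // terminal "return" state holding the result
}

fn step(state: State, event: i32) -> State {
    match state {
        State::S1 => {
            let x0 = event + 1;  // val x0 = x+1
            State::S2 { x0 }     // return S2{x0}
        }
        State::S2 { x0 } => {
            let x1 = event + 1;  // val x1 = y+1
            let x2 = x0 + x1;    // val x2 = x0 + x1
            let x3 = x2 * 2;     // val x3 = x2 * 2
            State::SR { x3 }     // return SR{x3}
        }
        done => done,            // SR is terminal
    }
}

fn main() {
    // X0 just transitions to S1{}; then we feed two events.
    let mut st = State::S1;
    for ev in [3, 4] {
        st = step(st, ev);
    }
    println!("{st:?}"); // SR with x3 = ((3+1) + (4+1)) * 2 = 18
}
```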
segeljakt commented 3 years ago

Here is a step-by-step example of how to handle if-else. This is a lot trickier to implement. Before, we only had to propagate information downwards about which bound variables are in scope. Now we also need to propagate information upwards about whether a branch is an asynchronous block. Maybe a solution could be to have two kinds of scopes:

1. An asynchronous scope, which:
    1. Is pushed whenever we encounter a blocking operation.
    2. Becomes, when popped, a function representing a state plus an SSA operation calling that function. The function:
        1. Takes a parameter for the bound variables.
        2. Takes an (optional) parameter for an input event.
        3. Is lazily constructed, since we do not know its exit state prior to constructing it.
        4. Contains a block of the SSA variables within the scope.
2. A synchronous scope, which:
    1. Is pushed whenever we enter a block.
    2. Is popped whenever we exit a block. We also pop any asynchronous scopes which may be on top of this scope.
    3. Becomes a regular block of SSA variables when popped.

val a = if true {
    on event x => x+1
} else {
    1
};
return a * 2;

------

fun X0() {
    val x0 = true;
    val a = if x0 {
        on event x => x+1
    } else {
        1
    };
    return a * 2;
}

------

fun X0() {
    val x0 = true;
    val a = if x0 {
        return S1();
    } else {
        1
    };
    return a * 2;
}

fun X1(x) {
    x+1
}

------

fun X0() {
    val x0 = true;
    val a = if x0 {
        return S1();
    } else {
        1
    };
    return a * 2;
}

fun X1(x) {
    val x1 = x+1;
    x1
}

------

fun X0() {
    val x0 = true;
    val a = if x0 {
        return S1();
    } else {
        1
    };
    return a * 2;
}

fun X1(x) {
    val x1 = x+1;
    return S2(x1)
}

------

fun X0() {
    val x0 = true;
    val a = if x0 {
        return S1();
    } else {
        val x2 = 1;
        return S2(x2);
    };
    return a * 2;
}

fun X1(x) {
    val x1 = x+1;
    return S2(x1)
}

------

fun X0() {
    val x0 = true;
    if x0 {
        return S1();
    } else {
        val x2 = 1;
        return S2(x2);
    }
}

fun X1(x) {
    val x1 = x+1;
    return S2(x1)
}

fun X2(a) {
    return a * 2;
}

------

fun X0() {
    val x0 = true;
    if x0 {
        return S1();
    } else {
        val x2 = 1;
        return S2(x2);
    }
}

fun X1(x) {
    val x1 = x+1;
    return S2(x1)
}

fun X2(a) {
    val x3 = a * 2;
    return SR(x3);
}
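
To make the branch-dependent transition concrete, here is a hypothetical Rust rendering of the final FSM above (a sketch; variant and field names are invented). The else-branch never blocks, so it jumps straight to the eventless join state S2, while the then-branch first waits in S1:

```rust
#[derive(Debug, PartialEq)]
enum State {
    S1,            // then-branch: blocked, waiting for event x
    S2 { a: i32 }, // eventless join point: `return a * 2`
    SR { v: i32 }, // terminal return state
}

// X0: the initial, eventless transition. The branch decides whether we
// block (S1) or proceed directly to the join state (S2).
fn x0(cond: bool) -> State {
    if cond {
        State::S1
    } else {
        let x2 = 1;
        State::S2 { a: x2 }
    }
}

fn step(state: State, event: i32) -> State {
    match state {
        State::S1 => State::S2 { a: event + 1 }, // val x1 = x+1
        State::S2 { a } => State::SR { v: a * 2 },
        done => done,
    }
}

fn main() {
    // else-branch: no event is ever consumed; S2 ignores the dummy event.
    let mut st = x0(false);
    st = step(st, 0);
    println!("{st:?}"); // SR { v: 2 }

    // then-branch: blocks once for an event.
    let mut st = x0(true);
    st = step(st, 5); // x = 5 -> a = 6
    st = step(st, 0);
    println!("{st:?}"); // SR { v: 12 }
}
```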
segeljakt commented 3 years ago

Frej and I decided to move FSM compilation from Arc-Script to MLIR. The reason is that optimisations like operator fusion are a lot easier to apply on the high-level syntax, before compiling to an FSM. Below, I will try to outline pseudocode for the first iteration of the FSM compilation algorithm, using a simplified grammar for a language in SSA form.

Program representation

type program = item list
and item =
  | ITask of path * param list * interface * interface * block
  | IEnum of path * (name * ty) list
and param = name * ty
and block = ssa list * var
and interface = ty
and ssa = var * ty * expr
and ty =
  | TRecord of (name * ty) list
  | TNominal of path
and var = name
and name = string
and path = name list
and expr =
  | EIf of var * block * block
  | ELoop of block
  | EReceive
  | EEmit of var
  | EEnwrap of path * var
  | EUnwrap of path * var
  | ERecord of (name * var) list
  | EAccess of name * var
  | EBreak of var
  | EContinue

Blocking variables

First, we need to calculate the SSA-variables in each task which contain blocking code. We refer to these as "blocking variables" or bvs. For each blocking variable we must later create one or two FSM transitions.

The calculation is divided into a bottom-up and a top-down pass. In the bottom-up pass, an SSA is blocking if it is a receive, or an if/loop whose body contains a blocking SSA:

```ocaml
let rec bottom_up_bvs (b:block) : var list =
  bvs_block [] b |> snd

and bvs_block bvs b =
  b |> fst |> foldl
    (fun (is_async, bvs) ssa ->
      let (is_async', bvs) = bvs_ssa bvs ssa in
      (is_async' || is_async, bvs))
    (false, bvs)

and bvs_ssa bvs (v, t, e) =
  match e with
  | EIf (_, b0, b1) ->
    let (is_async0, bvs) = bvs_block bvs b0 in
    let (is_async1, bvs) = bvs_block bvs b1 in
    if is_async0 || is_async1 then (true, v::bvs) else (false, bvs)
  | ELoop b ->
    let (is_async, bvs) = bvs_block bvs b in
    if is_async then (true, v::bvs) else (false, bvs)
  | EReceive -> (true, v::bvs)
  | _ -> (false, bvs)
```

In the top-down pass, an SSA is async if it is a continue/break operation used within an async SSA operation.
```ocaml
let rec top_down_bvs (b:block) (bvs:var list) : var list =
  bvs_block false bvs b

and bvs_block is_async bvs b =
  b |> fst |> foldl (bvs_ssa is_async) bvs

and bvs_ssa is_async bvs (v, t, e) =
  match e with
  | EIf (_, b0, b1) ->
    let is_async = mem bvs v in
    let bvs = bvs_block is_async bvs b0 in
    let bvs = bvs_block is_async bvs b1 in
    bvs
  | ELoop b ->
    let is_async = mem bvs v in
    let bvs = bvs_block is_async bvs b in
    bvs
  | (EBreak _ | EContinue) when is_async -> v::bvs
  | _ -> bvs
```

At this point we can compute a `var list` of all SSAs in a block which should be compiled to FSM:

let bvs_block b = bottom_up_bvs b |> top_down_bvs b
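
A rough, runnable translation of the two passes into Rust on a miniature SSA representation (the representation, names, and example program are my own; `Receive` stands in for a blocking `on`):

```rust
// Miniature SSA language: a block is a list of (variable, expression) pairs.
#[allow(dead_code)]
enum Expr {
    If(Block, Block),
    Loop(Block),
    Receive, // blocking `on`/receive
    Break,
    Continue,
    Other, // any non-blocking operation
}

type Var = &'static str;
type Block = Vec<(Var, Expr)>;

// Bottom-up pass: an SSA is blocking if it is a receive, or an if/loop
// whose body contains a blocking SSA. Returns whether the whole block blocks.
fn bottom_up(b: &Block, bvs: &mut Vec<Var>) -> bool {
    let mut any = false;
    for (v, e) in b {
        let blocking = match e {
            Expr::If(b0, b1) => {
                let a0 = bottom_up(b0, bvs);
                let a1 = bottom_up(b1, bvs);
                a0 || a1
            }
            Expr::Loop(b0) => bottom_up(b0, bvs),
            Expr::Receive => true,
            _ => false,
        };
        if blocking {
            bvs.push(*v);
        }
        any |= blocking;
    }
    any
}

// Top-down pass: a break/continue is blocking if it sits inside an
// if/loop that the bottom-up pass already marked as blocking.
fn top_down(b: &Block, is_async: bool, bvs: &mut Vec<Var>) {
    for (v, e) in b {
        match e {
            Expr::If(b0, b1) => {
                let inner = bvs.contains(v);
                top_down(b0, inner, bvs);
                top_down(b1, inner, bvs);
            }
            Expr::Loop(b0) => {
                let inner = bvs.contains(v);
                top_down(b0, inner, bvs);
            }
            Expr::Break | Expr::Continue if is_async => bvs.push(*v),
            _ => {}
        }
    }
}

fn main() {
    // loop { on event; break } — the loop, the receive, and the break
    // all become blocking variables.
    let b: Block = vec![(
        "v_loop",
        Expr::Loop(vec![("v_recv", Expr::Receive), ("v_brk", Expr::Break)]),
    )];
    let mut bvs = Vec::new();
    bottom_up(&b, &mut bvs);
    top_down(&b, false, &mut bvs);
    println!("{bvs:?}"); // ["v_recv", "v_loop", "v_brk"]
}
```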

Program transformation

We implement the compilation to FSM by folding over the AST. For each task, we construct an enum where each variant represents a possible state of the task's FSM. A state contains the variables which are live at a point of execution. There are a couple of different kinds of states:

The idea behind eventless-intermediate states is that if we block inside a code-block, the block must be compiled into two functions: one containing the code before blocking, and one containing the code after it. In addition, it must be possible both to transition into and out of these code-blocks. The same goes for any code-block which contains the blocking code-block.

There are a couple of cases where we need to store the execution state needed to make a transition:

module Ctx = struct
  type ctx = {
      bvs: name list;
      input_event: var * ty;
      state_stack: state list; (* A stack of states corresponding to blocks in the AST. *)
      scope_stack: scope list;
  }
  and scope = {
      ssas: ssa list;
      vars: (var * ty) list; (* Variables in scope, including task-parameters *)
  }
  (* State constructors *)
  and state =
    (* Block result of if/on *)
    | SIf of {
      create_result_func: block -> ctx -> ctx;
      create_result_variant: var -> ctx -> var * ctx;
    }
    | SReceive of {
      create_receive_func: block -> ctx -> ctx;
      create_receive_variant: ctx -> var * ctx;
    }
    (* Task return *)
    | STask of {
      create_task_func: block -> ctx -> ctx;
      create_return_variant: var -> ctx -> var * ctx;
    }
    (* Loop break and continue *)
    | SLoop of {
      create_continue_func: block -> ctx -> ctx;
      create_break_func: block -> ctx -> ctx;
      create_continue_variant: ctx -> var * ctx;
      create_break_variant: var -> ctx -> var * ctx;
    }

  let make bvs =
    { bvs; state_stack = []; scope_stack = []; }

  let bind_param (v, t) ctx =
    bind_var v t ctx

  let bind_var v t ctx =
    match ctx.scope_stack with
    | hd::tl -> { ctx with scope_stack = { hd with vars = (v, t)::hd.vars }::tl }
    | [] -> unreachable ()

  (* Returns the type of a variable by traversing the scope stack *)
  let typeof v ctx = ctx.scope_stack |> find_map (fun s -> assoc_opt v s.vars) |> get

end

let rec fsm_program hir =
  hir |> foldl fsm_item []

and fsm_item hir' i =
  match i with
  | IEnum _ -> i::hir'
  | ITask (x, ps, t0, t1, b) ->
    let bvs = bvs_block b in
    let ctx = Ctx.make bvs in
    let ctx = ctx |> Ctx.push_scope in
    let ctx = ctx |> mapm Ctx.bind_param ps in
    let ctx = ctx |> push_task_state in
    let ctx = fsm_block b ctx in
    let ctx = ctx |> Ctx.pop_scope in
    let i = IFsm (x, t0, t1, funs, enum) in
    i::hir'

and scoped f b ctx =
  let ctx = ctx |> Ctx.push_scope in
  let (v, ctx) = f b ctx in
  let (ssas, ctx) = ctx |> Ctx.pop_scope in
  ((ssas, v), ctx)

and fsm_block (ssas, v_res) ctx =
  match ssas with
  | [] -> (v_res, ctx)
  | (v_lhs, e_rhs)::ssas ->
    match e_rhs with
    | EIf (v0, b0, b1) when mem ctx.bvs v_lhs ->
        let ctx = ctx |> Ctx.push_if_state v_lhs in

        let (b0, ctx) = scoped fsm_block b0 ctx in
        let (b1, ctx) = scoped fsm_block b1 ctx in
        let ctx = ctx |> Ctx.add_ssa (v_lhs, Hir.EIf (v0, b0, b1)) in
        let (b, ctx) = scoped fsm_block (ssas, v_res) ctx in

        let ctx = ctx |> Ctx.pop_if_state b in
        (v_res, ctx)
    | EIf (v0, b0, b1) ->
        let (b0, ctx) = scoped fsm_block b0 ctx in
        let (b1, ctx) = scoped fsm_block b1 ctx in
        let ctx = ctx |> Ctx.add_ssa (v_lhs, Hir.EIf (v0, b0, b1)) in
        fsm_block (ssas, v_res) ctx
    | ELoop b when ctx |> Ctx.is_async v_lhs ->
        let ctx = ctx |> Ctx.push_loop_state v_lhs in

        (* Transition into continue state *)
        let (v_continue, ctx) = (ctx |> Ctx.resume_continue) ctx in

        (* Compile continue *)
        let (b_continue, ctx) = scoped fsm_block b ctx in

        (* Compile break *)
        let (b_break, ctx) =
          let ctx = ctx |> Ctx.push_block in
          let (v, ctx) = fsm_block (ssas, v_res) ctx in
          let (v, ctx) = (ctx |> Ctx.resume_outer_result) v ctx in
          let (ssas, ctx) = ctx |> Ctx.pop_block in
          ((ssas, v), ctx)
        in

        let ctx = ctx |> Ctx.pop_loop_state b_continue b_break in
        (v_continue, ctx)
    | ELoop b ->
        let ctx = ctx |> Ctx.add_ssa (v_lhs, Hir.ELoop b) in
        let (v_res, ctx) = fsm_block (ssas, v_res) ctx in
        (v_res, ctx)
    | EReceive ->
        let ctx = ctx |> Ctx.push_receive_state v_lhs in

        let (v_return, ctx) = (ctx |> Ctx.resume_receive) ctx in

        let ctx = ctx |> Ctx.push_scope in
        let (v, ctx) = fsm_block (ssas, v_res) ctx in
        let (v, ctx) = (ctx |> Ctx.resume_result) v ctx in
        let (ssas, ctx) = ctx |> Ctx.pop_scope in

        let b1 = (ssas, v) in
        let ctx = ctx |> Ctx.pop_receive_state b1 in

        (v_return, ctx)
    | Hir.EBreak _ when mem ctx.bvs v_lhs ->
        (ctx |> Ctx.resume_break) ctx
    | Hir.EContinue when mem ctx.bvs v_lhs ->
        (ctx |> Ctx.resume_continue) ctx
    | _ ->
        let ctx = ctx |> Ctx.add_ssa (v_lhs, e_rhs) in
        let ctx = fsm_block (ssas, v_lhs) ctx in
        ctx

It's not 100% complete yet. Some helper functions are missing. Tell me if anything is unclear / you want me to explain something.

The gist of the code above is that we recursively fold the block of each task and simultaneously maintain two stacks. One stack which stores scopes and one for storing states.

If we for example encounter a blocking loop expression, we push a state onto the state stack which contains 4 things:

SLoop of {
      create_continue_func: block -> ctx -> ctx;
      create_break_func: block -> ctx -> ctx;
      create_continue_variant: ctx -> var * ctx;
      create_break_variant: var -> ctx -> var * ctx;
    }

When we later pop the loop state, we use the first two fields to construct two states in the state machine (the code after continue and the code after break). The latter two fields become relevant when we fold the loop body. If we, for example, encounter a blocking continue (EContinue) expression, we go through the state stack, find the topmost loop state, and call its create_continue_variant: ctx -> var * ctx. This function constructs a return (EReturn) expression which returns a variant of the FSM-enum. The variant's tag indicates that the next state should be the function which we will later construct with create_continue_func: block -> ctx -> ctx. The variant will in this case store all variables which are live at the point of entering the loop.
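
To make the continue/break-variant mechanics concrete, here is a hypothetical Rust rendering of the FSM that could come out of a blocking loop (a sketch; the loop program, variant names, and `sum` variable are invented). The continue-variant re-enters the loop-body state carrying the live variables, while the break-variant carries them into the code after the loop:

```rust
// Hypothetical FSM for a blocking loop such as:
//   loop { on e => if e < 0 { break } else { sum = sum + e } }
#[derive(Debug, PartialEq)]
enum State {
    Continue { sum: i32 }, // produced via create_continue_variant
    Break { sum: i32 },    // produced via create_break_variant
}

fn step(state: State, event: i32) -> State {
    match state {
        State::Continue { sum } => {
            if event < 0 {
                State::Break { sum }                 // break: live vars at exit
            } else {
                State::Continue { sum: sum + event } // continue: back to loop head
            }
        }
        done => done, // Break is terminal in this sketch
    }
}

fn main() {
    let mut st = State::Continue { sum: 0 };
    for ev in [1, 2, 3, -1] {
        st = step(st, ev);
    }
    println!("{st:?}"); // Break { sum: 6 }
}
```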