cda-group / arc

Programming Language for Continuous Deep Analytics
https://cda-group.github.io/arc/

First Class Streams #265

Closed segeljakt closed 1 year ago

segeljakt commented 3 years ago

Streams in Arc-Script are currently second-class, since they cannot be passed around as values inside tasks. A task currently looks like this:

task Identity(): ~i32 -> ~i32 {
    on event => emit event;
}

With first-class streams, it could instead look something like:

task Identity(in: ~i32, out: ~i32)  {
    on event from in => emit event into out;
}

Having first-class streams would increase the language's expressiveness (which could be a double-edged sword), for example by allowing loops to be defined in the dataflow:

task Identity(stream: ~i32)  {
    on event from stream => emit event into stream;
}

One solution is to introduce capabilities that restrict a stream to being read-only or write-only.

Similarly, Go supports first-class channels with capabilities: a channel type can be restricted to receive-only (<-chan T) or send-only (chan<- T). Channels support two operations, send (ch <- v) and receive (<-ch):

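package main

import "fmt"

func main() {
    // Create an unbuffered channel of strings.
    messages := make(chan string)

    // Send: a goroutine pushes "ping" into the channel.
    go func() { messages <- "ping" }()

    // Receive: block until a value arrives, then print it.
    msg := <-messages
    fmt.Println(msg)
}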
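
To make the capability idea concrete, here is a minimal Go sketch of the Identity task from above, assuming streams are modelled as channels of int32. The names identity and runIdentity are hypothetical and not part of Arc-Script; the point is only that `<-chan` and `chan<-` let the compiler reject a write to the input or a read from the output.

```go
package main

import "fmt"

// identity forwards every event from in to out.
// in is receive-only (<-chan) and out is send-only (chan<-), so a send on in
// or a receive from out inside this function is a compile-time error.
func identity(in <-chan int32, out chan<- int32) {
	for event := range in {
		out <- event
	}
	close(out)
}

// runIdentity is a hypothetical helper that wires identity between two
// channels, feeds it the given events, and collects what comes out.
func runIdentity(events []int32) []int32 {
	in := make(chan int32)
	out := make(chan int32)
	go identity(in, out)
	go func() {
		for _, e := range events {
			in <- e
		}
		close(in)
	}()
	var got []int32
	for e := range out {
		got = append(got, e)
	}
	return got
}

func main() {
	fmt.Println(runIdentity([]int32{1, 2, 3})) // prints [1 2 3]
}
```

Because the same bidirectional channel can no longer be handed to the task as both its input and its output without that being visible at the call site, a self-loop like the one above at least becomes explicit.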


I will fill in more information below. I think one problem is that first-class streams impose requirements on the runtime that our current model might not support.

frej commented 3 years ago

If we plan a transform-to-FSM approach to make arc-script integrate with Arcon's event handling, won't first-class streams greatly complicate the conversion to FSMs? Instead of just the top-level task, any function, arbitrarily deep in the CFG, may now need conversion.

segeljakt commented 3 years ago

You're correct. It may be possible to restrict blocking so that it is only allowed inside tasks; blocking inside a function would then be a compilation error. In the FSM, you would also need to store the stream as part of the state.

senorcarbone commented 3 years ago

I thought we decided against passing streams around in the last dev meeting. What is the main need apart from support for expressing loops? Can you provide some motivation on why you want this level of decomposition for something that has only static meaning? If the purpose is loops then we could instead focus on supporting loops right away.

segeljakt commented 3 years ago

I put it here mainly to document the design decision. I'll add the pros and cons from the dev meeting! I agree the feature is rejected for now, but I want to keep it in the back of my head in case a convincing argument for adding it appears. First-class streams are tempting to add to the language because they add a lot of expressiveness, though they would likely make the language "too expressive" and make it possible to express programs that break correctness. Another mechanism might be needed to restrict the expressive power a bit. I will investigate which Go applications use this feature.

segeljakt commented 3 years ago

I think you hit the nail on the head. The main problem here is to decide how to support cycles in the dataflow graph. In my view there are a couple of solutions:

Solution 1: Builtin iterate operator

This approach represents cycles as a builtin higher-order operator named iterate. This operator is a function with a signature of:

iterate: A -> ((A, B) -> (B, C)) -> C

In other words, you give it a stream A and an iteration function and get a stream C back. The iteration function takes two streams of type A and B and returns two streams of type B and C. The system applies the iteration function to A to get C and also creates an implicit feedback channel for B, so that the output B stream of the iteration function is fed back as its B input. For example, here is an "identity loop" in which each event passes through the feedback channel once: the iteration function returns its input a as the feedback stream and the fed-back stream b as the result.

fun loop_once(a: ~i32): ~i32 {
    val c: ~i32 = iterate(a, fun(a: ~i32, b: ~i32): (a, b));
    c
}
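
As a rough illustration of the feedback wiring, here is a sketch of iterate in Go, assuming streams are modelled as receive-only channels. This is not how Arc-Script or Arcon implements it; the names iterate, loopOnce, and collectLoopOnce are hypothetical, and the unbuffered feedback channel is only sufficient for this simple example.

```go
package main

import "fmt"

// iterate is a hypothetical Go rendering of
//
//	iterate: A -> ((A, B) -> (B, C)) -> C
//
// with streams modelled as receive-only channels.
func iterate[A, B, C any](
	a <-chan A,
	body func(a <-chan A, b <-chan B) (<-chan B, <-chan C),
) <-chan C {
	// The implicit feedback channel that carries B events back into body.
	feedback := make(chan B)
	bOut, c := body(a, feedback)
	// Forward the B output of body to its B input.
	go func() {
		for event := range bOut {
			feedback <- event
		}
		close(feedback)
	}()
	return c
}

// loopOnce mirrors loop_once: the body returns its input as the feedback
// stream and the fed-back stream as the result.
func loopOnce(a <-chan int32) <-chan int32 {
	return iterate[int32, int32, int32](a,
		func(a <-chan int32, b <-chan int32) (<-chan int32, <-chan int32) {
			return a, b
		})
}

// collectLoopOnce is a hypothetical helper that feeds events through
// loopOnce and gathers the results.
func collectLoopOnce(events []int32) []int32 {
	a := make(chan int32)
	go func() {
		for _, e := range events {
			a <- e
		}
		close(a)
	}()
	var got []int32
	for e := range loopOnce(a) {
		got = append(got, e)
	}
	return got
}

func main() {
	fmt.Println(collectLoopOnce([]int32{1, 2, 3})) // prints [1 2 3]
}
```

A body that sends events around the loop more than once would need its own termination condition and probably a buffered feedback channel to avoid deadlock; the sketch deliberately leaves that out.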

Pros:

Cons:

Solution 2: Channels

This approach introduces channels which are a lower level construct that can be used to build what we currently have in arc-script (including loops).

--- High-level --- Operators, Pipelines, Queries, ... ---
--- Mid-level --- Tasks, Streams, Loops, ... ---
--- Low-level --- Channels, Asynchronous/Synchronous functions, ... ---

A channel supports two operations: push, which pushes an event of type T into a channel, and pull, which blocks until an event arrives on a channel and then returns that event.

push: T -> Channel[T] -> unit
pull: Channel[T] -> T
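
The two signatures map fairly directly onto Go channels. The sketch below is only an analogy under that assumption: Channel, push, pull, and identity are hypothetical names, and the identity task is written to handle a fixed number of events so that the example terminates.

```go
package main

import "fmt"

// Channel is a hypothetical stand-in for Channel[T], backed by a Go channel.
type Channel[T any] chan T

// push: T -> Channel[T] -> unit
func push[T any](event T, ch Channel[T]) {
	ch <- event
}

// pull: Channel[T] -> T, blocking until an event arrives.
func pull[T any](ch Channel[T]) T {
	return <-ch
}

// identity is a task built only from the two primitives: it pulls n events
// from in and pushes each one to out.
func identity(in, out Channel[int32], n int) {
	for i := 0; i < n; i++ {
		push(pull(in), out)
	}
}

func main() {
	// Buffered channels keep this single-goroutine example from blocking.
	in := make(Channel[int32], 3)
	out := make(Channel[int32], 3)
	for _, e := range []int32{7, 8, 9} {
		push(e, in)
	}
	identity(in, out, 3)
	for i := 0; i < 3; i++ {
		fmt.Println(pull(out))
	}
}
```

Here a task is an ordinary function over channels, which is the sense in which tasks and streams could be layered on top of the low-level primitives.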

Pros:

Cons:

Solution 3: Channels and Streams

(TODO)

Solution 4: Single Assignment Streams

(TODO)