cda-group / arc

Programming Language for Continuous Deep Analytics
https://cda-group.github.io/arc/

Use case: Analysing Sensor Temperatures #308

segeljakt closed this issue 1 year ago

segeljakt commented 3 years ago

This issue is about re-implementing, in Arc-Script, an application written in Flink's Scala API. The application below is from Paris Carbone's dissertation, p. 16:

Scala code

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class SensorEvent(sensorID: Long, temperature: Double);
case class TemperatureWarning(sensorID: Long, temperature: Double);

class AverageTemperature extends ProcessWindowFunction[SensorEvent, SensorEvent, Long, TimeWindow] {
    def process(key: Long, context: Context, input: Iterable[SensorEvent], out: Collector[SensorEvent]): Unit = {
        var sum = 0.0;
        var count = 0L;
        for (evt <- input) {
            sum = sum + evt.temperature;
            count = count + 1;
        }
        out.collect(SensorEvent(key, sum / count))
    }
}

// STREAM SOURCE
val sensors: DataStream[SensorEvent] = env.addSource(KafkaConsumer("sensorTemperatures"));
//------------------------------------------------------------
// STREAM TRANSFORMATIONS
val filteredEvents: DataStream[SensorEvent] =
    sensors
        // Distribute Computation by SensorID
        .keyBy { _.sensorID }
        // Ignore the first 5 temperatures per sensor
        .filterWithState((evt: SensorEvent, count: Option[Int]) =>
            count match {
                case Some(c) => (c > 5, Some(c+1))
                case None => (true, Some(1))
            });
val avgTempStream: DataStream[SensorEvent] =
    filteredEvents
        // Compute 1h rolling average of temperature/sensor every 8min 
        .timeWindow(Time.hours(1), Time.minutes(8))
        .process(new AverageTemperature());
    // Output warnings for average temperatures above 40 degrees
val warnings: DataStream[TemperatureWarning] =
    avgTempStream
        .flatMap((evt: SensorEvent, collector: Collector[TemperatureWarning]) =>
            if (evt.temperature > 40)
                collector.collect(TemperatureWarning(evt.sensorID, evt.temperature)));
//------------------------------------------------------------
// STREAM SINK
warnings.addSink(KafkaProducer("tempWarnings"));
//EXECUTION
env.execute()
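
The `AverageTemperature` function above is holistic: Flink buffers every event in the window and hands the whole iterable to `process` when the window fires, which is what requirement 3.1 below calls a holistic window. Flink's `.aggregate` also accepts an incremental `AggregateFunction` that folds each event into a small accumulator as it arrives. A minimal sketch of that alternative against the real `AggregateFunction` interface; the `(Double, Long)` accumulator is my own choice:

import org.apache.flink.api.common.functions.AggregateFunction

// Incremental average: keeps only a (sum, count) pair per window
// instead of buffering every SensorEvent until the window fires.
class IncrementalAverage extends AggregateFunction[SensorEvent, (Double, Long), Double] {
    def createAccumulator(): (Double, Long) = (0.0, 0L)
    def add(evt: SensorEvent, acc: (Double, Long)): (Double, Long) =
        (acc._1 + evt.temperature, acc._2 + 1)
    def getResult(acc: (Double, Long)): Double = acc._1 / acc._2
    def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) =
        (a._1 + b._1, a._2 + b._2)
}

To keep the `sensorID` next to the average, Flink also lets you pair an `AggregateFunction` with a `ProcessWindowFunction` in the same `.aggregate` call.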

Transformations

The stream transformation section requires the following operators: KeyBy, FilterWithState, TimeWindow (sliding), and FlatMap. They are sketched in the Arc-Script (Operators) section below.

Requirements

With respect to Arc-Script, the application has the following requirements.

  1. ✅ Must be able to partition events by key.
  2. ✅ Must be able to keep state inside transformations.
  3. ❌ Must be able to define time-based sliding windows and aggregations on them.
    • ❌ 3.1. Must be able to support lazy iterators over events in a holistic window.
    • ❌ 3.2. Must be able to iterate over iterators (foreach/loop etc.).
  4. ❌ Must be able to read from sources and write into sinks.
    • Note: Can be handled outside of the language.
  5. ❌ Must be able to have generic data types (e.g., Option[T]).
  6. ✅ Must be able to represent durations (e.g., Time.hours(5)).
  7. ✅ Must be able to represent custom data types (e.g., SensorEvent).
  8. 💭 Must be able to pattern match on sum types (e.g., match count { ... }).
    • Note: You can pattern match, but only using if-else expressions (see the Scala sketch after this list).
  9. ❌ Must be able to represent other types of streams (e.g., KeyedStream, WindowedStream).
  10. ✅ Must be able to represent variables, integers, floats, booleans, sums, records, tuples, if-else expressions, comments.
  11. ❌ Must be able to emit inside of closures.
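
To make the note on requirement 8 concrete, here is the state update from `filterWithState` written twice in Scala: once with a `match`, and once with only if-else, which is roughly the shape Arc-Script allows today (the function names are mine, for illustration):

// The match-based state update used in filterWithState above:
def updateWithMatch(count: Option[Int]): (Boolean, Option[Int]) =
    count match {
        case Some(c) => (c > 5, Some(c + 1))
        case None    => (true, Some(1))
    }

// The same logic using only if-else, which is roughly what the note
// on requirement 8 says Arc-Script supports today:
def updateWithIf(count: Option[Int]): (Boolean, Option[Int]) =
    if (count.isDefined) {
        val c = count.get
        (c > 5, Some(c + 1))
    } else {
        (true, Some(1))
    }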

Arc-Script (Application)

As it stands, the application cannot be implemented in Arc-Script. This section therefore uses pseudocode to show what the application could look like if it were possible to implement.

type SensorEvent = { sensor_id: i64, temperature: f64 };
type TemperatureWarning = { sensor_id: i64, temperature: f64 };

fun application(sensors: ~SensorEvent): ~TemperatureWarning {
    val filtered_events: ~SensorEvent =
        sensors
            | KeyBy(|event| event.sensor_id)
            | FilterWithState((event: SensorEvent, count: Option[Int]) {
                match count {
                    Some(c) => (c > 5, Some(c+1)),
                    None => (true, Some(1)),
                }
            });
    val avg_temp_stream: ~SensorEvent =
        filtered_events
            | TimeWindow(1h, 8min, |key, events| {
                var sum = 0.0;
                var count = 0;
                for event in events {
                    sum += event.temperature;
                    count += 1;
                }
                emit { sensor_id: key, temperature: sum / count };
            });
    val warnings: ~TemperatureWarning =
        avg_temp_stream
            | FlatMap(|event| if event.temperature > 40 { emit event });
    warnings
}

After having sketched this, there are a couple of things I noticed:

Arc-Script (Operators)

The operators of Arc-Script can be sketched as follows:

task KeyBy[T, K](extractor: fun(T): K): ~T -> ~T {
    loop { on event => by extractor(event) emit event }
}


# task KeyBy[K,V](fun(V):K): ~{key:K, val:V} -> ~{key:K, val:V};

# task KeyBy(extractor) {
#     loop {
#         on event => emit event with key = extractor(event.val)
#     }
# }

task Map[K, A, B](f: fun(A): B): ~{key: K, val: A} -> ~{key: K, val: B} {
    loop {
        on event => emit event with val = f(event.val)
    }
}

task FilterWithState[T, S](filter: fun(T, S): (bool, S)): ~T -> ~T {
    var state = None;
    loop {
        on event => {
            val (cond, new_state) = filter(event, state);
            state = new_state;
            if cond { emit event }
        }
    }
}

task TimeWindow[K, T](width: duration, stride: duration, aggregator: fun(K, iter[T])): ~T -> ~T {
    # TODO
}
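
As a starting point for the TODO: a sliding window assigns each event to every window whose span covers it, one window start per stride, which is how Flink's `SlidingEventTimeWindows` behaves. A small Scala sketch of just the assignment step (the function name and millisecond units are assumptions of mine, not Arc-Script or Flink API):

// Return the start timestamp of every sliding window [start, start + width)
// that covers the given event timestamp (all values in milliseconds).
def assignWindows(timestamp: Long, width: Long, stride: Long): Seq[Long] = {
    // Start of the latest window that opens at or before the event.
    val lastStart = timestamp - (timestamp % stride)
    // Step backwards one stride at a time while the window still covers the event.
    (lastStart until (timestamp - width) by -stride).toSeq
}

`TimeWindow` would then keep one buffer per open window and call `aggregator` with a lazy iterator over it when event time passes the window's end, which is where requirements 3.1 and 3.2 come in.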

task FlatMap[T, U](collector_fun: fun(T)): ~T -> ~U {
    loop { on event => collector_fun(event) }
}