cda-group / arc

Programming Language for Continuous Deep Analytics
https://cda-group.github.io/arc/

Use case: Connected Components #310

Closed segeljakt closed 1 year ago

segeljakt commented 3 years ago

This issue is about re-implementing, in Arc-Script, an advanced application written against Flink's Scala API. The application below is from Paris Carbone's dissertation, p. 158:

case class GraphEdge(from: Long, to: Long)
case class VComponent(id: Long, component: Long)

val input: DataStream[GraphEdge] = getEdgeStream()

input
    .flatMap(e => List(e, GraphEdge(e.to, e.from))) // Add inverse
    .keyBy(e => e.from) // Scope Computation by Key
    .timeWindow(1 Min)
    .iterate(
        Termination.Fixpoint,
        Synchrony.Strict,
        (vComponent => vComponent.id), // Loop Key 
        // ENTRY FUNCTION
        (ctx: LoopContext, in: Iterable[GraphEdge], out: Collector[VComponent]) => {
            ctx.loopState("neighbors").setList(in.map(e => e.to)); ctx.loopState("cc").setValue(ctx.key)
            ctx.loopState("neighbors").foreach(n => out.collect(VComponent(n, ctx.key)))
        },
        // STEP FUNCTION
        (ctx: LoopContext, in: Iterable[VComponent], out: Collector[VComponent]) => { 
            val newcc = in.minBy(c => c.component).component
            var cc = ctx.loopState("cc")
            if(cc.value != newcc) {
                cc.setValue(newcc)
                ctx.loopState("neighbors").foreach(n => out.collect(VComponent(n, newcc)))
            }
        },
        // FINALIZE FUNCTION
        (ctx: LoopContext, out: Collector[VComponent]) => {
           out.collect(VComponent(ctx.key, ctx.loopState("cc").value))
        }
    );

Attempt at Explanation

The application computes the connected components of a graph that arrives as a stream of directed edge additions.

  1. First, flatMap adds the inverse of every edge, which makes the graph undirected.
  2. Edges are partitioned by their source vertex.
  3. Partitioned edges are grouped into tumbling windows of 1 minute.
  4. An iterative process is started for each triggered window:
    1. The iterative process terminates when no more messages are exchanged (Termination.Fixpoint).
    2. The iterative process executes its steps in bulk-synchronous fashion (Synchrony.Strict).
    3. The loop key is the vertex id carried by each VComponent message (vComponent.id).
    4. The Entry function initialises the vertex state, which is:
      1. A list of neighbours (edge destinations). The vertex sends its own id as a candidate component to each neighbour, and these messages are the input of the Step function.
      2. A connected component (initially the vertex id)
    5. The Step function computes the minimum component id among the messages a vertex received. If it differs from the stored component, the vertex updates its state and sends the new component to all of its neighbours for the next iteration.
    6. The Finalize function emits the final connected component of the current key (vertex).
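
The steps above can be sketched without Flink, as a plain-Scala simulation of the iteration over the edges of a single window. This is only an illustration under stated assumptions, not the dissertation's implementation: the object name `ConnectedComponentsSketch` and the function `connectedComponents` are hypothetical, loop state is modelled with ordinary maps, and each pass of the `while` loop stands in for one strictly synchronous superstep.

```scala
// Plain-Scala simulation of one window; no Flink dependency.
// ConnectedComponentsSketch and connectedComponents are hypothetical names.
object ConnectedComponentsSketch {
  case class GraphEdge(from: Long, to: Long)
  case class VComponent(id: Long, component: Long)

  // Runs entry, step (until fixpoint), and finalize for the edges of one window.
  // Returns a map from vertex id to the id of its connected component.
  def connectedComponents(edges: Seq[GraphEdge]): Map[Long, Long] = {
    // Add the inverse of every edge, then group the edges by source vertex.
    val neighbors: Map[Long, Set[Long]] = edges
      .flatMap(e => Seq(e, GraphEdge(e.to, e.from)))
      .groupBy(_.from)
      .map { case (v, es) => v -> es.map(_.to).toSet }

    // Entry: every vertex starts in its own component and notifies its neighbours.
    var cc: Map[Long, Long] = neighbors.keys.map(v => v -> v).toMap
    var messages: Seq[VComponent] =
      neighbors.toSeq.flatMap { case (v, ns) => ns.toSeq.map(n => VComponent(n, v)) }

    // Step: one superstep per pass; stop when no messages are exchanged.
    while (messages.nonEmpty) {
      val inbox = messages.groupBy(_.id) // route messages by loop key
      messages = inbox.toSeq.flatMap { case (v, in) =>
        val newcc = in.minBy(_.component).component
        // Assumption: update only when the candidate is smaller ("<"),
        // so that component ids can only decrease and the loop terminates.
        if (newcc < cc(v)) {
          cc = cc.updated(v, newcc)
          neighbors(v).toSeq.map(n => VComponent(n, newcc))
        } else Seq.empty[VComponent]
      }
    }

    // Finalize: one (vertex -> component) entry per key.
    cc
  }
}
```

For example, the edges `1 -> 2`, `2 -> 3`, and `5 -> 4` yield `Map(1 -> 1, 2 -> 1, 3 -> 1, 4 -> 4, 5 -> 4)`. Note that the sketch compares with `<` where the listing above compares with `!=`; that is a deliberate deviation in this simulation, since a vertex can receive candidates larger than the component it already holds.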

Requirements

With respect to Arc-Script

Requirements of this application which are not present in #308 are listed below:

  1. ❌ Must support an iterate operator.
  2. ❌ Must support optional arguments.