zio / zio-analytics

Distributed stream processing using ZIO
Apache License 2.0
61 stars 4 forks source link

Design session summary #1

Open iravid opened 5 years ago

iravid commented 5 years ago

Purpose and requirements

The purpose of this library is to provide a solution for creating distributed stream processing applications. These applications read data from data sources that (optionally) support partitioning, in parallel, process them through collection-like operators and write them out to data sinks.

The library’s representation for processing plans will be modeled in a reified manner from day 1, allowing for introspection and optimization. The library will also allow to perform stateful computations in an efficient manner. State will be kept persistently per-partition or per-key (similar to how Flink handles state).

Deployment model, distribution and coordination

We started off by comparing two possible deployment models:

I noted that the first model is notoriously hard to get right and operate in modern deployment environment, such as Kubernetes. The problem is that it creates a “platform-within-a-platform”: you’ve got a K8s cluster running your applications, and on top of that you create a cluster that runs more applications.

On the other hand, with the homogenous model, multiple JVMs are running the same application jar: they start up, parse their configuration, bind an HTTP server, run other stuff. They all run the same API calls for creating the analytics plan. Imagine something like this being run on multiple nodes:

def main = 
  for {
    config <- parseConfig
    _      <- startHttpServer
    plan = CassandraSource("table").map(...).fold(...).to(S3Destination("bucket", "prefix"))
    _ <- analytics.execute(plan)
  } yield 0

When the nodes get to analytics.execute, they start a coordination process that does some sort of leader election, analysis of the plan and work distribution. With this model, we no longer have to worry about transferring code or closures between the master and the workers: all classes required to run the application are already present on all nodes.

So to summarize, we will be going for the homogenous, embedded library model. To clarify - this does not preclude creating an application that can receive new plans online! There’s no reason why the JVM nodes cannot expose an HTTP interface that can receive JSON representations of the plans and execute them. In fact, this is quite harder with the master/worker model, because the platform “runs us” vs. us running the cluster.

API concepts

We will follow up on the excellent work done on scalaz-analytics last year. The concept there is that users of the library do not embed arbitrary functions and closures into the plan, but rather write it in a specialized DSL. Following is a short excerpt; see github.com/scalaz/scalaz-analytics for the full details:

trait AnalyticsModule {
  // A description of a stream processing plan
  type DataStream[A]
  // An abstract arrow that represents a transformation between types in the plan
  type =>:[A, B]
  // A type that can be represented in the plan
  type Type[A]

  trait StdLib {
    def andThen[A, B, C](f: A =>: B, g: B =>: C): A =>: C
    // more operations
  }
}

If we limit all analytics plans to be written without use of actual Scala functions, and only use the abstract arrows for transformations, we guarantee full introspection and optimization opportunities for the plans.

Extensibility

Three things cannot be expressed with abstract arrows and the standard library from the analytics module:

For these, we will need to devise open traits that users will implement with a strict specification of how they will work. This remains to be fleshed out.

State and Time

One extremely useful concept that has been implemented in Flink is the pervasive use of state to implement stateful operators. Every operator in Flink can define persistent state that is local to either a partition of an operator or a key (in case the operator follows a groupBy operation). These states can then be accessed and mutated freely within the operator.

The state is not just some opaque value with get/set/modify operations; it is further reified into types such as ListState, MapState, AggregatingState and so forth. Reifying the state in this manner allows for efficient persistence and memory usage when storing the data in something like RocksDB. When accessing a MapState[K, V], for example, only the key which is being modified needs to be deserialized into memory.

It remains to be fleshed out how this will be solved, but most likely the standard library of operations will contain a state module with operations that pertain to state mutation and access.

For time handling, we need to support the installation of timers in operators to perform periodic work, and support for accessing the current time - both in terms of event time and processing time.

Next steps

I will soon(tm) work on a plan that will hopefully lead us to an MVP. Stay tuned for updates!

regiskuckaertz commented 5 years ago

Have you seen Tweag's recent announcement for their porcupine library?

SemanticBeeng commented 5 years ago
  • Data sources: operators that actually produce data by performing external IO;
  • Data sinks: operators that write data to outside data stores by performing external IO.

Please consider we need to differentiate between computation runtime state and data at rest. This is needed when implementing, for example, a #ML #FeatureStore : computation that performs the feature extraction logic happens incrementally but/and data is managed "outside"; the main value of the data is when computation finished. (Flink management of runtime state is complementary to this).

Will need a few more iterations to explain.

Please consider reviewing #ApacheHudi (see how commits works as a means to capture incremental state mutation "effects")

  1. https://twitter.com/semanticbeeng/status/1173510586472325121
  2. https://github.com/semanticbeeng/hudipoc

In this context #ApacheHudi (or #DeltaLake) would be the "data store" but more precisely a #DataLake / #FeatureStore in the data analytics world.

Note: Used hash tags to suggest more can be found on twitter on these topics.

SemanticBeeng commented 5 years ago

The state is not just some opaque value with get/set/modify operations; it is further reified into types such as ListState, MapState, AggregatingState and so forth. Reifying the state in this manner allows for efficient persistence and memory usage when storing the data in something like RocksDB.

Always felt that Flink is missing an abstraction to separate the computation logic (thought as pure function) from the state mutation effects that are persisted in RocksDB (thought as particular I/O implementation). If there was a way to have such abstraction then it would easier to reason about when data is stored "to outside data stores".

Such I/O abstraction would ideally unify between RocksDB used for managing runtime state and (say) #ApacheHudi used for managing state at rest (in #FeatureStore).

Please give me a sign and will elaborate.