influxdata / flux

Flux is a lightweight scripting language for querying databases (like InfluxDB) and working with data. It's part of InfluxDB 1.7 and 2.0, but can be run independently of those.
https://influxdata.com
MIT License

Proposal: Stream Processing in Flux #1255

Open affo opened 5 years ago

affo commented 5 years ago

Stream Processing in Flux

This document gathers some thoughts on adding Stream Processing (SP) functionality to Flux, along with some cool features that characterize state-of-the-art Stream Processors (SPs); e.g. Flink and Spark.

It provides: an overview of Stream Processing, thoughts on SP in the Flux language and in the Flux engine, and a note on Complex Event Processing.

About Stream Processing

Stream Processing is the art of processing streams of data. One of the key factors that differentiates streams from datasets is that streams are (possibly) unbounded, while datasets are finite.

This poses fundamental questions about how an SP should behave and how different streaming queries can be from "standard" ones:

  1. If the input is unbounded, when should an SP produce output? Should the user be the one to specify result-triggering policies?
  2. Are there queries that are possible on datasets but not on streams (think of a distinct query on a stream with a finite amount of memory)?
  3. If the query involves time, what happens if elements arrive out of order and we can't stop the stream to re-order them before computing the result?
  4. If a computation on a dataset fails, it can always be restarted. What about a streaming one?

Answers:

  1. In a streaming query, the user usually defines time windows that act as the triggering policy for the computation at hand (see the sketch after this list). Otherwise, the result is rolling (or no result is ever produced 😅); i.e. a result must be produced for every input record: we get "rolling average", "rolling maximum", "rolling count", and so on.
  2. Suppose you need to answer which elements of a stream are distinct. If you have limited memory (and I bet you have), you must approximate your result, because you cannot store every element you have seen so far (there may be infinitely many!). Luckily, there is a large body of work on streaming algorithms that compute approximate results over streams based on summaries or "sketches".
  3. You need to manage out-of-order events: https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101.
  4. See below (Flux engine).
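
As an illustration of point 1, window-based triggering is already expressible over bounded data in today's Flux through aggregateWindow() (a sketch; the bucket and measurement names are placeholders):

from(bucket: "db")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "m")
  |> aggregateWindow(every: 1m, fn: mean) // one average per 1-minute window, per group

A streaming version of this query would keep emitting one result per window forever instead of stopping at now.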

SP in Flux (Language)

Flux comes with everything it needs to express stream processing computations. Take, for example, our "hello world" Flux script:

from(bucket: "db")
  |> range(start: -1m)
  |> filter(fn: (r) => ...)
  |> group(columns: [...])
  |> count()

In our current specification, if the user does not specify a stop parameter to range, it defaults to now. However, we could change this and simply state that if no stop is specified (or no range transformation is present at all), then the query never stops, making it a streaming query.
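
For example, under this proposal the following script would never stop, because it has no range at all (hypothetical semantics, not current Flux behavior):

from(bucket: "db")
  |> filter(fn: (r) => ...)
  |> group(columns: [...])
  |> count() // would become a rolling count: one result per ingested record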

In this case, the count would become a rolling count instead of an aggregate one: it would emit results (by key) for every new record it ingests. Given that there is no point in time at which the computation ends and its execution can be triggered, count must execute and produce a meaningful result every time a new record arrives.

Note that this holds at a high level of abstraction; it doesn't mean that this should actually happen at processing time. The underlying engine can still batch records and execute count per batch, but the results would be per-record.

This difference could be tricky for users, because count (and the other aggregators) would behave in two different ways depending on whether it is used on a bounded or an unbounded dataset.

We could change the aggregators' behavior to produce rolling results, and let the user add last to keep only the final value of the counter (we could later use planner rules to optimize the aggregators' behavior). This would also come in handy when the user wants to explore the whole evolution of, for instance, the counter, and not only its final result.
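
For example, the final value of a rolling counter could be recovered like this (a sketch under the proposed semantics; today's count already emits a single value per table, and the bucket, measurement, and column names are placeholders):

from(bucket: "db")
  |> range(start: -1m)
  |> filter(fn: (r) => r._measurement == "m")
  |> group(columns: ["host"])
  |> count() // under the proposal: a rolling count, one result per record
  |> last()  // keep only the latest value of the counter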

One thing that Flux is missing right now is user-defined stateful operators. At the moment, we provide users with many built-in stateful operators that hide the complexity of updating an internal state (count, max, derivative, ...). It would be cool to give users the tools to build custom ones at the Flux language level. This would relieve us from the burden of implementing in Go every operator that requires state, and would also give Flux users huge flexibility. This is why stateful user-defined functions are a feature that every modern stream processor offers.

As an example, this is a user-defined counter:

from(bucket: "db")
  |> filter(fn: (r) => ...)
  |> group(columns: [...])
  |> map(fn: {
    // proposed syntax: a stateful function is an object with a `state`
    // property (the initial, per-group state) and a `call` property
    state: {c: 0},
    // `call` runs once per record; the current state s is passed as the
    // first parameter and updated in place
    call: (s, r) => {
      s.c = s.c + 1
      return {v: r._value, c: s.c}
    },
  })

A stateful function is an object that has a state and a call property. The call function gets invoked every time a new record must be processed, and the state is passed to it as the first parameter. Every transformation that accepts a function should also be able to accept a stateful function. Note that state is per-group: multiple replicas of the map transformation must be able to run in parallel without ever touching the same piece of state; this is key to scaling out the computation.
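
For comparison, today's Flux already offers a per-group fold through reduce(), although it collapses each table into a single row rather than emitting one result per record (a sketch; the bucket and measurement names are placeholders):

from(bucket: "db")
  |> range(start: -1m)
  |> filter(fn: (r) => r._measurement == "m")
  |> reduce(
    // the accumulator plays the role of the per-group state
    identity: {c: 0},
    fn: (r, accumulator) => ({c: accumulator.c + 1}),
  )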

SP in Flux (Engine)

Rolling operations

The engine should accommodate rolling operations. This poses a problem: most of our code assumes that each transformation sees only one table per group key, and errors otherwise. With streaming computations we would have multiple tables with the same group key for different points in time. This would require our transformations not to error when they see the same group key twice or more.

However, we can imagine the streaming job being divided into epochs, where the records in a given epoch are the input to one batch processing computation. In other words, we can see the streaming job as a chain of multiple batch jobs (what makes a streaming job more than simply a chain of batch ones is that every batch job starts where the last one left off, so jobs carry state from one to the next).

If sources manage the beginning and end of each epoch, they can attach an epoch-id to every table's group key, so that our code wouldn't need to change. The epoch-id wouldn't actually be displayed to the user; it would only be used internally to guarantee that the streaming computation works properly (i.e. no group-key clashes).
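
Today's window() transformation already does something similar for bounded data: it splits tables by time and adds _start and _stop to the group key, so several tables with the same original key can coexist, one per window (a sketch; the bucket name is a placeholder):

from(bucket: "db")
  |> range(start: -10m)
  |> window(every: 1m) // _start/_stop join the group key, one table per window
  |> count()           // one count per key, per window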

Failure

Managing failure is quite an interesting issue.
Suppose that your pipeline contains more than one stateful operator:

IN --> S1 --> S2 --> OUT

IN produces records, S1 and S2 are stateful operators, and OUT consumes results. What if something fails in between: should we replay some record r? What if r has already affected S1's state, but not S2's? If you replay r, it could affect S1 twice and S2 once; if you don't, it will have affected S1 once and S2 never.

The same would happen with a single stateful operator downstream of an operator that produces more than one record per input record (e.g. flatMap):

IN --> FM --> S --> OUT

FM ingests r and produces r1 and r2. You would expect the processing of r to happen atomically, so that either both changes from r1 and r2 are applied to S, or neither. If something fails in between, you can never be sure whether to replay r or not (similar to the case described above).

What we need are consistent snapshots of the internal state: https://arxiv.org/abs/1506.08603. They would give us the ability to restore state from a consistent snapshot and replay records so that they affect the transformations' state exactly once.

Note that, in order to replay records, you need a reliable source. Luckily InfluxDB and Kafka are such sources; i.e. they can replay input on demand given an offset (or timestamp).

Complex Event Processing

Complex Event Processing (CEP) is about detecting patterns of interest in streams of events in order to trigger new ones. While a "standard" streaming query could be "calculate the average temperature for the rooms in a building over the last 5 minutes, every minute", a CEP one could be "if the temperature of some room is greater than 40°C within the last 5 minutes, then trigger a fire alert".
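
A rough batch approximation of the fire-alert example in today's Flux could look like the following (a sketch; the bucket, measurement, and column names are hypothetical):

from(bucket: "sensors")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["room"])
  |> max()
  |> filter(fn: (r) => r._value > 40.0) // any surviving row is a room to alert on

A true CEP primitive would instead evaluate the pattern continuously and emit an alert event as soon as it matches.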

There is no evident relation between continuous query languages and CEP ones. The implementation of the engines is different too: while SP engines focus on exploiting data-independence criteria (grouping) to achieve high parallelism (scaling out) by deploying replicas of operators on many cores, CEP engines focus on implementing an efficient state machine that can recognize the pattern specified by the user. However, CEP is stream processing too.

There are existing examples of CEP engine implementations and of CEP language specifications.

We could decide to offer primitives for CEP queries in Flux. It could be a more comprehensive approach to alerting and event handling than Kapacitor's.

rsobrochado-v commented 4 years ago

Any news on SP in Flux?

affo commented 4 years ago

Hey @rsobrochado! Thank you for your interest! Not that I know of right now.

This issue has been iceboxed.

Do you have any precise requirements or use cases to share with us?

rsobrochado-v commented 4 years ago

Thank you @affo. It's for the upgrade from Kapacitor stream UDFs (~20) to stream processing frameworks with Flux. I think we will adapt the stream UDFs to batch, for now, as some frameworks help with that. But it'll be really nice if Flux gets this in the future =)

ssoroka commented 4 years ago

This would be great for:

  1. executing queries and then streaming results to the UI for live queries.
  1b. executing really huge queries and seeing results instantly in the UI as they stream back.
  2. using Flux as a stream processor; start a query and then pass data through it (separate from InfluxDB), e.g. as a processor language in Telegraf.
  3. long-running data manipulation queries, e.g. importing a never-ending data stream from another location.