empirical-soft / empirical-lang

A language for time-series analysis
https://www.empirical-soft.com/

Streaming computation #57

Open chrisaycock opened 4 years ago

chrisaycock commented 4 years ago

TL;DR From the roadmap (#1), we want streaming computation, even on arbitrary expressions and user-defined functions.

func wavg(xs, ws):
  return sum(xs * ws) / sum(ws)
end

let trades = load("trades.csv")
let vwaps = from trades select vwap=wavg(price, size) by symbol, bar(time, 5s)
store(vwaps, "vwaps.csv")

Goal

Streaming computation means handling data that arrives in partial segments, such as chunks from a large file or a continuous feed from a real-time source. While there are plenty of streaming platforms, none of them do what we want, which is to be able to handle arbitrary expressions, user-defined functions, multiple aggregations, and differing source contexts.

Consider the Sharpe ratio:

let sharpe = mean(xs) / stdev(xs)

If the aggregations are run independently of each other, then the data source must be read twice. That's really slow for a historical source and generally impossible for a real-time source.
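
To make the single-pass requirement concrete, here is a minimal Python sketch (not Empirical code) that computes both aggregations from one read of the chunks, using Welford's online algorithm. The class and function names are hypothetical, and the sketch assumes stdev() means the sample standard deviation.

```python
import math

class RunningMoments:
    """Online count, mean, and sum of squared deviations (Welford's algorithm)."""
    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0

    def update(self, chunk):
        for x in chunk:
            self.n += 1
            delta = x - self.mean
            self.mean += delta / self.n
            self.m2 += delta * (x - self.mean)

    def stdev(self):
        # Sample standard deviation (an assumption); needs at least two values.
        return math.sqrt(self.m2 / (self.n - 1))

def streaming_sharpe(chunks):
    state = RunningMoments()
    for chunk in chunks:        # each chunk is read exactly once
        state.update(chunk)
    return state.mean / state.stdev()

chunks = [[1.0, 2.0], [3.0], [4.0, 5.0]]
sharpe = streaming_sharpe(chunks)
```

Both mean and stdev share one state object, so the source never has to be replayed.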

Problems are harder when non-streaming expressions are interspersed.

let df = load(...)
print(count(df))
let result = from df select ...
print(count(result))

Printing a full accounting of the Dataframe requires all data to be present, which is the exact opposite of how streaming works! So Empirical will have to pay careful attention to when streaming is allowed.

Design

Our proposal is that Empirical should be able to track when to enter and exit a stream, and then schedule the computation accordingly.

The good news is that Empirical already has some features that will help with development. The recently added traits and mode were designed in part to handle tracking. Also, the Vector Virtual Machine can explicitly loop over the partial data sets as long as Empirical schedules appropriately.

VVM's operations will have to maintain partial state as part of an online algorithm. This is complicated by user-defined functions since that state must exist outside the function body. (Hypothesis: closures.)
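
As a rough illustration of the closure hypothesis, the Python sketch below keeps the running sums for wavg outside the per-chunk function body. This is only an analogy; make_streaming_wavg and wavg_chunk are hypothetical names, not anything in Empirical or VVM.

```python
def make_streaming_wavg():
    # State lives in the enclosing scope, outside the per-chunk function body.
    sum_xw = 0.0
    sum_w = 0.0

    def wavg_chunk(xs, ws):
        nonlocal sum_xw, sum_w
        sum_xw += sum(x * w for x, w in zip(xs, ws))
        sum_w += sum(ws)
        return sum_xw / sum_w   # running result over all chunks seen so far

    return wavg_chunk

wavg = make_streaming_wavg()
first = wavg([10.0, 20.0], [1.0, 3.0])   # (10 + 60) / 4
second = wavg([30.0], [4.0])             # (70 + 120) / 8
```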

Empirical will need to maintain a lineage to schedule a loop onto VVM. (Caveats: consider compute graphs and topological sorting.)
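
One way to picture the lineage is as a dependency graph that gets topologically sorted before the loop is scheduled. The Python sketch below uses the standard library's graphlib (Python 3.9+) on a hypothetical graph for the Sharpe example; the node names are illustrative only.

```python
from graphlib import TopologicalSorter

# Hypothetical lineage: each node maps to the set of nodes it depends on.
lineage = {
    "xs": set(),
    "mean": {"xs"},
    "stdev": {"xs"},
    "sharpe": {"mean", "stdev"},
}

# A valid order for the loop body: every node appears after its inputs.
order = list(TopologicalSorter(lineage).static_order())
```

Because mean and stdev both hang off the same source node, they can be scheduled into the same loop iteration.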

Implementation

The stream mode is the key to tracking. So long as an expression chain is stream, we will not schedule or run the computation. Similarly, stream implies there is a loop running in VVM.
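
The deferral described above can be sketched in Python as a chain that only records steps while it is stream and runs the loop once a non-stream consumer asks for a value. StreamExpr, map, and collect are hypothetical names chosen for this sketch.

```python
class StreamExpr:
    """A deferred per-chunk computation; nothing runs until a consumer asks."""
    def __init__(self, source, steps=()):
        self.source = source        # callable returning an iterable of chunks
        self.steps = tuple(steps)   # per-chunk functions, applied in order

    def map(self, fn):
        # Still stream: record the step, do not run it.
        return StreamExpr(self.source, self.steps + (fn,))

    def collect(self):
        # Non-stream consumer: schedule the loop and run it now.
        out = []
        for chunk in self.source():
            for fn in self.steps:
                chunk = fn(chunk)
            out.extend(chunk)       # container results are appended
        return out

calls = []

def source():
    calls.append("read")
    return [[1, 2], [3]]

expr = StreamExpr(source).map(lambda c: [x * 2 for x in c])
ran_before = list(calls)    # snapshot taken before any consumer has run
result = expr.collect()
```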

Once an expression chain is no longer stream, Empirical must schedule and run the computation. Examples for triggering a computation:

Functions

Considerations

Since the REPL triggers a computation, the following does what the user expects:

>>> let df = load(...)
>>> let result = from df select ...
>>> stream_store(result, ...)

That is, even though stream_store() is stream, the attempt to show a result in the REPL will force everything to run. Similarly, putting the above in a file will work just fine because of the EOF.

Further, streaming output for a batch process is fine because the loop will exit after a single iteration.

>>> stream_store(batch_load(...), ...)

Container results (arrays, Dataframes, etc.) must be appended; i.e., "the collection must be collected". This occurs at the end of the loop for whatever value is passed to the batch consumer.

If the user passes a stream computation to two non-stream expressions, then Empirical will have to run everything twice. There isn't much getting around this.

Likewise, if the user weaves independent expressions into a stream chain, the statements will not execute in source order. The example below will produce the wrong runtime metric because the clock will be read twice back-to-back, without waiting for the result to be computed.

let df = load(...)
let start = now()
let result = from df select ...
let runtime = now() - start

We will likely want to distinguish between chunked and continuous, not just stream. Punting for now.

Grouped aggregation is stream and will emit the full result during the loop by default. However, the user may set a flag that emits changes instead of the full result. (This is like outputMode("update") in Spark or EMIT CHANGES in ksqlDB.)
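
The difference between the two emission modes can be shown with a small Python sketch of a grouped running sum. The emit_changes flag is a hypothetical stand-in for whatever flag Empirical ends up exposing.

```python
def grouped_sums(chunks, emit_changes=False):
    totals = {}                         # state carried across chunks
    for chunk in chunks:
        touched = set()
        for key, value in chunk:
            totals[key] = totals.get(key, 0) + value
            touched.add(key)
        if emit_changes:
            # Only the groups updated by this chunk.
            yield {k: totals[k] for k in touched}
        else:
            # The full result so far.
            yield dict(totals)

chunks = [[("A", 1), ("B", 2)], [("A", 3)]]
full = list(grouped_sums(chunks))
changes = list(grouped_sums(chunks, emit_changes=True))
```

With these inputs, the second emission is the whole table in full mode but only group A in change mode.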

chrisaycock commented 4 years ago

Thinking through some items, perhaps load() should be batch by default; users must ask for stream_load(). VVM ops can optionally have a streaming version that takes a state. If a linear op has no streaming version, then there is no need for a state and therefore the op can be called directly.
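
A minimal Python sketch of that dispatch rule, assuming a hypothetical op table in which only some ops register a streaming version; none of these names come from VVM.

```python
def streaming_sum(xs, state):
    # Streaming version: folds this chunk into the saved total.
    state["total"] = state.get("total", 0) + sum(xs)
    return state["total"]

# Hypothetical op table: every op has a direct form; some also have a streaming form.
OPS = {
    "add": {"direct": lambda xs, ys: [x + y for x, y in zip(xs, ys)]},
    "sum": {"direct": lambda xs: sum(xs), "streaming": streaming_sum},
}

def call_op(name, args, state=None, stream=False):
    op = OPS[name]
    if stream and "streaming" in op:
        return op["streaming"](*args, state)
    return op["direct"](*args)      # no streaming version: call directly, no state

state = {}
s1 = call_op("sum", ([1, 2],), state, stream=True)
s2 = call_op("sum", ([3],), state, stream=True)
added = call_op("add", ([1, 2], [3, 4]), stream=True)
```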

chrisaycock commented 4 years ago

To build this, we will keep the streaming data as a stub; the stream_load() function will be a batch operation, but its mode will allow us to practice lineage and state. Here is the order of tasks:

  1. Lineage: schedule the full stream as a loop when output is needed (Codegen)
  2. State: maintain a separate register in VVM
  3. Chunking: finally yield the streaming data

chrisaycock commented 4 years ago

The "streaming register" passed to a streaming function (whether UDF or VVM op) must exist outside the scope of the function definition and must be initialized before the loop. Trying to make VVM's interpreter fast (and the IR compact) conflicts with the explicitness that LLVM requires once we make a JIT. The proposed compromise is that streaming registers will be implicit offsets from a passed register bank.

sum_i64s %1 %2 *0    ; the *0 is a streaming register

The streaming register bank is like the "this" pointer in C++: an implicit struct whose pointer is passed to the member function. The streaming registers are then offsets from this bank.

All UDFs must get a bank passed by the caller, which implies that the bank is still represented as a register.

call foo 4 *1 %1 %2 %3    ; the *1 is a register to the caller and a bank to the callee
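
The offset scheme can be modeled in Python as a view onto shared storage. This is a sketch under assumptions: Bank, sub_bank, sum_op, and foo are hypothetical names, and the storage is a plain list initialized before the loop.

```python
class Bank:
    """A view onto shared streaming storage, starting at a base offset."""
    def __init__(self, storage, base=0):
        self.storage = storage
        self.base = base

    def __getitem__(self, i):
        return self.storage[self.base + i]

    def __setitem__(self, i, value):
        self.storage[self.base + i] = value

    def sub_bank(self, i):
        # Register *i of the caller becomes the bank (offset 0) of the callee.
        return Bank(self.storage, self.base + i)

def sum_op(xs, bank, reg):
    bank[reg] += sum(xs)            # like `sum_i64s %1 %2 *0`
    return bank[reg]

def foo(xs, bank):
    return sum_op(xs, bank, 0)      # the callee's own *0

storage = [0, 0]                    # initialized before the loop
caller = Bank(storage)
for chunk in ([1, 2], [3]):
    own = sum_op(chunk, caller, 0)                               # caller's *0
    via_foo = foo([x * 10 for x in chunk], caller.sub_bank(1))   # caller's *1
```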

LLVM will require that the shape of the bank be known in advance to put this on the stack. One possible solution is that each UDF will have a type definition for the streaming bank recursively determined by all of the streaming types of invoked UDFs and ops. This requires nested type definitions, which VVM currently does not support! We may punt on the typedef issue since "JIT is next year's problem", but we do need to be aware of this.

One other note is that functions that do not require any streaming state (such as simple addition) do not need a register! That will save us some performance (and readability) if we know in advance that the streaming type is empty.

chrisaycock commented 4 years ago

Streaming registers present an issue for grouped aggregations: the split DFs need their own state. Additionally, the categories and indices of groupings need to be saved from one chunk to another. We might use a register window in which the register bank is slid over for each split DF.
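
A register window might look something like the Python sketch below, where each group gets a fixed-size slice of the bank and the category-to-group index is saved from one chunk to the next. STRIDE, index, and bank are hypothetical names for this sketch.

```python
STRIDE = 1   # streaming registers needed per group (one running sum here)

def grouped_sum(chunks):
    index = {}     # category -> group number, saved from one chunk to the next
    bank = []      # one window of STRIDE registers per group
    for chunk in chunks:
        for key, values in chunk.items():
            if key not in index:
                index[key] = len(index)
                bank.extend([0] * STRIDE)
            window = index[key] * STRIDE    # slide the window to this group
            bank[window + 0] += sum(values)
    return {key: bank[g * STRIDE] for key, g in index.items()}

result = grouped_sum([{"A": [1, 2], "B": [5]}, {"B": [1], "C": [7]}])
```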

chrisaycock commented 4 years ago

Optimization: Since stateless VVM ops don't need a streaming register, a UDF that only calls stateless ops is also stateless. This is recursive; we just need to track whether each UDF is stateless. Having two types of UDFs will likely require two types of the call instruction: one that takes a streaming register and one that does not.
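
The recursive check could be sketched in Python as below. The op names and the call graph are hypothetical, made up only to show that statefulness propagates up from ops to the UDFs that call them.

```python
STATEFUL_OPS = {"sum", "mean"}      # hypothetical: ops that keep streaming state

# Hypothetical call graph: each UDF lists the ops and UDFs it invokes.
CALLS = {
    "scale": ["mul"],
    "wavg": ["mul", "sum", "div"],
    "normalize": ["scale", "sub"],
    "report": ["normalize", "wavg"],
}

def is_stateless(name, seen=frozenset()):
    if name in STATEFUL_OPS:
        return False
    if name not in CALLS:
        return True     # a primitive op with no streaming state
    if name in seen:
        return True     # recursive call: decided by the other callees
    return all(is_stateless(c, seen | {name}) for c in CALLS[name])

stateless = {name: is_stateless(name) for name in CALLS}
```

A UDF marked stateless here would use the call form without a streaming register.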

Additionally, we can have a streaming-enabled version of a UDF or VVM op that is produced only in Stream mode. This optimization is desired, but possibly harder because regeneration is required.

chrisaycock commented 4 years ago

The proposal is to issue a pre-release of chunked streaming without optimizations or table syntax. Then 0.7.0 will add grouped aggregations and joins where the right table is batch. After 0.7.0 we'll add continuous streaming and joins with bounded streams.

chrisaycock commented 4 years ago

The first pass at state registers still needs work on UDFs. The proposal is to have Sema duplicate the function definition when the call is in Stream mode, similar to how generics work. The caller's parameters will become streaming args in the duplicated function.