delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Ability to get table updates in a streaming manner #413

Closed: blogle closed this issue 2 years ago

blogle commented 3 years ago

Description

I would like to read a stream of updates on a Delta table, where the table is append-only, meaning records are only ever appended and never updated or deleted.

Use Case

Today we have both a custom-built messaging infrastructure built on top of zmq and Google Pub/Sub acting as a message broker. For each topic there is a single producer that does some processing and publishes the message; one or more consumers subscribe to the stream of records, perform some transformation (invoke an ML model, create a transaction, etc.), and may subsequently publish their own output as a stream for downstream consumers. To support offline use cases like testing, ML training, or analytics, we have to maintain separate tooling to ETL this data into various databases and warehouses. I would like to evaluate whether Delta Lake could replace this infrastructure.

For additional context see the Slack thread: https://delta-users.slack.com/archives/C013LCAEB98/p1629763325074200

Related Issue(s)

houqp commented 3 years ago

can delta lake be leveraged as a durable message broker obviating our need of pub/sub and related etl jobs?

Yes, we have lots of streaming pipelines (Spark or native Rust) that are powered by streaming delta tables at Scribd.

For me, the trade-off between streaming delta tables and real-time message queues like Kafka boils down to throughput vs. latency. Large chunked reads and writes against delta tables have significantly higher throughput than Kafka topics. On the other hand, delta table streaming has much higher latency, on the order of seconds. So it really comes down to the latency requirement of your application. If you need real-time, sub-second latency, then Kafka or Google Pub/Sub is the way to go. If you are OK with near-real-time latency of seconds or even minutes, a delta table can be a great streaming backbone. It also has the added benefit of higher availability and durability, depending on which backing object store is used.

can latency of reads be tuned such that delta lake is feasible for serving ml features in "soft" real-time, obviating our zmq tooling and related etl?

As I mentioned earlier, if by "soft" you mean a couple of seconds of latency, then a delta table can handle it out of the box. I believe there are ways to reduce it even further if you implement extra out-of-band logic to notify downstream consumers of new commits. But ultimately it is capped by the latency SLA of the backing object store; for example, if the table is backed by S3, you can't go lower than a couple hundred milliseconds.
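As an illustration of that out-of-band idea, here is a minimal sketch assuming a tokio runtime, with an in-process watch channel standing in for whatever notification transport (pub/sub, object store event notifications, etc.) a real deployment would use; none of this is delta-rs API, and the commit steps are left as comments:

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // channel carrying the latest committed table version
    let (tx, mut rx) = watch::channel(0i64);

    // writer side: after each successful commit, announce the new version
    let writer = tokio::spawn(async move {
        for version in 1..=3i64 {
            // ... write data files and commit `version` to the delta log here ...
            let _ = tx.send(version);
        }
    });

    // reader side: wake on notification instead of sleeping between polls; a watch
    // channel coalesces bursts, so after waking the reader would still catch up on
    // any versions it skipped (e.g. via a peek_next_commit style loop)
    let reader = tokio::spawn(async move {
        while rx.changed().await.is_ok() {
            let latest = *rx.borrow();
            println!("new commit(s) available, latest version {latest}");
        }
    });

    writer.await.unwrap();
    reader.await.unwrap();
}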

houqp commented 3 years ago

Today, the ability to stream incremental table updates is already implemented, but it is not exposed as a public interface. For example, the update_incremental method uses this pattern to update the materialized table state: https://github.com/delta-io/delta-rs/blob/2acae721a8c24cb75b7e374d215af3351d902735/rust/src/delta.rs#L663.
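For context, a minimal sketch of how that pattern could look from the outside if update_incremental were exposed publicly; open_table and the version field are existing public surface, while treating update_incremental as a public async method returning the crate's error type is an assumption for illustration:

use deltalake::{open_table, DeltaTableError};

// Sketch only: update_incremental is currently internal, so this is illustrative
// rather than working code.
async fn catch_up(uri: &str) -> Result<(), DeltaTableError> {
    let mut table = open_table(uri).await?;
    let before = table.version;

    // apply only the commits newer than the currently loaded version,
    // instead of rebuilding the whole table state from scratch
    table.update_incremental().await?;

    println!("caught up from version {} to {}", before, table.version);
    Ok(())
}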

For downstream streaming consumers that only care about newly appended files, and not the materialized delta table state, we can expose a similar low-level public method that returns the next commit after a given table version. Here is what it could look like from the consumer side:

let mut current_version = table.version;

loop {
    match table.peek_next_commit(current_version)? {
        PeekCommit::New { version, actions } => {
            for action in actions {
                // only data-changing add actions correspond to newly appended files
                if let Action::add(add) = action {
                    if add.data_change {
                        // handle new file: add.path
                    }
                }
            }
            current_version = version;
        }
        // nothing new yet; fall through and poll again after the sleep below
        PeekCommit::Uptodate => {}
    }
    // sleep for some amount of time before the next poll
    std::thread::sleep(std::time::Duration::from_secs(1));
}

Based on this API, we could also build higher level wrapper APIs like table.peek_appended_files.
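For illustration, such a wrapper could be a thin filter over the proposed peek_next_commit. The sketch below reuses the proposed names, so PeekCommit and peek_next_commit do not exist in the crate yet, and the module paths are illustrative:

// DeltaTable and DeltaTableError come from the deltalake crate; Action is the log
// action enum. PeekCommit and peek_next_commit are the *proposed* additions above.
use deltalake::{action::Action, DeltaTable, DeltaTableError};

// Return the paths of data files appended by the next commit after `current_version`,
// or None if the table is already up to date.
fn peek_appended_files(
    table: &DeltaTable,
    current_version: i64,
) -> Result<Option<(i64, Vec<String>)>, DeltaTableError> {
    match table.peek_next_commit(current_version)? {
        PeekCommit::New { version, actions } => {
            let files = actions
                .into_iter()
                .filter_map(|action| match action {
                    // only data-changing add actions represent newly appended files
                    Action::add(add) if add.data_change => Some(add.path),
                    _ => None,
                })
                .collect();
            Ok(Some((version, files)))
        }
        PeekCommit::Uptodate => Ok(None),
    }
}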

The benefit of this design is that we can stream append-only updates directly to consumers without the overhead of applying these commits to materialize the delta table state. If a consumer also requires access to a materialized table state, we could add a variant of the update_incremental method that updates the table state but also returns the newly applied actions to the caller as a vector.
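A possible shape for that variant, shown here only as a signature sketch; the function name and return type are invented for illustration, and the body is intentionally left unimplemented because the required internals are not public:

use deltalake::{action::Action, DeltaTable, DeltaTableError};

// Hypothetical variant of update_incremental: advance the materialized table state
// and also hand back the actions that were applied, so a consumer that needs both
// the new state and the new files can get them from a single call.
async fn update_incremental_with_actions(
    _table: &mut DeltaTable,
) -> Result<Vec<Action>, DeltaTableError> {
    // a real implementation would replay each unapplied commit into the table state
    // while collecting its actions
    unimplemented!("illustrative signature only")
}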

In my mind, there are three types of streaming consumers: 1) those that only care about appended files, 2) those that only care about the new table state, and 3) those that care about both appended files and the updated table state. The current update_incremental API is optimized for 2).