oceanprotocol / pdr-backend

Instructions & code to run predictoors, traders, more.
Apache License 2.0

[Lake][I/O] Techspike - Middleware for parquet appending/mutating/other #572

Closed idiom-bytes closed 9 months ago

idiom-bytes commented 10 months ago

Motivation

Parquet files work w/ partitioning, but you need a deliberate approach to it. As we work towards building an incremental pipeline, we need a way to append records, rather than what we have today: we simply append data in memory and write out a new parquet file. As data grows, this I/O will become more expensive.

Towards a solution

Rather than rewriting whole files, parquet/polars supports partitions. This works very similarly to a regular .parquet file, except you have a file_partitions.parquet folder containing many files, which polars can read natively by pointing at the folder name and treating it like a regular file. Just like any other db, your partitioning can impact performance.
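For illustration, here is a minimal sketch of polars lazily scanning such a partitioned folder via a glob; folder and column names are illustrative, and how much pruning the pushdown achieves depends on the polars version and the partition layout.

```python
import polars as pl

# A partitioned dataset is a folder of many small parquet files, e.g.
#   pdr_predictions.parquet/pair=BTC-USDT/part-0.parquet
#   pdr_predictions.parquet/pair=ETH-USDT/part-0.parquet
lazy = pl.scan_parquet("pdr_predictions.parquet/**/*.parquet")

df = (
    lazy
    .filter(pl.col("timestamp") > 1_700_000_000)  # pushdown can prune files/row groups
    .collect()
)
```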

To build the partitions effectively, perhaps we should look at some thin middleware like fastparquet or duckdb to handle the partitioning/bucketing.

Potential solutions

As we look to support enough tables + size on-disk, we'll need a solution sooner rather than later to avoid long disk I/O times.

First, Apache Arrow, the standard polars is built on, supports a broad approach to partitioning via pyarrow.parquet.write_to_dataset, as described below.

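A minimal sketch of pyarrow.parquet.write_to_dataset with an illustrative schema and root path; each call adds new files under pair=&lt;value&gt;/ subfolders instead of rewriting one monolithic parquet file.

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({
    "pair": ["BTC-USDT", "ETH-USDT", "BTC-USDT"],
    "timestamp": [1_700_000_000, 1_700_000_060, 1_700_000_120],
    "prediction": [0.6, 0.4, 0.7],
})

# Hive-style partitioning: lake/pdr_predictions.parquet/pair=BTC-USDT/...
pq.write_to_dataset(
    table,
    root_path="lake/pdr_predictions.parquet",
    partition_cols=["pair"],
)
```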

https://arrow.apache.org/cookbook/py/io.html#writing-partitioned-datasets
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_to_dataset.html
https://stackoverflow.com/questions/76121937/how-to-append-new-data-to-an-existing-parquet-file

Second, Fastparquet append/write w/ partitions: https://fastparquet.readthedocs.io/en/latest/quickstart.html#writing
Polars read parquet w/ partitions: https://docs.pola.rs/py-polars/html/reference/api/polars.read_parquet.html
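A hedged sketch of the fastparquet route, assuming the dataset above already exists on disk (append=True fails otherwise) and using the same illustrative columns:

```python
import pandas as pd
from fastparquet import write

df_new = pd.DataFrame({
    "pair": ["BTC-USDT"],
    "timestamp": [1_700_000_180],
    "prediction": [0.55],
})

# file_scheme="hive" writes a directory of part files; append=True adds new
# row groups to the existing dataset instead of rewriting it.
write(
    "lake/pdr_predictions.parquet",
    df_new,
    file_scheme="hive",
    partition_on=["pair"],
    append=True,
)
```

Polars can then read the folder back with scan_parquet/read_parquet as shown earlier.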

Another potential solution would be to use DuckDB. However, as far as I understand it, we might need to first save the data to DuckDB (db in-memory, with persistent storage in an analytics.db file), such that we append to DuckDB and then write new parquet files. DuckDB looks very promising, but it leaves gaps as to whether it will allow us to append/grow incrementally.
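A minimal sketch of that flow, assuming an analytics.db file and an illustrative predictions table: append into DuckDB's persistent storage, then export a fresh parquet file.

```python
import duckdb

con = duckdb.connect("analytics.db")  # persistent on-disk storage
con.execute("""
    CREATE TABLE IF NOT EXISTS predictions (
        pair VARCHAR, timestamp BIGINT, prediction DOUBLE
    )
""")
con.execute(
    "INSERT INTO predictions VALUES (?, ?, ?)",
    ["BTC-USDT", 1_700_000_300, 0.61],
)

# Re-export after appending; note this still rewrites the parquet output each time.
con.execute("COPY predictions TO 'lake/predictions.parquet' (FORMAT PARQUET)")
```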

DuckDB partitioned writes also seem to implement append in the same way as pyarrow.parquet (using overwrite rules). Please tech spike to test/verify.

https://duckdb.org/docs/data/partitioning/hive_partitioning https://duckdb.org/docs/data/partitioning/partitioned_writes#overwriting
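As a starting point for that tech spike, a sketch of DuckDB's hive-partitioned COPY with the overwrite flag (table/column names are illustrative; the flag syntax should be verified against the linked docs):

```python
import duckdb

con = duckdb.connect("analytics.db")

# PARTITION_BY yields pair=<value>/ folders; OVERWRITE_OR_IGNORE controls what
# happens when a partition directory already exists (the "overwrite rules").
con.execute("""
    COPY predictions
    TO 'lake/predictions_partitioned'
    (FORMAT PARQUET, PARTITION_BY (pair), OVERWRITE_OR_IGNORE 1)
""")
```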

Near-parquet

Polars supports other data formats and processes (like Delta Lake) that may be able to help us across both stages of the pipeline.

  1. Delta Lake - Upstream/Fetching, where we need to join + mutate records (i.e. database-like) to yield bronze tables. A minimal append sketch follows this list.
  2. Arrow/DuckDB - Downstream/Computing, where we just need to process records incrementally and can append-to-end/rebuild, to yield silver (aggregate) and gold (summary) tables.
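For stage 1, a minimal append sketch with polars' Delta Lake writer (requires the deltalake package; the path and columns are illustrative, and upserts would layer Delta's merge support on top of this):

```python
import polars as pl

df_new = pl.DataFrame({
    "pair": ["BTC-USDT"],
    "timestamp": [1_700_000_240],
    "prediction": [0.58],
})

# Append a new transaction to the bronze table instead of rewriting parquet files.
df_new.write_delta("lake/bronze_predictions", mode="append")

# Downstream (silver/gold) jobs can read it back lazily.
lazy = pl.scan_delta("lake/bronze_predictions")
```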

DoD

idiom-bytes commented 9 months ago

I found a really interesting approach: a vector db written in Rust that uses the Arrow format as the base layer. Please prioritize tech spiking with that rather than with Iceberg.

Motivation

Benefits

lancedb as ETL

It gives you the ability to work on a columnar file format, with everything on-disk and in-process, that handles appends/upserts/etc... and because it's all Arrow at the base layer, you get all the serialization benefits and integration w/ the data ecosystem.

It does this by using an inverted-file (IVF) index, which is different from how everyone else is doing things... This specific architecture unlocks what we're looking for here. https://lancedb.github.io/lancedb/concepts/index_ivfpq/

LanceDB looks promising as it's all in-process, so there are no extra servers or infra to run (unlike competing Spark solutions, DuckDB clusters, etc.)... As an example, rather than our db being saved in a ".db" or some other db format, everything lives in Arrow-based files on disk.

https://lancedb.github.io/lancedb/#open-source-and-cloud-solutions

Please look at the following blogpost to see how to query a lance db table w/ polars lazyframe.

https://blog.lancedb.com/lancedb-polars-2d5eb32a8aa3
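Roughly what the blog post describes, sketched with an illustrative table: `to_lance()` exposes the underlying Lance dataset, which polars can scan as a pyarrow dataset and wrap in a LazyFrame.

```python
import lancedb
import polars as pl

db = lancedb.connect("lake/lancedb")  # illustrative on-disk location
tbl = db.create_table(
    "predictions",
    data=[{"pair": "BTC-USDT", "timestamp": 1_700_000_000, "prediction": 0.59}],
    mode="overwrite",  # only so this sketch is rerunnable
)

# The Lance table is exposed to polars as a LazyFrame via its pyarrow-compatible
# dataset representation.
lazy = pl.scan_pyarrow_dataset(tbl.to_lance())
print(lazy.filter(pl.col("prediction") > 0.5).collect())
```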

DuckDB as ETL

DuckDB (and other columnar dbs, whether in-memory or in-process) is still looking like a strong contender. However, it will require a mixture of SQL w/ Python, and it's unlikely to provide clean polars connectors. This means that all pre-filtering of records should likely be done through a DuckDB SQL query, to reduce in-memory requirements for compute/data jobs.
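A minimal sketch of that pattern, with illustrative paths/columns: do the row reduction in DuckDB SQL, then hand the much smaller result to polars.

```python
import duckdb
import polars as pl

rel = duckdb.sql("""
    SELECT pair, timestamp, prediction
    FROM 'lake/predictions_partitioned/**/*.parquet'
    WHERE timestamp >= 1700000000
""")

# Only the pre-filtered rows reach polars memory for the compute step.
df: pl.DataFrame = rel.pl()
```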

Its storage is fairly compressed and will continue to lead towards effective storage/compute. https://duckdb.org/internals/storage.html

"Loading 100 GB of uncompressed CSV files into a DuckDB database file will require 25 GB of disk space, while loading 100 GB of Parquet files will require 120 GB of disk space."

Clickhouse as ETL

Beyond DuckDB, if an OSS, clustered, self-hosted db solution for ELT is needed, ClickHouse (and others) can serve this purpose.

Other solutions

We know clustered MPPs like Spark/Hadoop or columnar DBs like Redshift will solve large-scale problems. This isn't the goal of this research.

idiom-bytes commented 9 months ago

We may need to consider some of the limitations of how Lance exposes polars functionality, such that we can optimize for max performance.

TLDR; "We use the scan_pyarrow_dataset to convert to LazyFrame. Polars is able to push-down some filters to a pyarrow dataset, but the pyarrow dataset expects pyarrow compute expressions while Lance expects SQL strings. This means that we’ve had to disable the filter push-downs, meaning that Polars won’t be able to take advantage of the fast filtering and indexing features of Lance."

To use the features of lance we could create a library of filters:

- filter_by_column
- filter_by_timestamp
- filter_by_datetime
- filter_by_daterange

And then follow a set of steps every time:

1. before doing the polars work, fast filter/index w/ lance-friendly SQL
2. take a combination of the filters we created above to generate a lance-sql statement
3. submit the lance-friendly filters to leverage their engine
4. get a lazy_frame where we can do_fancy_polars_stuff()
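A rough sketch of what that could look like; the helper names come from the list above and are hypothetical, and it assumes Lance's dataset accepts SQL-string filters via to_table(filter=...), as the quoted limitation suggests.

```python
import lancedb
import polars as pl

# Hypothetical filter helpers that emit Lance-friendly SQL fragments.
def filter_by_column(col: str, value: str) -> str:
    return f"{col} = '{value}'"

def filter_by_daterange(col: str, start_ts: int, end_ts: int) -> str:
    return f"{col} >= {start_ts} AND {col} <= {end_ts}"

def combine(*filters: str) -> str:
    return " AND ".join(filters)

db = lancedb.connect("lake/lancedb")
tbl = db.open_table("predictions")

# Steps (1)-(3): build one lance-sql statement and let Lance's engine do the fast filtering.
sql = combine(
    filter_by_column("pair", "BTC-USDT"),
    filter_by_daterange("timestamp", 1_700_000_000, 1_700_086_400),
)
arrow_tbl = tbl.to_lance().to_table(filter=sql)

# Step (4): hand the already-reduced data to polars and do_fancy_polars_stuff().
lazy = pl.from_arrow(arrow_tbl).lazy()
```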

kdetry commented 9 months ago

I have spent some time looking around at these technologies, and my considerations are below.

Database Technologies

1. DuckDB

2. ClickHouse (I think this one is a fit for us)

3. LanceDB

4. CouchDB

5. MariaDB ColumnStore

6. Apache Kudu

File Formats

1. Parquet

2. ORC

3. CSV or JSON

Considerations for Scalability

idiom-bytes commented 9 months ago

I'm looking specifically for an arrow-based approach that has polars-native integration such that we can do e2e testing on top of simple, on-disk data, without having to resort to integrating w/ a local db.

I'm also looking for an in-process approach, so that we don't pass down large memory and compute requirements to our users, where they'd need a cluster off the bat just to support a basic filesystem + queries.

ClickHouse + other DBs require multiple servers to keep the FS + querying available. Arrow-based systems run on top of S3: they only need whatever is running the query. It's serverless.

This passes a much smaller set of requirements and scope down to our users.

LanceDB is built on top of the Arrow, columnar-based filesystem; at its base-level implementation it's a columnar database.

The discussion is whether we want to continue focusing on a serverless, in-process, arrow-based system that supports all the needs we have and can solve our problems at scale, rather than a cluster-based approach.

idiom-bytes commented 9 months ago

[Further on Lance dovetailing with AI] I'm also now realizing the benefits that a vector db w/ good random sampling can bring to the ML and MPP sides, and how one dovetails with the other more and more...

[Reading from filesystem for training] Example... downstream, spark/cluster/data jobs will have to read/get data to train and test the model, sample(x_train, y_train), which is exactly what a vector db is designed to do efficiently. For other db models (sql, couch), we'll either need an inefficient connector or, because there are many things hitting the cluster, more and more provisioning.

With lancedb, provisioning is done at the process level (i.e. every predictoor reading from disk pays their part) rather than at the cluster level (shared service).

[Distributed computing and MPP] Due to arrow/provisioning for ML workloads, it's incredibly easy to sample/shuffle data for distributed workloads. Other dbs like Redshift/etc... do not have this. All of this data also needs to exist on disk (i.e. S3) so it can be sampled/shuffled by the system... this is one of those ops + infra + serialization steps that takes a ton of time to implement and costs a ton of time + $$$ every time you run.
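A hedged sketch of what that random sampling could look like against a plain Lance dataset (the path, columns, and take()-based access pattern below are illustrative assumptions, not tested here):

```python
import random
import lance

ds = lance.dataset("lake/bronze_predictions.lance")  # illustrative path

# Fast random row access is what makes sampling/shuffling cheap here.
n_rows = ds.count_rows()
idx = random.sample(range(n_rows), k=min(1024, n_rows))

batch = ds.take(idx, columns=["timestamp", "prediction"])  # pyarrow Table
# `batch` can then feed x_train/y_train construction for the model.
```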

https://lancedb.github.io/lance/integrations/tensorflow.html#distributed-training-and-shuffling

Rather, our paradigm, because of polars + arrow + lance, would be to have all of this already computed and on-disk. This means there is a lot less "exporting the db, loading to clusters, bootstrapping the filesystem". Everything is already partitioned and structured on-disk, ready to go.

So again, things that look like they will be solved now "by just using OLTP, or a cluster" will, I believe, lead to a ton of hurdles and costs further down the timeline. 95% of our pipeline is OLAP, and beyond that, it's AI/ML. The more I look at it, the more this makes sense to me.

I believe the recommendations above can help us separate our core ETL pipelines such that they can be run end-to-end in a serverless environment, at scale.

Outside this core layer of the onion, we can then implement other compute solutions and platforms, such as Spark, etc...

idiom-bytes commented 9 months ago

Research and tech spikes delivered. The decision is to build out the infra w/ DuckDB.