pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Local file caching for dataframe/eager tasks #73

Closed windiana42 closed 1 year ago

windiana42 commented 1 year ago

It would be nice if writing and reading database tables (with table lock) were as fast as writing a parquet file, but currently this is not the case. So we would like to also cache pulled tables locally, under the same cache invalidation regime as the table store.
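
A minimal sketch of the idea, not pipedag's implementation: a local cache entry is only reused when the cache key reported by the table store still matches the key stored next to the local file. `LocalTableCache`, `load_table`, `fake_pull`, and the `cache_key` strings are hypothetical names, and JSON files stand in for parquet so the example needs no third-party packages.

```python
import json
import tempfile
from pathlib import Path


class LocalTableCache:
    """Hypothetical local cache; JSON files stand in for parquet files."""

    def __init__(self, base_path):
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def _paths(self, table_name):
        # One file for the data, one for the cache key it was stored under.
        return (
            self.base_path / f"{table_name}.json",
            self.base_path / f"{table_name}.key",
        )

    def store(self, table_name, cache_key, rows):
        data_path, key_path = self._paths(table_name)
        data_path.write_text(json.dumps(rows))
        key_path.write_text(cache_key)

    def retrieve(self, table_name, cache_key):
        data_path, key_path = self._paths(table_name)
        if not (data_path.exists() and key_path.exists()):
            return None  # nothing cached yet
        if key_path.read_text() != cache_key:
            return None  # stale: the table store has invalidated this table
        return json.loads(data_path.read_text())


def load_table(table_name, cache_key, cache, pull_from_db):
    """Use the local copy if it is still valid, otherwise pull and store it."""
    rows = cache.retrieve(table_name, cache_key)
    if rows is None:
        rows = pull_from_db(table_name)
        cache.store(table_name, cache_key, rows)
    return rows


db_reads = []  # records every (slow) database read


def fake_pull(table_name):
    # Stand-in for reading the table from the database.
    db_reads.append(table_name)
    return [{"id": 1}, {"id": 2}]


cache = LocalTableCache(tempfile.mkdtemp())
first = load_table("orders", "key-v1", cache, fake_pull)   # pulled from the DB
second = load_table("orders", "key-v1", cache, fake_pull)  # served locally
third = load_table("orders", "key-v2", cache, fake_pull)   # key changed, pulled again
```

Only the first and third calls reach `fake_pull`; the second is answered from the local file because its key is unchanged.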

windiana42 commented 1 year ago

@NMAC427 In pipedag.yaml, we already have a little magic in what happens to arguments before they are handed over to a class like table_store. For the local_table_cache, I would like to have some generic attributes that control how it is invoked, plus some arguments for the class itself. Here is an example of how we could make it more obvious which attributes are handed over to the class:

    table_store:
      table_store_connection: postgres
      class: "pydiverse.pipedag.backend.table.SQLTableStore"
      args:
        create_database_if_not_exists: true
        print_materialize: true
        print_sql: true

      local_table_cache:
        cache_inputs: true
        cache_outputs: true
        class: "pydiverse.pipedag.backend.table_cache.ParquetTableCache"
        args:
          base_path: "/tmp/pipedag/table_cache"

    blob_store:
      class: "pydiverse.pipedag.backend.blob.FileBlobStore"
      args:
        base_path: "/tmp/pipedag/blobs"

    lock_manager:
      class: "pydiverse.pipedag.backend.lock.ZooKeeperLockManager"
      args:
        hosts: "localhost:2181"

    orchestration:
      class: "pydiverse.pipedag.engine.SequentialEngine"

NMAC427 commented 1 year ago

I like the idea of moving initialization-specific arguments into an args section.
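
A rough sketch of what this separation could look like on the loading side, under the assumption that everything under args is passed to the constructor and every other key stays with the caller. `load_component` is a hypothetical helper, not pipedag's API, and `types.SimpleNamespace` is only a stand-in class so the example runs without pipedag installed.

```python
import importlib


def load_component(config):
    """Instantiate config["class"] with config["args"]; return the rest untouched."""
    config = dict(config)  # do not mutate the caller's mapping
    class_path = config.pop("class")
    init_args = config.pop("args", {})
    module_name, _, class_name = class_path.rpartition(".")
    cls = getattr(importlib.import_module(module_name), class_name)
    # Only the keys under "args" reach the constructor.
    return cls(**init_args), config


component, generic_attributes = load_component(
    {
        # Stand-in for a class such as ParquetTableCache.
        "class": "types.SimpleNamespace",
        "args": {"base_path": "/tmp/pipedag/table_cache"},
        # Generic attributes control how the component is invoked.
        "cache_inputs": True,
        "cache_outputs": True,
    }
)
```

With this split, adding a new generic attribute never collides with a constructor parameter of the configured class.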

NMAC427 commented 1 year ago

Wouldn't local file caching just increase performance for reading? Don't we always need to write the outputs of a task to the main DB?

windiana42 commented 1 year ago

> Wouldn't local file caching just increase performance for reading? Don't we always need to write the outputs of a task to the main DB?

Correct. But storing dataframes as parquet is always good for debugging, so I suggest allowing it for both inputs and outputs. I plan a third parameter, "ignore_input_cache", which controls whether the stored input is actually used as a cache or whether the data is just stored for manual (debug) retrieval.

windiana42 commented 1 year ago

Together with Nick, I came up with alternative names for the local_table_cache attributes:

      local_table_cache:
        store_inputs: true
        store_outputs: true
        use_stored_input_as_cache: true

Combining use_stored_input_as_cache=true with store_inputs=false should result in a clear error message.
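
One way such a check could be sketched, assuming the three attributes are parsed into a small options object. `LocalTableCacheOptions`, its default values, and the wording of the error are hypothetical and not taken from pipedag.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LocalTableCacheOptions:
    # Defaults are assumptions made for this sketch only.
    store_inputs: bool = True
    store_outputs: bool = False
    use_stored_input_as_cache: bool = True

    def __post_init__(self):
        # A stored input can only serve as a cache if inputs are stored at all.
        if self.use_stored_input_as_cache and not self.store_inputs:
            raise ValueError(
                "local_table_cache: use_stored_input_as_cache=true requires "
                "store_inputs=true; either enable store_inputs or set "
                "use_stored_input_as_cache=false."
            )


valid = LocalTableCacheOptions(store_inputs=True, use_stored_input_as_cache=True)

try:
    LocalTableCacheOptions(store_inputs=False, use_stored_input_as_cache=True)
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

Failing at configuration time, and naming both attributes in the message, tells the user which of the two settings to change.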

windiana42 commented 1 year ago

The decision to introduce the args: level has some less obvious consequences: either we also require args: in the predefined connections, or table_store_connections can no longer choose attributes on the class: level. I suggest adding args: to the connections, like this:

    table_store_connections:
        postgres:
            args:
                url: "postgresql://postgres:pipedag@127.0.0.1/{instance_id}"
                schema_prefix: "myflow_"

    table_store:
        table_store_connection: postgres
        class: "pydiverse.pipedag.backend.table.SQLTableStore"

windiana42 commented 1 year ago

This does not work with the YAML anchor syntax, since the standard does not implement a deep merge. With anchors, the same configuration looks like this:

    _table_store_connections:
        postgres: &db_postgres
            url: "postgresql://postgres:pipedag@127.0.0.1/{instance_id}"
            schema_prefix: "myflow_"
...
    table_store:
        class: "pydiverse.pipedag.backend.table.SQLTableStore"
        args:
            <<: *db_postgres
            print_sql: true
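
To illustrate why the anchor has to be merged inside args: rather than one level higher, here is a small sketch with plain dictionaries. YAML's merge key (`<<`) combines only the top-level keys of the mappings involved, which corresponds to `shallow_merge` below; `deep_merge` shows what a recursive merge would do instead. Both helper functions and the sample dictionaries are illustrative and not part of pipedag.

```python
def shallow_merge(base, override):
    """Top-level merge: keys in override replace keys in base wholesale."""
    return {**base, **override}


def deep_merge(base, override):
    """Recursive merge: nested mappings are combined key by key."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


connection = {
    "args": {
        "url": "postgresql://postgres:pipedag@127.0.0.1/{instance_id}",
        "schema_prefix": "myflow_",
    }
}
table_store = {
    "class": "pydiverse.pipedag.backend.table.SQLTableStore",
    "args": {"print_sql": True},
}

# Merging at the outer level drops url and schema_prefix,
# because the whole "args" mapping is replaced.
shallow = shallow_merge(connection, table_store)

# A deep merge would keep all three arguments.
deep = deep_merge(connection, table_store)

# Merging one level down, as in the anchor example, gives the same arguments
# as the deep merge while using only a shallow merge.
inner = shallow_merge(connection["args"], table_store["args"])
```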