bluesky / tiled

API to structured data
https://blueskyproject.io/tiled
BSD 3-Clause "New" or "Revised" License

Store tabular data in SQL #778

Open danielballan opened 3 weeks ago

danielballan commented 3 weeks ago

We recently added support for appendable tabular data storage in the CSVAdapter. This was done as a prototype in support of the flyscanning project. It is fundamentally not a sound approach, because it relies on an ordinary file on disk as the appendable data store. If a client attempts to read while another client is writing, it is possible for the read to see the file in an inconsistent state (e.g. with a partially-written row). If a client attempts to write while another client is writing---particularly on networked storage---file corruption can result.

Fundamentally, given these two requirements:

  1. Tiled servers scaled horizontally (requests load balanced across multiple server processes) across multiple nodes
  2. The ability to read a partial dataset while it is being written to

we inevitably need a proper database. (We could maybe get by with fancy file-locking logic, but that is tantamount to inventing your own database.)

Therefore, I think we need to remove append support from CSVAdapter, as it is not robust, and add in its place a new adapter that is backed by a SQL database. This would be a separate SQL database from the others we already have:

  1. The authentication database, the first one we added, managed in tiled.authn_database, which holds (hashed) API keys and other authentication state
  2. The catalog databases, which store metadata, filepaths, and information about how to open the files. This is managed in tiled.catalog. (There can be multiple catalog databases for a given Tiled server, specified in the config file.)
  3. The new database, proposed here, which would store actual tabular data itself

Currently, when data is written to Tiled it is always written into ordinary files, and the path for writing is configured thus:

    args:
      uri: postgresql+asyncpg://tiled:${TILED_DATABASE_PASSWORD}@postgresql.nsls2.bnl.gov/tiled_hex_bluesky
      writable_storage: /nsls2/data1/hex/assets/tiled_hex_bluesky
      readable_storage:
        # Paths should be in terms of /nsls2/data/ but there may be some given
        # as /nsls2/data1 so we include that too.
        - /nsls2/data/hex/proposals

We will need to extend the configuration to provide not only a writable portion of the filesystem for placing files but also a writable database, where this new Adapter can create, read, and append to tables.

    args:
      uri: postgresql+asyncpg://tiled:${TILED_DATABASE_PASSWORD}@postgresql.nsls2.bnl.gov/tiled_hex_bluesky
      writable_storage:
        filesystem: /nsls2/data1/hex/assets/tiled_hex_bluesky
        sql: postgresql://...
      readable_storage:
        # Paths should be in terms of /nsls2/data/ but there may be some given
        # as /nsls2/data1 so we include that too.
        - /nsls2/data/hex/proposals

The work can begin by defining a self-contained Adapter class and testing it. Then integration with Tiled, including this configuration file, can follow. The Adapter will look like:

    class SQLAdapter:
        def __init__(self, uri):
            # Connect to the backing database (SQLite or PostgreSQL).
            if uri.startswith("sqlite"):
                self.conn = ...
            ...

        def append(self, dataframe):
            # Use ADBC or SQLAlchemy to append the DataFrame's rows
            # to the backing SQL table.
            ...

The job of this class is to present the same interface (methods and attributes) as the other tabular adapters in Tiled, as defined by https://github.com/bluesky/tiled/blob/7f7329de1b4ab39f502075656102585cdcc35f7c/tiled/adapters/protocols.py#L104-L127

It will return pandas DataFrames in read() and read_partition(...). It will consume pandas DataFrames in write(...), write_partition(...), and append(...) (or whatever we name it). Internally, it will use SQL queries to fetch and write data. SQLAlchemy could be used for this. It is "Pythonic" and it supports the SQL backends we care about (SQLite for small deployments and PostgreSQL for scaled deployments). However, SQLAlchemy operates row-wise on the data. We would need to decompose the DataFrame from memory-efficient columnar structures into Python tuples, one per row. This is not efficient. ADBC enables us to operate directly on pyarrow objects, which we can translate to and from pandas DataFrame objects without memory copies.
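
As a rough sketch of the ADBC path (the database file name and table name below are placeholders, and it assumes the adbc-driver-sqlite package), a write-then-read round trip could look like this:

    import adbc_driver_sqlite.dbapi
    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({"x": [1.0, 2.0], "y": [3.0, 4.0], "temp": [295.0, 296.0]})

    with adbc_driver_sqlite.dbapi.connect("tables.db") as conn, conn.cursor() as cursor:
        # Convert the DataFrame to a pyarrow Table (no per-row decomposition)
        # and bulk-ingest it. A subsequent write would use mode="append".
        cursor.adbc_ingest("example_table", pa.Table.from_pandas(df), mode="create")
        conn.commit()

        # Read back as Arrow and hand a pandas DataFrame to the caller.
        cursor.execute("SELECT * FROM example_table")
        result = cursor.fetch_arrow_table().to_pandas()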

ADBC does not currently support variable-length lists, which is a rare but important case that we need to cover. It seems that support should be possible and may be added upstream soon. We can proceed to use ADBC for most data and either (1) hope that support for LIST is added in time or (2) fall back to using SQLAlchemy for this edge case if it is not ready in time.
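
For the LIST edge case, the fallback might look roughly like the following sketch, which uses a PostgreSQL ARRAY column and row-wise inserts via SQLAlchemy. The URI, table name, and column names are made up for illustration.

    import pandas as pd
    import sqlalchemy as sa
    from sqlalchemy.dialects.postgresql import ARRAY

    # Hypothetical connection URI and table layout.
    engine = sa.create_engine("postgresql+psycopg2://tiled:secret@localhost/tiled_tables")
    metadata = sa.MetaData()
    table = sa.Table(
        "example_list_table",
        metadata,
        sa.Column("dataset_id", sa.String),
        sa.Column("spectrum", ARRAY(sa.Float)),  # variable-length list per row
    )
    metadata.create_all(engine)

    df = pd.DataFrame(
        {"dataset_id": ["45", "45"], "spectrum": [[1.0, 2.0, 3.0], [4.0, 5.0]]}
    )

    # Decompose the columnar DataFrame into one dict per row (the inefficiency
    # noted above) and insert all rows in a single transaction.
    with engine.begin() as conn:
        conn.execute(sa.insert(table), df.to_dict(orient="records"))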

With either ADBC or SQLAlchemy, we have to decide how to organize the data in tables.

  1. Create a separate table in SQL for every table that we create in Tiled. For example, in the context of Bluesky experiments, this would mean a separate table in SQL per Run per Stream. So, there would be a table dedicated to "Scan 42, 'primary' stream" from Bluesky, and so on for each experiment and stream.

Scan 42

| x | y | temp |
| --- | --- | --- |
| . | . | . |
| . | . | . |

Scan 43

| x | y | temp |
| --- | --- | --- |
| . | . | . |
| . | . | . |

Scan 44

| x | y | temp |
| --- | --- | --- |
| . | . | . |
| . | . | . |

Scan 45

| y | temp | z |
| --- | --- | --- |
| . | . | . |
| . | . | . |

  2. Create one giant table with the union of all the columns, e.g. for all data ever taken at a given beamline.

The Big Table

| id | x | y | temp | z |
| --- | --- | --- | --- | --- |
| 42 | . | . | . | . |
| 42 | . | . | . | . |
| 43 | . | . | . | . |
| 43 | . | . | . | . |
| 44 | . | . | . | . |
| 44 | . | . | . | . |
| 45 | . | . | . | . |
| 45 | . | . | . | . |

  3. Create a table for each unique schema (each unique group of column names and data types, such as motor FLOAT, detector FLOAT, sample_position INT).

{fingerprint A}

| id | x | y | temp |
| --- | --- | --- | --- |
| 42 | . | . | . |
| 42 | . | . | . |
| 43 | . | . | . |
| 43 | . | . | . |
| 44 | . | . | . |
| 44 | . | . | . |

{fingerprint B}

| id | y | temp | z |
| --- | --- | --- | --- |
| 45 | . | . | . |
| 45 | . | . | . |

Let us evaluate the trade-offs:

  1. Technically, PG and SQLite can both support huge numbers of tables (PG's limit is about 4 billion), so it is feasible. However, we would hit practical limits: it can take a long time to list the tables when there are billions of them. It would also make it hard or inefficient to ever pose queries like, "Did motor X ever exceed the limit of 5? Did sample holder Y ever get hotter than 300 K?"
  2. This would get very messy. The table would grow wide over time, accumulating the union of every detector and motor ever measured. It would be mostly sparse, as many instruments are not often used. There is also a technical reason this is unworkable: it's not uncommon to start collecting data from some instrument as FLOAT and later realize that it should be INT, or vice versa. Under this scheme, a given column can only have one data type, so there is no clean way to recover from that.
  3. Arrow provides a formally-defined binary schema describing the column names and data types. This can be used to generate a fingerprint (for example, the md5 hash of the serialized schema) that uniquely identifies that group of columns, and the fingerprint can serve as the name of the table. (See the sketch below.)
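
A minimal sketch of computing such a fingerprint with pyarrow (the hash choice and table-naming convention here are illustrative, not decided):

    import hashlib

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({"x": [1.0], "y": [2.0], "temp": [295.0]})

    # Serialize only the column names and data types (drop pandas metadata so
    # the hash depends on the schema alone) and hash the bytes.
    schema = pa.Schema.from_pandas(df, preserve_index=False).remove_metadata()
    fingerprint = hashlib.md5(schema.serialize().to_pybytes()).hexdigest()
    table_name = f"table_{fingerprint}"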

If we go with (3), this is how it might work:

There should be a specially-named column that identifies which logical dataset in Tiled a given row belongs to, so that when data is read we can do (in ADBC or SQLAlchemy) SELECT * FROM {hash_table_name} WHERE special_column={dataset_id}, as in the sketch below.
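
For instance, the read path might look roughly like this (the table name, column name, and database file are placeholders for whatever naming convention we settle on):

    import adbc_driver_sqlite.dbapi

    table_name = "table_abc123"  # placeholder for the schema-fingerprint-derived name
    dataset_id = "42"            # identifies the logical Tiled dataset

    with adbc_driver_sqlite.dbapi.connect("tables.db") as conn, conn.cursor() as cursor:
        # Parameterized query: fetch only the rows belonging to this dataset.
        cursor.execute(
            f"SELECT * FROM {table_name} WHERE dataset_id = ?", (dataset_id,)
        )
        df = cursor.fetch_arrow_table().to_pandas()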

In the example of a Bluesky experiment, the data flow is:

Bluesky RunEngine -> Tiled client -> Tiled server -> SQLAdapter -> SQL

danielballan commented 3 weeks ago

As @skarakuzu noted, we can hard-code that tabular datasets stored in SQL always have 1 partition. Partitioning has value for file-backed storage but not for database-backed storage.

danielballan commented 3 weeks ago

Notes from conversation:

  1. Delete the generate_data_sources method, as it is used in file-registration use cases (tiled register ... or tiled serve directory ...), which are not applicable here.
  2. In init_storage, return a list containing a single Asset with just a data_uri, which comes from the init_storage argument.

Notes for later:

DataSource
  mimetype: application/x-sql-table
  parameters: {"table_name": "schema_hash_0fxdkfjsd...", "_id": "..."}
Assets
  data_uri: postgresql://... or sqlite://...