pytorch / data

A PyTorch repo for data loading and utilities to be shared by the PyTorch domain libraries.
BSD 3-Clause "New" or "Revised" License

SQL Pipe #502

Open Nintorac opened 2 years ago

Nintorac commented 2 years ago

🚀 The feature

Allows sourcing datasets from SQL queries. Should allow substituting different backends, e.g. Athena, Presto, Postgres. Should do smart batching to minimize the number of query requests and maximize the data returned per query.

Batching will probably be backend dependent, e.g. Athena/Presto do not support LIMIT ... OFFSET ... queries.
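
For illustration, here is a minimal sketch of what such a pipe could look like, assuming a DB-API-style connection. The class name, the query_template parameter, and the batching scheme are all hypothetical, not an existing TorchData API:

from torch.utils.data import IterDataPipe


class SQLBatchReader(IterDataPipe):
    # Hypothetical pipe: buffers incoming keys and issues one IN-clause
    # query per batch instead of one query per sample.
    def __init__(self, source_dp, conn, query_template, batch_size=256):
        self.source_dp = source_dp            # yields individual key values
        self.conn = conn                      # DB-API 2.0 style connection
        self.query_template = query_template  # e.g. "SELECT ... WHERE id IN ({})"
        self.batch_size = batch_size

    def __iter__(self):
        buffer = []
        for key in self.source_dp:
            buffer.append(key)
            if len(buffer) == self.batch_size:
                yield from self._run_batch(buffer)
                buffer = []
        if buffer:
            yield from self._run_batch(buffer)

    def _run_batch(self, keys):
        # Parameterized IN clause; placeholder style varies by backend
        placeholders = ','.join(['%s'] * len(keys))
        cursor = self.conn.cursor()
        cursor.execute(self.query_template.format(placeholders), keys)
        yield from cursor.fetchall()

Backend-specific pagination (e.g. the LIMIT/OFFSET gap above) would then be hidden behind how each backend constructs and executes the batched query.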

Motivation, pitch

Data is often stored in large data lakes on which SQL queries can be run to transform and load it. At some point this data must be turned into tensors; however, loading it takes quite a lot of care, and each query can take multiple seconds, meaning it cannot be done naively on a per-sample basis.

Alternatives

I could not find any existing implementations. I have written my own solution that allows the user to index into a query, but it is very hacky.

My current solution looks something like this:


from itertools import groupby

import pandas as pd


class SQLTable:

    @cached  # skips the query when the patient is already in the LMDB cache
    def get_patient(self, id) -> pd.DataFrame:
        return self.get_patients([id])

    def get_patients(self, ids) -> pd.DataFrame:
        # One IN-clause query fetches a whole batch in a single round trip
        ids = '(' + ','.join(str(i) for i in ids) + ')'

        query = f"""
        SELECT patientid, offset, parameterid, value FROM icu_data
        WHERE patientid IN {ids}
        ORDER BY patientid, offset, parameterid
        """

        return client.query(query)

    def prefill_cache(self, all_patients):
        for patients in batch(all_patients, batch_size):
            results = self.get_patients(patients)

            # groupby yields (key, group) pairs; the ORDER BY above keeps
            # each patient's rows contiguous
            for patient, patient_results in groupby(results, lambda x: x['patientid']):
                self.add_cache(patient, patient_results)

The @cached decorator checks whether the patient is already in an LMDB database and short-circuits the query if it is.
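
Roughly, a minimal sketch of such a decorator, assuming pickled DataFrames stored in an LMDB environment (the path, map size, and serialization choice are placeholders):

import functools
import pickle

import lmdb

_env = lmdb.open('patient_cache.lmdb', map_size=2**30)  # placeholder path/size

def cached(fn):
    @functools.wraps(fn)
    def wrapper(self, id):
        key = str(id).encode()
        with _env.begin() as txn:
            hit = txn.get(key)
        if hit is not None:
            return pickle.loads(hit)  # cache hit: skip the query entirely
        result = fn(self, id)
        with _env.begin(write=True) as txn:
            txn.put(key, pickle.dumps(result))
        return result
    return wrapper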

Naively fetching patient by patient is far too slow, but it's also not possible to fetch all patients at once, as this would cause memory issues.


Not specifically related to the SQLPipe ... but for the use case I am working on at the moment, there is also a need to transform the data from columnar format to time-series format. This is also quite expensive at scale and might benefit from a Pipe function.

E.g. the above example has (patientid, offset, parameterid, value); this should be turned into a tensor of shape (patient x seq_len x n_parameters) with the entries filled from value. Basically, the data is queried in sparse COO format and needs to be densified. (There may be an SQL way of doing this; if so, I would love to hear it. My SQL is admittedly weak.)
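
On the densification step: once the query results are mapped to contiguous 0-based integer indices, PyTorch's sparse COO support can do the scatter directly. A toy sketch (the index-mapping step itself is elided):

import torch

# (patient_idx, offset_idx, parameter_idx) triples plus their values,
# i.e. the sparse COO form described above
indices = torch.tensor([[0, 0, 1],
                        [0, 1, 0],
                        [1, 0, 1]])   # shape (nnz, 3)
values = torch.tensor([4.2, 3.7, 5.1])

dense = torch.sparse_coo_tensor(
    indices.T, values,
    size=(2, 2, 2),                   # (patient, seq_len, n_parameters)
).to_dense()                          # absent entries become 0

Note that missing readings come out as zeros, so a separate mask may be needed to distinguish them from true zero values.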

Additional context

No response

ejguan commented 2 years ago

This might be one OSS story integrating both TorchArrow and TorchData, where TorchArrow handles the DataFrame transformation and forwards it to a backend SQL engine. cc: @wenleix @VitalyFedyunin

VitalyFedyunin commented 2 years ago

Note: if we allow such a DataPipe, we would need to take special care with the nondeterministic row ordering of most SQL database engines.

wenleix commented 2 years ago

Yeah, it makes sense to use the Arrow memory layout to fetch data from the warehouse; for example, Snowflake's Python client supports fetching result batches as Arrow tables: https://docs.snowflake.com/en/user-guide/python-connector-distributed-fetch.html#retrieving-the-list-of-result-batches
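
Following the linked docs, a sketch of that fetch path (connection details are placeholders):

import snowflake.connector

conn = snowflake.connector.connect(
    account='...', user='...', password='...',  # placeholders
)
cursor = conn.cursor()
cursor.execute('SELECT patientid, offset, parameterid, value FROM icu_data')

# Each ResultBatch can be materialized independently (e.g. in parallel
# across workers) as an Arrow table
for batch in cursor.get_result_batches():
    table = batch.to_arrow()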

The data buffer in an Arrow Array already implements the Python buffer protocol, so it can be converted to a Tensor via torch.frombuffer, e.g. https://github.com/wenleix/axolotls/blob/ccce45361d4e78c530d23d8e70e7a2686315cd81/axolotls/string_column.py#L98-L102
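
For example, a minimal sketch of that conversion for a primitive Arrow array (per the Arrow format, buffers()[0] is the validity bitmap and buffers()[1] holds the values):

import pyarrow as pa
import torch

arr = pa.array([1.0, 2.0, 3.0], type=pa.float32())

values_buf = arr.buffers()[1]  # raw value buffer, no copy
t = torch.frombuffer(values_buf, dtype=torch.float32)
# Arrow buffers are immutable, so PyTorch warns that the resulting
# tensor must be treated as read-only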

cc @dracifer , @bearzx

dracifer commented 2 years ago

Agreed, it should be achievable. But in that case, users would use two languages (SQL and DataFrame-style) to describe transformation logic. Is that a reasonable user experience?