vega / vegafusion

Serverside scaling for Vega and Altair visualizations
https://vegafusion.io
BSD 3-Clause "New" or "Revised" License
317 stars 17 forks source link

Push down column selections when using `__dataframe__` protocol #386

Closed ivirshup closed 9 months ago

ivirshup commented 1 year ago

Hi,

I would like to expose a __dataframe__ protocol on a large object where you would never actually want to request all columns. In this feature request:

over on the altair repo it was suggested that vegafusion should be able to push down column selections when handling the dataframe interchange protocol. This does not seem to be happening at first glance.

In this example, I create a subclass of pandas dataframe interchange object that prints the column name being retrieved every time get_column_by_name is called. Making a 2d histogram of "Origin" by "Miles_per_Gallon", I would only expect to see those two columns accessed. However:

import pandas as pd
import vega_datasets
import altair as alt
import vegafusion
vegafusion.enable()

from pandas.core.interchange.dataframe import PandasDataFrameXchg

class NoisyDfInterface(pd.core.interchange.dataframe.PandasDataFrameXchg):
    def __dataframe__(self, allow_copy: bool = True):
        return NoisyDfInterface(self._df, allow_copy=allow_copy)

    def get_column_by_name(self, name):
        print(f"get_column_by_name('{name}')")
        return super().get_column_by_name(name)

cars = vega_datasets.data.cars()

(
    alt.Chart(NoisyDfInterface(cars))
    .mark_rect()
    .encode(
        x=alt.X("Origin"),
        y=alt.Y("Miles_per_Gallon:Q", bin=True),
        color="count()",
    )
)
get_column_by_name('Origin')
get_column_by_name('Name')
get_column_by_name('Miles_per_Gallon')
get_column_by_name('Cylinders')
get_column_by_name('Displacement')
get_column_by_name('Horsepower')
get_column_by_name('Weight_in_lbs')
get_column_by_name('Acceleration')
get_column_by_name('Year')
get_column_by_name('Origin')

This is using latest altair and vegafusion.

Environment info Output of `sessioninfo.show(dependencies=True, html=False)` ``` ----- altair 5.1.1 pandas 2.1.0 session_info 1.0.0 vega_datasets 0.9.0 vegafusion 1.4.0 ----- anyio NA appnope 0.1.2 arrow 1.2.3 asttokens NA attr 23.1.0 attrs 23.1.0 babel 2.12.1 backcall 0.2.0 certifi 2022.09.24 chardet 5.1.0 charset_normalizer 2.1.0 cloudpickle 2.2.1 colorama 0.4.6 cython_runtime NA dateutil 2.8.2 debugpy 1.5.1 decorator 5.1.0 duckdb 0.8.1 executing 0.8.2 fastjsonschema NA fqdn NA google NA idna 3.3 importlib_metadata NA ipykernel 6.17.1 ipywidgets 8.0.7 isoduration NA jedi 0.18.1 jinja2 3.1.1 json5 NA jsonpointer 2.4 jsonschema 4.18.0 jsonschema_specifications NA jupyter_events 0.6.3 jupyter_server 2.7.0 jupyterlab_server 2.23.0 markupsafe 2.1.1 mpl_toolkits NA nbformat 5.9.0 numexpr 2.8.1 numpy 1.24.4 overrides NA packaging 23.1 parso 0.8.2 pexpect 4.8.0 pickleshare 0.7.5 pkg_resources NA platformdirs 3.8.1 polars 0.18.15 prometheus_client NA prompt_toolkit 3.0.38 psutil 5.9.0 ptyprocess 0.7.0 pure_eval 0.2.1 pyarrow 13.0.0 pydev_ipython NA pydevconsole NA pydevd 2.6.0 pydevd_concurrency_analyser NA pydevd_file_utils NA pydevd_plugins NA pydevd_tracing NA pygments 2.13.0 pythonjsonlogger NA pytz 2022.7.1 referencing NA requests 2.31.0 rfc3339_validator 0.1.4 rfc3986_validator 0.1.1 rpds NA ruamel NA send2trash NA setuptools 65.6.3 simplejson 3.17.6 sitecustomize NA six 1.16.0 sniffio 1.2.0 sphinxcontrib NA stack_data 0.1.4 toolz 0.12.0 tornado 6.2 traitlets 5.6.0 typing_extensions NA uri_template NA urllib3 1.26.12 vegafusion_embed NA vegafusion_jupyter 1.4.0 vl_convert 0.13.1 wcwidth 0.2.5 webcolors 1.13 websocket 1.2.1 yaml 5.4.1 zipp NA zmq 24.0.1 zoneinfo NA ----- IPython 8.14.0 jupyter_client 8.3.0 jupyter_core 5.3.1 jupyterlab 4.0.2 notebook 7.0.2 ----- Python 3.9.16 (main, Dec 7 2022, 10:15:13) [Clang 13.0.0 (clang-1300.0.29.30)] macOS-13.4.1-x86_64-i386-64bit ----- Session information updated at 2023-09-05 23:09 ```

Any idea what's up here? Is my expectation correct?

jonmmease commented 1 year ago

Thanks for raising the issue @ivirshup. This is currently expected. VegaFusion will build up a query against the source dataset that only references and produces the required columns, but the current default DataFusion implementation converts the input object to Arrow before evaluating the query against it.

Could you say a little more about your usecase? For your __dataframe__ object, do you want the computation to be performed in the kernel by VegaFusion's default DataFusion query engine? Or does your object provide it's own query functionality?

In https://github.com/altair-viz/altair/issues/3134, I was referring to an idea that @jcrist is looking at for pushing computation into an Ibis table. In this (future) scenario, the DataFrame interchange protocol would be used by Altair to perform column type inference, and then VegaFusion would push calculations into Ibis, where Ibis would control which columns are pulled in from the database along with performing the data transformations.

I think it would be possible to add support for using the DataFusion query engine directly against __dataframe__ objects, where only the necessary columns are converted to Arrow. Just wanted to check that this is your ideal end state.

ivirshup commented 1 year ago

Thanks for the quick response!

Could you say a little more about your usecase?

Sure! This is for anndata. A simplified explanation of this object: a large 2d array (often a sparse array) with labeled dimensions aligned with a dataframe. The set of possible columns would be one of those dimensions + other annotations. This would typically lead to tens of thousands of possible columns.

We really don't want to create an "actual" in memory dataframe with all those columns since that would require densifying the whole matrix, which is expensive and often we wouldn't have the available memory for.

Also, sometimes this object may be stored on disk. We can handle fairly efficient random access to the on disk format, but also really wouldn't want to load the whole thing into memory + densify it for a plot.

For your dataframe object, do you want the computation to be performed in the kernel by VegaFusion's default DataFusion query engine? Or does your object provide its own query functionality?

I would like to be able to use VegaFusion's computation engine(s).

I think it would be possible to add support for using the DataFusion query engine directly against dataframe objects, where only the necessary columns are converted to Arrow. Just wanted to check that this is your ideal end state.

This is what I'm looking for right now. Maybe something more complex would be nice, but I think there would be a ton of value from just column selection.

jonmmease commented 1 year ago

Thanks for the additional context, anndata sounds really useful and the scenario you describe in the original issue does sound like a good fit.

To make this work, I think what we'd need to do is write a custom DataFusion DataSource (See example in https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/custom_datasource.rs). This DataSource implementation would be in Rust, but would use PyO3 to wrap a Python object that implements the __dataframe__ protocol. The execute function here would use the column projection that DataFusion computed from the query to request the necessary columns from the __dataframe__ object before converting the these columns into arrow RecordBatches.

https://github.com/apache/arrow-datafusion/blob/47fd9bf5b7a1b931e6e8bd323a01ae54fda261e5/datafusion-examples/examples/custom_datasource.rs#L260-L270

We'd then change this chunk of vegafusion's Python logic to not eagerly convert __dataframe__ objects to pyarrow, but instead pass them through the way SqlDataset and DataFrameDataset are.

https://github.com/hex-inc/vegafusion/blob/41478dee63e8b6824271bfb43f23c6f5f1052c5a/python/vegafusion/vegafusion/runtime.py#L162-L173

jonmmease commented 9 months ago

A few more design notes:

In _import_or_register_inline_datasets, when we check for __dataframe__, instead of converting to arrow using pyarrow's dataframe interchage support, add the object as-is to imported_inline_datasets.

https://github.com/hex-inc/vegafusion/blob/ddc14af269609a08a8bc968720902659981dbe69/python/vegafusion/vegafusion/runtime.py#L232

Then in the vegafusion-python-embed's process_inline_datasets, check for the presense of a __dataframe__ property (alongside checks for SqlDataset and DataFrameDataset).

https://github.com/hex-inc/vegafusion/blob/ddc14af269609a08a8bc968720902659981dbe69/vegafusion-python-embed/src/lib.rs#L141

When found, create a VegaFusionDataset::DataFrame from the DataFrame created by calling a new self.connection.scan_dfi associated function on the Connection trait.

https://github.com/hex-inc/vegafusion/blob/ddc14af269609a08a8bc968720902659981dbe69/vegafusion-dataframe/src/connection.rs#L11

We'll need to add a pyo3 feature flag to the vegafusion-dataframe crate to enable this trait. Then we implement this associated function for the DataFusionConnection trait (also behind a pyo3 feature flag). This implementation will create a new DataFusion DataSource that uses select_columns_by_name to filter down the columns before using pyarrow's dfi support to convert the selected columns to arrow.

This approach should also improve the performance of working with pandas, and will hopefully close the performance gap between the DataFusion and DuckDb data connections.

jonmmease commented 9 months ago

This is available to try out in version 1.6.0-rc1. Let me know if you have a chance to try it with anndata

ivirshup commented 7 months ago

Hey @jonmmease, thanks for implementing this! I'm just getting a change to try this now.

Using the code above I am still seeing every column get accessed. Is this expected, and should I be trying something different? It does look like your example from #438 does have the desired behaviour though:

Demo ```python import pandas as pd import vega_datasets import altair as alt # import vegafusion # vegafusion.enable() alt.data_transformers.enable("vegafusion") from pandas.core.interchange.dataframe import PandasDataFrameXchg class NoisyDfInterface(pd.core.interchange.dataframe.PandasDataFrameXchg): def __dataframe__(self, allow_copy: bool = True): return NoisyDfInterface(self._df, allow_copy=allow_copy) def get_column_by_name(self, name): print(f"get_column_by_name('{name}')") return super().get_column_by_name(name) cars = vega_datasets.data.cars() chart = ( alt.Chart(NoisyDfInterface(cars)) .mark_rect() .encode( x=alt.X("Origin"), y=alt.Y("Miles_per_Gallon:Q", bin=True), color="count()", ) ) chart ``` ``` get_column_by_name('Origin') get_column_by_name('Name') get_column_by_name('Miles_per_Gallon') get_column_by_name('Cylinders') get_column_by_name('Displacement') get_column_by_name('Horsepower') get_column_by_name('Weight_in_lbs') get_column_by_name('Acceleration') get_column_by_name('Year') get_column_by_name('Origin') ``` ```python movies = pd.read_json("https://raw.githubusercontent.com/vega/vega-datasets/main/data/movies.json") movies = pd.concat([movies]*3200, axis=0).reset_index(drop=False) print(len(movies)) chart = alt.Chart(NoisyDfInterface(movies)).mark_bar().encode( alt.X("IMDB Rating:Q", bin=True), y='count()', ) chart ``` ``` 10243200 get_column_by_name('index') get_column_by_name('Title') ```

Is there some sort of threshold under which this doesn't kick in?

jonmmease commented 7 months ago

Hi @ivirshup. I think the calls to get_column_by_name are coming from Altair (in order to access the schema of the DataFrame). This call is required to return a DFI column object (which includes the column type), but this doesn't need to involve physically load the column's underlying data.

See https://github.com/altair-viz/altair/pull/3114 for how Altair is using the __dataframe__ protocol to extract type information. And see https://github.com/ibis-project/ibis/pull/6733 for an example of how Ibis implements the protocol to provide column info for all of the columns without actually performing the underlying query.

The part that VegaFusion added in #438 was to add a call to dfi.select_columns_by_name(columns) before loading the dataframe into memory with pyarrow. For anndata, I think you could follow the pattern Ibis uses for implementing get_column_by_name lazily (this assumes you have cheap access to the schema of all of the columns without loading them all into memory). And then also implement dfi.select_columns_by_name(columns) to filter out the set of available columns.