apache / datafusion-python

Apache DataFusion Python Bindings
https://datafusion.apache.org/python
Apache License 2.0

Missing pushdowns for pyarrow datasets #703

Closed: adriangb closed this issue 1 week ago

adriangb commented 1 month ago

Some predicates are not pushed down to pyarrow datasets correctly. I'm guessing it's just not implemented yet, but I couldn't find an issue tracking which pushdowns are and aren't implemented.

This example is a bit contrived, but the point is that in both the integer and the timestamp case, DataFusion should push the filter down to the dataset, match no fragments, read no files, and return an empty result set. The single fragment points at a file that doesn't exist; that's just a way to verify that no files are read, since opening it raises an error.

```python
from datetime import datetime, timezone
import shutil

import datafusion
import pyarrow
import pyarrow.compute
import pyarrow.dataset
import pyarrow.fs

# Make sure the data directory doesn't exist, so any attempt to read a file fails
shutil.rmtree('data', ignore_errors=True)

# --- Integer case ---
file_format = pyarrow.dataset.ParquetFileFormat()
filesystem = pyarrow.fs.SubTreeFileSystem('data', pyarrow.fs.LocalFileSystem())
fragments = [
    # note that the partition_expression is totally wrong
    file_format.make_fragment(
        '1.parquet',
        filesystem=filesystem,
        partition_expression=(pyarrow.dataset.field('a') <= pyarrow.compute.scalar(1)),
    )
]

dataset = pyarrow.dataset.FileSystemDataset(
    fragments, pyarrow.schema([pyarrow.field('a', pyarrow.int64())]), file_format, filesystem
)

# pyarrow itself prunes the fragment: `a <= 1` contradicts `a == 2`
fragments = list(dataset.get_fragments(pyarrow.dataset.field('a') == pyarrow.scalar(2)))
assert fragments == [], fragments

ctx = datafusion.SessionContext()
ctx.register_dataset('dataset', dataset)

# Passes: the filter is pushed down, so the missing file is never opened
df = ctx.sql('SELECT * FROM dataset WHERE a = 2')
assert df.collect() == []

# --- Timestamp case ---
ts_type = pyarrow.timestamp('ns', '+00:00')
fragments = [
    # note that the partition_expression is totally wrong
    file_format.make_fragment(
        '1.parquet',
        filesystem=filesystem,
        partition_expression=(
            pyarrow.dataset.field('a')
            == pyarrow.scalar(datetime(2000, 1, 1, tzinfo=timezone.utc), ts_type)
        ),
    )
]

dataset = pyarrow.dataset.FileSystemDataset(
    fragments, pyarrow.schema([pyarrow.field('a', ts_type)]), file_format, filesystem
)

# pyarrow prunes this fragment too: `a == 2000-01-01` contradicts `a == 2024-01-01`
fragments = list(
    dataset.get_fragments(
        pyarrow.dataset.field('a')
        == pyarrow.scalar(datetime(2024, 1, 1, tzinfo=timezone.utc), ts_type)
    )
)
assert fragments == [], fragments

# A fresh context, since 'dataset' is already registered in the first one
ctx = datafusion.SessionContext()
ctx.register_dataset('dataset', dataset)

df = ctx.sql("SELECT * FROM dataset WHERE a = '2024-01-01T00:00:00+00:00'")
# Errors: DataFusion tries to read 1.parquet, which doesn't exist
assert df.collect() == []
```

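The expected behavior in both cases amounts to guarantee-based fragment pruning. The sketch below is a simplified plain-Python model, not pyarrow's or DataFusion's implementation: each fragment's `partition_expression` is reduced to a hypothetical `Guarantee` interval, and the hypothetical `prune` helper keeps a fragment only if the equality filter could possibly match it. With the guarantees from the script, both filters contradict the guarantee, so no fragment should be opened.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class Guarantee:
    """Hypothetical stand-in for a partition_expression: lo <= a <= hi for every row."""

    lo: Optional[Any] = None  # None means unbounded below
    hi: Optional[Any] = None  # None means unbounded above

    def may_contain(self, value: Any) -> bool:
        """Return False only when no row in the fragment can equal `value`."""
        if self.lo is not None and value < self.lo:
            return False
        if self.hi is not None and value > self.hi:
            return False
        return True


def prune(fragments: Dict[str, Guarantee], value: Any) -> List[str]:
    """Keep the fragments that might satisfy the pushed-down filter `a == value`."""
    return [name for name, g in fragments.items() if g.may_contain(value)]


# Integer case: guarantee `a <= 1`, filter `a == 2`.
int_fragments = {"1.parquet": Guarantee(hi=1)}
print(prune(int_fragments, 2))  # []

# Timestamp case: guarantee `a == 2000-01-01`, filter `a == 2024-01-01`.
jan_2000 = datetime(2000, 1, 1, tzinfo=timezone.utc)
jan_2024 = datetime(2024, 1, 1, tzinfo=timezone.utc)
ts_fragments = {"1.parquet": Guarantee(lo=jan_2000, hi=jan_2000)}
print(prune(ts_fragments, jan_2024))  # []
```

The comparison logic is the same for integers and timezone-aware datetimes, which is why there's no reason in principle for the timestamp case to behave differently from the integer one.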
Michael-J-Ward commented 1 month ago

Hi @adriangb,

Thanks for the issue.

Could you highlight a little more clearly where the behavior in your script deviates from expectations and how you'd expect it to behave?

adriangb commented 1 month ago

I would expect the timestamp case to behave just like the integer case: the result is empty and the file is never accessed.
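One plausible explanation for the difference, assuming (as the original report guesses) that the timestamp predicate simply isn't translated into a dataset filter, is sketched below as a toy model. Everything in it (`translate`, `fragments_to_scan`, `SUPPORTED_LITERAL_TYPES`) is hypothetical and is not DataFusion's actual code: when the literal can't be converted, no filter reaches the dataset, so every fragment is opened, including the missing `1.parquet`.

```python
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical: literal types this toy translator knows how to push down.
SUPPORTED_LITERAL_TYPES = (bool, int, float, str)

Filter = Tuple[str, Any]  # (column, value), meaning `column == value`


def translate(column: str, literal: Any) -> Optional[Filter]:
    """Translate `column == literal`, or return None if the literal type is unsupported."""
    if isinstance(literal, SUPPORTED_LITERAL_TYPES):
        return (column, literal)
    return None  # e.g. a datetime: the predicate is not pushed down


def fragments_to_scan(
    fragments: Dict[str, Callable[[Any], bool]], pushed: Optional[Filter]
) -> List[str]:
    """Each fragment maps to a check on its partition guarantee."""
    if pushed is None:
        # No filter reached the dataset, so every fragment must be opened.
        return list(fragments)
    _column, value = pushed
    return [name for name, may_contain in fragments.items() if may_contain(value)]


jan_2000 = datetime(2000, 1, 1, tzinfo=timezone.utc)
jan_2024 = datetime(2024, 1, 1, tzinfo=timezone.utc)

int_fragments = {"1.parquet": lambda v: v <= 1}
ts_fragments = {"1.parquet": lambda v: v == jan_2000}

print(fragments_to_scan(int_fragments, translate("a", 2)))  # []
print(fragments_to_scan(ts_fragments, translate("a", jan_2024)))  # ['1.parquet']
```

In this model the fix is to extend the translator to cover the missing literal type; the filter itself was never the problem, since the dataset can already prune on it when it's passed down.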

Michael-J-Ward commented 1 week ago

Sorry for the delay.

I have a PR up with a fix. Hopefully it will make it into the next release.