apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0
14.51k stars 3.53k forks source link

[Python] Specifying schema does not prevent arrow from reading metadata on every single parquet? #34145

Open cboettig opened 1 year ago

cboettig commented 1 year ago

Describe the bug, including details regarding any error messages, version, and platform.

Consider the following reprex, in which we open a partitioned parquet dataset on a remote S3 bucket:

import pyarrow.dataset as ds
from pyarrow import fs
from timebudget import timebudget

@timebudget
def without_schema():
  s3 = fs.S3FileSystem(endpoint_override = "data.ecoforecast.org", anonymous = True)
  df = ds.dataset(
      "neon4cast-forecasts/parquet/terrestrial_30min",
      format="parquet",
      filesystem=s3
  )
  return(df)

df = without_schema()
schema = df.schema

This takes a whooping 102 seconds on my machine. I believe most of the computation is associated with checking the metadata found in each parquet file, since there are many individual partitions in this data. This process should not be necessary though if we provide the schema:

@timebudget
def with_schema():
  s3 = fs.S3FileSystem(endpoint_override = "data.ecoforecast.org", anonymous = True)
  df2 = ds.dataset(
      "neon4cast-forecasts/parquet/terrestrial_30min",
      format="parquet",
      filesystem=s3,
      schema = schema
  )
  return(df2)

with_schema()

But observe the execution time is once again 102 seconds. Note that if we manually specify a single partition, the process takes only 2.7 seconds:

@timebudget
def single():
  s3 = fs.S3FileSystem(endpoint_override = "data.ecoforecast.org", anonymous = True)
  df = ds.dataset(
      "neon4cast-forecasts/parquet/terrestrial_30min/model_id=climatology/reference_datetime=2022-12-01 00:00:00/date=2022-12-02",
      format="parquet",
      filesystem=s3
  )
  return(df)

single()

I would have expected similar performance between these two cases: ds.dataset() should just be establishing a connection with the schema and not reading any data. After all part of the promise of partitioned data is that we could open the dataset at the root of the parquet db and rely on filtering operations to extract a specific subset without the code ever touching all the other parquet files.

My guess here is that arrow is trying to read all the parquet file metadata to ensure they all match the schema, even though that is not the expected behavior. I think this is the same issue as seen in R https://github.com/apache/arrow/issues/33312. But maybe I'm not doing something correct or miss-understand the expected behavior?

Component(s)

Parquet, Python

westonpace commented 1 year ago

The discovery code is roughly:

def discover_dataset(directory):
  files = list_files_recursive(directory)
  if unify_schemas:
    return unify([get_schema(file) for file in files])
  else:
    return get_schema(files[0])

I think the problem is that all of this time is being spent in list_files_recursive. There may be two problems:

  1. Looking at the code, if there is a lot of nesting in the dataset, we may be doing too many list objects calls because I think list objects is inherently recursive.
  2. Even if we are not doing too many list objects calls we should be able to abort early if we know we only want one file.

I'm going to investigate #1 a bit.

westonpace commented 1 year ago

Ok. I've filed #34213 for #1. I suspect, if that is solved, #2 won't be very noticeable until you have tens of thousands of files. Either way we can leave this issue open for addressing #2.

yonil7 commented 1 year ago

Is it possible somehow to order ds.dataset(...) to not run its discovery/schema inference procedure? This procedure is taking way too long when the dataset folder contains thousands of files. I tried passing schema and exclude_invalid_files=False but it does not help.