apache / iceberg-python

Apache PyIceberg
https://py.iceberg.apache.org/
Apache License 2.0

[Feature Request] Speed up InspectTable.files() #1229

Open DieHertz opened 1 month ago

DieHertz commented 1 month ago

Feature Request / Improvement

Creating from https://github.com/apache/iceberg-python/pull/614#issuecomment-2375186118 as suggested.

I have a couple of moderately heavy tables with millions of parquet files and ~100 manifest files, which I routinely need to query using the .files metatable.

The existing implementation goes over the manifests sequentially and takes quite a while constructing the resulting pyarrow.Table.

Since reporting the issue in #614 I have spent a while thinking about it and came up with a temporary solution that works in my case.

I'm using a ProcessPoolExecutor and distributing the manifests each to their own mapper, returning a pyarrow.RecordBatch from each. Then I'm using pyarrow.Table.from_batches() to construct the resulting table. Ideally I would like to process the AVRO manifests on a per-block basis to further speed things up, but doing so with the ProcessPoolExecutor seems to come with too much overhead.

Before writing my custom code, .files() on a table with 350k files took ~70 seconds, and around ~200 seconds on a 1m-file table. The code below, which is not an apples-to-apples comparison because I only process the data I need, takes less than 5 seconds.

Here's what I did in my particular case to give the general idea:

from concurrent.futures import ProcessPoolExecutor
from functools import partial

import pyarrow as pa
import pyiceberg.table
from pyiceberg.conversions import from_bytes
from pyiceberg.io import FileIO
from pyiceberg.manifest import FileFormat, ManifestEntryStatus, ManifestFile
from pyiceberg.schema import Schema

# Narrow schema containing only the columns I actually need
_FILES_SCHEMA = pa.schema(
    [
        pa.field('path', pa.string(), nullable=False),
        pa.field('event_id', pa.string(), nullable=False),
        pa.field('ts_min', pa.int64(), nullable=False),
        pa.field('ts_max', pa.int64(), nullable=False),
    ]
)

def get_message_table_files(
    table: pyiceberg.table.Table,
) -> pa.Table:
    schema = table.metadata.schema()
    snapshot = table.current_snapshot()
    if not snapshot:
        return pa.Table.from_pylist(
            [],
            schema=_FILES_SCHEMA,
        )

    with ProcessPoolExecutor() as pool:
        return pa.Table.from_batches(
            pool.map(
                partial(_process_manifest, schema, table.io),
                snapshot.manifests(table.io),
            ),
            schema=_FILES_SCHEMA,
        )

def _process_manifest(
    table_schema: Schema,
    io: FileIO,
    manifest: ManifestFile,
) -> pa.RecordBatch:
    ts_field = table_schema.find_field('ts')

    rows = [
        dict(
            path=entry.data_file.file_path,
            event_id=entry.data_file.partition.event_id,
            ts_min=from_bytes(
                ts_field.field_type,
                entry.data_file.lower_bounds.get(ts_field.field_id),
            ),
            ts_max=from_bytes(
                ts_field.field_type,
                entry.data_file.upper_bounds.get(ts_field.field_id),
            ),
        )
        for entry in manifest.fetch_manifest_entry(io)
        if entry.data_file.file_format == FileFormat.PARQUET
        and entry.status != ManifestEntryStatus.DELETED
    ]

    return pa.RecordBatch.from_pylist(
        rows,
        schema=_FILES_SCHEMA,
    )
sungwy commented 1 month ago

Hi @DieHertz - thank you for raising this issue and for sharing your benchmarks. I think this is a great idea that we should also consider applying to the other inspect methods that loop through the manifest entries to accumulate table stats.

Would you be interested in working on this issue?

DieHertz commented 1 month ago

Would you be interested in working on this issue?

Yes, I'd be happy to contribute back

kevinjqliu commented 1 month ago

Thanks for the write-up! This looks great!

Parallel processing is how we sped up other parts of the program as well, for example for planning and for writing parquet files.

There's already an ExecutorFactory; do you think we can use that instead of ProcessPoolExecutor?
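
For context, it hands out a shared ThreadPoolExecutor that other code paths reuse. A minimal usage sketch (assuming ExecutorFactory.get_or_create() is still the entry point and that the pool size comes from the max-workers config / PYICEBERG_MAX_WORKERS):

from pyiceberg.utils.concurrent import ExecutorFactory

def double(x: int) -> int:
    return x * 2

# Reuse the shared thread pool instead of creating a new one per call
executor = ExecutorFactory.get_or_create()
print(list(executor.map(double, range(4))))  # [0, 2, 4, 6]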

kevinjqliu commented 1 month ago

As an aside, I think reading multiple manifests in parallel is something we'd want to reuse in other parts of the program

DieHertz commented 1 month ago

There's already an ExecutorFactory, do you think we can use that instead of ProcessPoolExecutor?

The issue with the ExecutorFactory is that it uses a ThreadPoolExecutor instead of a process pool, and the operation we're trying to parallelize doesn't play well with the GIL within a single process. At least I'm not seeing nearly as significant a gain when using it

P.S. I'm talking about CPython, the only implementation I have experience with, where the Global Interpreter Lock doesn't give much headroom for thread-based parallelism
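
To illustrate the point outside of pyiceberg, here's a toy comparison of the two executors on a pure-Python CPU-bound loop (illustrative only; the actual numbers depend on the machine):

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time

def cpu_bound(n: int) -> int:
    # Pure-Python loop: holds the GIL for its entire duration
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == '__main__':
    work = [5_000_000] * 8
    for executor_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.time()
        with executor_cls() as pool:
            list(pool.map(cpu_bound, work))
        print(executor_cls.__name__, round(time.time() - start, 2))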

kevinjqliu commented 1 month ago

That's interesting. I thought the ThreadPoolExecutor is good for I/O-bound tasks such as reading the avro manifest files. If you have a PoC, it's something I'd want to play with and test out :)

DieHertz commented 1 month ago

Indeed it is good enough for I/O-bound tasks, but in my understanding this part is CPU-bound.

I think so because I'm observing close to 100% CPU usage when inside plan_files/inspect.files. I have also profiled how long it takes to perform snapshot.manifests(io) and manifest.fetch_manifest_entry(io), and the time spent actually downloading the .avro files seems negligible. Most of the time is spent processing the manifests record-by-record and converting each record to a dict
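
Roughly how I measured it (a simplified sketch; the catalog and table identifiers are placeholders for my setup):

import time

from pyiceberg.catalog import load_catalog

table = load_catalog('default').load_table('db.events')  # placeholder identifiers
io = table.io
snapshot = table.current_snapshot()

start = time.time()
manifests = snapshot.manifests(io)  # reads and decodes the manifest list
print('manifests():', time.time() - start)

start = time.time()
entries = [e for m in manifests for e in m.fetch_manifest_entry(io)]  # per-manifest avro decode
print('fetch_manifest_entry():', time.time() - start)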

If you have a PoC, it's something I'd want to play with and test out :)

Will try to provide something over the next two days

kevinjqliu commented 1 month ago

Most of the time is spent processing the manifests record-by-record and converting each record to a dict

Here's a snippet using threads to parallelize both reading the manifest files and converting entries to records.

This is generated by Claude, so please double-check.


from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Dict

import pyarrow as pa
import pyiceberg.table
from pyiceberg.conversions import from_bytes
from pyiceberg.io import FileIO
from pyiceberg.manifest import FileFormat, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField

# _FILES_SCHEMA is the pa.schema defined in the snippet above
def get_message_table_files(
    table: pyiceberg.table.Table,
) -> pa.Table:
    schema = table.metadata.schema()
    snapshot = table.current_snapshot()
    if not snapshot:
        return pa.Table.from_pylist([], schema=_FILES_SCHEMA)

    with ThreadPoolExecutor() as pool:
        return pa.Table.from_batches(
            pool.map(
                partial(_process_manifest, schema, table.io),
                snapshot.manifests(table.io),
            ),
            schema=_FILES_SCHEMA,
        )

def _process_manifest(
    table_schema: Schema,
    io: FileIO,
    manifest: ManifestFile,
) -> pa.RecordBatch:
    ts_field = table_schema.find_field('ts')

    entries = list(manifest.fetch_manifest_entry(io))
    valid_entries = [
        entry for entry in entries
        if entry.data_file.file_format == FileFormat.PARQUET
        and entry.status != ManifestEntryStatus.DELETED
    ]

    with ThreadPoolExecutor() as pool:
        rows = list(pool.map(
            partial(_process_entry, ts_field),
            valid_entries
        ))

    return pa.RecordBatch.from_pylist(rows, schema=_FILES_SCHEMA)

def _process_entry(ts_field: NestedField, entry: ManifestEntry) -> Dict:
    return {
        'path': entry.data_file.file_path,
        'event_id': entry.data_file.partition.event_id,
        'ts_min': from_bytes(
            ts_field.field_type,
            entry.data_file.lower_bounds.get(ts_field.field_id),
        ),
        'ts_max': from_bytes(
            ts_field.field_type,
            entry.data_file.upper_bounds.get(ts_field.field_id),
        ),
    }
kevinjqliu commented 1 month ago

BTW I'm not opposed to using ProcessPoolExecutor. I'm just curious why ThreadPoolExecutor can't hit the same performance profile

corleyma commented 1 month ago

Most of the time is spent processing the manifests record-by-record and converting each record to a dict

I haven't looked at this closely, but if memory serves, pyiceberg implements its own avro reader/writer using Cython. Concurrency is great, but I wonder if we can make big gains by implementing a more direct conversion of avro records to pyarrow recordbatches somewhere at that level? Then, processing the manifests could probably be implemented using pyarrow compute functions (C++) for a lot of performance gain?
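
For example, once the entries are in columnar form, the per-entry filtering from the snippets above could happen in C++ via pyarrow.compute (just a sketch with a made-up entries table, not pyiceberg's actual representation):

import pyarrow as pa
import pyarrow.compute as pc

# Made-up columnar view of manifest entries
entries = pa.table({
    'file_path': ['a.parquet', 'b.parquet', 'c.orc'],
    'file_format': ['PARQUET', 'PARQUET', 'ORC'],
    'status': [1, 2, 1],  # 0=EXISTING, 1=ADDED, 2=DELETED per the manifest spec
})

# Filter in C++ via pyarrow.compute instead of per-record Python code
mask = pc.and_(
    pc.equal(entries['file_format'], 'PARQUET'),
    pc.not_equal(entries['status'], 2),
)
print(entries.filter(mask).to_pydict())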

kevinjqliu commented 1 month ago

pyiceberg implements its own avro reader/writer using Cython

yep, we optimistically build the Cython avro decoder and fall back to pure Python. See https://github.com/apache/iceberg-python/issues/1093#issuecomment-2341456013

koenvo commented 1 month ago

In my experience CPU becomes the next bottleneck when I/O isn’t the bottleneck anymore.

The tests I did also showed that huge improvements can be achieved by using all available cores. Threading only uses a single core. I did have some issues with passing arguments to subprocesses due to serialization. Had to solve it with cloudpickle. Some other parts didn’t really benefit as the serialization of large objects nullified the improvement gained from multiprocessing.
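
The workaround looked roughly like this (a minimal sketch; the helper names are made up):

import cloudpickle
from concurrent.futures import ProcessPoolExecutor

def _run_pickled(payload: bytes):
    # The worker unpickles the callable and its arguments with cloudpickle,
    # which handles closures/lambdas that the stdlib pickle rejects
    fn, args = cloudpickle.loads(payload)
    return fn(*args)

def map_with_cloudpickle(fn, items):
    # Serialize (fn, args) per item on the parent side; only bytes cross the process boundary
    payloads = [cloudpickle.dumps((fn, (item,))) for item in items]
    with ProcessPoolExecutor() as pool:
        return list(pool.map(_run_pickled, payloads))

if __name__ == '__main__':
    print(map_with_cloudpickle(lambda x: x * x, [1, 2, 3]))  # a lambda would fail with plain pickle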

happy to help where possible

DieHertz commented 3 weeks ago

So I haven't tried any actual changes yet, but decided to collect some baseline measurements with py-spy.

First there's pyiceberg 0.7.1's .inspect.files() on my big phat table:

In [3]: start = time.time()
   ...: f = m.inspect.files()
   ...: print('elapsed', time.time() - start)
elapsed 110.91524386405945

In [4]: len(f)
Out[4]: 401188

In [5]: len(m.current_snapshot().manifests(m.io))
Out[5]: 688

[py-spy flame graph: m files baseline]

It can be seen that most of the time is spent in the AvroFile constructor, where some initial decoding occurs, and inside the list comprehension for manifest entries, where the Avro records get transformed into dicts.

I argue that this is a CPU load rather than IO, and to prove that conclusively I ran the same code with a quickly crafted, memoized Snapshot/Manifest:

import pyiceberg.table
from concurrent.futures import ThreadPoolExecutor

class IOFromBytes:
    def __init__(self, bytes_: bytes):
        self._bytes = bytes_

    def open(self):
        return self

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        ...

    def read(self):
        return self._bytes

    def new_input(self, *args, **kwargs):
        return self

class MemoryManifest:
    def __init__(self, manifest, io):
        self._manifest = manifest
        with io.new_input(manifest.manifest_path).open() as f:
            self._io = IOFromBytes(f.read())

    def fetch_manifest_entry(self, *args, **kwargs):
        return self._manifest.fetch_manifest_entry(self._io, **kwargs)

class MemorySnapshot:
    def __init__(self, table: pyiceberg.table.Table):
        with ThreadPoolExecutor() as pool:
            self._manifests = list(pool.map(
                lambda manifest: MemoryManifest(manifest, table.io),
                table.current_snapshot().manifests(table.io),
            ))

    def manifests(self, *args, **kwargs):
        return self._manifests

Now we can see the actual IO takes less than 1 second for a total of ~112 MiB (without ThreadPoolExecutor here it was closer to 11 seconds):

In [35]: start = time.time()
    ...: snapshot = MemorySnapshot(m)
    ...: print('elapsed', time.time() - start)
elapsed 0.4690868854522705

In [37]: len(snapshot._manifests)
Out[37]: 688

In [39]: sum(len(manifest._io._bytes) for manifest in snapshot._manifests) / 1024 / 1024
Out[39]: 112.21551609039307

Now .inspect.files() over already downloaded data:

In [36]: start = time.time()
    ...: m.inspect._get_snapshot = lambda self_: snapshot
    ...: f = m.inspect.files()
    ...: print('elapsed', time.time() - start)
elapsed 97.30642795562744

In [38]: len(f)
Out[38]: 401188

[py-spy flame graph: m files no-io]

It can be seen that IO takes a little more than 10% of the total time taken by .inspect.files(), so that's about the improvement I'd expect if we use just the ThreadPoolExecutor.

DieHertz commented 3 weeks ago

Had to solve it with cloudpickle.

With functools.partial it doesn't seem to be necessary, as shown in my earlier messages.

yep, optimistically build avro decoder, fall back to pure python.

Early in my quest for a faster .inspect.files() I tried using fastavro; it didn't help at all in my case. Seemingly the Python implementation in pyiceberg is already fast enough.

more direct conversion of avro records to pyarrow recordbatches somewhere at that level

Yes, that might be the way to go. I somewhat did that, as my mapper in the ProcessPoolExecutor returns a pyarrow.RecordBatch. RecordBatches are also much faster to return than the current lists of dicts.

DieHertz commented 3 weeks ago

By the way, there is a related goal in Arrow: https://github.com/apache/arrow/issues/16991

IMO it makes sense to wait until it gets implemented or contribute there, rather than writing our own general-purpose Avro-To-Arrow code. In the meantime the approach with the ProcessPoolExecutor should give a significant improvement.

DieHertz commented 3 weeks ago

Here I have extracted the code returning list[dict] of entries for each Manifest and run it inside the ThreadPoolExecutor provided by the pyiceberg.utils.concurrent.ExecutorFactory: https://github.com/DieHertz/iceberg-python/commit/34c28457191ca9225417828e4bdafee22d1e088b

No matter the max_workers value (1, 4, unlimited), it takes the same time to process. py-spy struggles to sample multiple threads at any usable rate on my machine, so I'm not providing a flame graph for now :-(

Edit: I have realized that I don't know how the generator behaves in this instance, i.e. whether it means back-and-forth communication with the thread for each element returned. I replaced the function to return the entire list instead (https://github.com/DieHertz/iceberg-python/commit/62224f84b55c153235b48f3ff40374802f95ea21). Basically the same result, slightly faster than before, regardless of the number of threads.

Edit: This version, which replaces the Executor with a ProcessPoolExecutor, runs in roughly a third of the time. Will try to py-spy it later: https://github.com/DieHertz/iceberg-python/commit/6fb01c5ba916daffffbfc2a420b00d54dbd231dc

corleyma commented 3 weeks ago

IMO it makes sense to wait until it gets implemented or contribute there, rather than writing our own general-purpose Avro-To-Arrow code.

We don't need general-purpose avro-to-arrow code, though; we only need a fast path for iceberg manifests to arrow. Since we already have a layer that's decoding avro quickly and then building python dictionaries, we could bypass the dictionaries and build a pyarrow table directly.
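
As a sketch of what bypassing the dictionaries could look like at the Python level (attribute names follow the ManifestEntry/DataFile fields used earlier in this thread; the real win would come from doing this below the Python layer):

import pyarrow as pa

def entries_to_batch(entries) -> pa.RecordBatch:
    # Accumulate plain per-column lists instead of building one dict per entry
    paths, formats, statuses = [], [], []
    for entry in entries:
        paths.append(entry.data_file.file_path)
        formats.append(entry.data_file.file_format.value)  # e.g. 'PARQUET'
        statuses.append(int(entry.status))
    return pa.RecordBatch.from_arrays(
        [pa.array(paths, pa.string()), pa.array(formats, pa.string()), pa.array(statuses, pa.int8())],
        names=['file_path', 'file_format', 'status'],
    )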

The request for avro read support in Arrow cpp is 7 years old at this point, so I don't think I'd wait on that.

In the meantime the approach with the ProcessPoolExecutor should give a significant improvement

Using multiprocessing comes with a lot of potential headaches around cross-platform support and, especially when it happens transparently to callers, it can create difficulties for other projects that want to use pyiceberg as an SDK. If there's a way to improve performance without spawning subprocesses, it's worth exploring.

If the work is done in Cython (avro decoder -> pyarrow recordbatches using the PyArrow Cython API), that also leaves room to release the GIL for meaningful threaded concurrency.

DieHertz commented 3 weeks ago

If the work is done in Cython (avro decoder -> pyarrow recordbatches using the PyArrow Cython API), that also leaves room to release the GIL for meaningful threaded concurrency.

I like this idea, will try pursuing it then