apache / iceberg-python

Apache PyIceberg
https://py.iceberg.apache.org/
Apache License 2.0

[bug?] cannot run integration test #1162

Closed: kevinjqliu closed this issue 1 week ago

kevinjqliu commented 2 weeks ago

Apache Iceberg version

main (development)

Please describe the bug 🐞

On Mac, is anyone having issues running make test-integration on the current main branch?

I'm having issues, but I'm not sure if it's due to my local environment.

soumya-ghosh commented 2 weeks ago

I've been facing the same issue on my Mac (Python 3.11) since yesterday, when I rebased my dev branch onto the latest main branch. I just realized that the tests are not running on the main branch itself either. I did make clean and make test-integration-rebuild, and now I'm seeing more errors than before. However, I did run some of those integration tests through the IDE and they run fine.

Error message - OSError: When initiating multiple part upload for key 'default/arrow_table_v1_with_multiple_partitions/metadata/snap-6162354366671903431-0-18a934c0-df15-4f4f-acec-3aedfe585765.avro' in bucket 'warehouse': AWS Error NETWORK_CONNECTION during CreateMultipartUpload operation: curlCode: 43, A libcurl function was given a bad argument

kevinjqliu commented 2 weeks ago

Thanks for confirming. I see the same issue: https://gist.github.com/kevinjqliu/c8310b6253beab52cce93391df03bfe4

It only happens for commits at and after 1971fcfe0875eeb200dbcb66f385e504cfad6609 (https://github.com/apache/iceberg-python/commits/main/).

The commit before it (b5933756b5b488ec51cd56d5984731b6cc347f2b) does not have this issue.

The CI integration tests are fine, and @sungwy confirmed that running the integration tests via a codespace also works.

kevinjqliu commented 1 week ago

Full pytest report here: https://gist.github.com/kevinjqliu/a0e8e2199bd8064757eb2b40409e0794

Here's the breakdown of the errors:

171 x pyarrow/error.pxi:92: OSError, of which:
* 87 via pyiceberg/io/pyarrow.py:2426: in write_parquet
* 83 via with write_manifest_list(

write_parquet stack trace:

pyiceberg/io/pyarrow.py:2426: in write_parquet
    with fo.create(overwrite=True) as fos:
pyiceberg/io/pyarrow.py:311: in create
    output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
pyarrow/_fs.pyx:887: in pyarrow._fs.FileSystem.open_output_stream
    ???
pyarrow/error.pxi:155: in pyarrow.lib.pyarrow_internal_check_status
    ???

write_manifest_list stack trace:

pyiceberg/table/update/snapshot.py:253: in _commit
    with write_manifest_list(
pyiceberg/manifest.py:924: in __enter__
    self._writer.__enter__()
pyiceberg/avro/file.py:258: in __enter__
    self.output_stream = self.output_file.create(overwrite=True)
pyiceberg/io/pyarrow.py:311: in create
    output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
pyarrow/_fs.pyx:887: in pyarrow._fs.FileSystem.open_output_stream
    ???
pyarrow/error.pxi:155: in pyarrow.lib.pyarrow_internal_check_status
    ???

kevinjqliu commented 1 week ago

One realization is that the manifest cache is implemented as a global cache: https://github.com/apache/iceberg-python/blob/de47590c6ac4f507cb2337c20504a62c484339f9/pyiceberg/table/snapshots.py#L234-L238

Entries are kept in memory until eviction.

And since ManifestFile has an IO component, I think the file descriptors are kept alive and never cleaned up.

Why does this only affect M1 Macs? I have no idea.
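
For context, functools.lru_cache holds strong references to its arguments and return values, so whatever is cached stays alive until the entry is evicted or the cache is cleared. A minimal standalone sketch of that behavior (CPython refcounting assumed; Handle is a hypothetical stand-in for an object wrapping an open file descriptor, not a pyiceberg class):

import weakref
from functools import lru_cache


class Handle:
    """Hypothetical stand-in for an object wrapping an open file descriptor."""


@lru_cache
def load(key: str) -> Handle:
    # The returned Handle is kept alive by the cache, keyed by `key`.
    return Handle()


ref = weakref.ref(load("snap-1.avro"))
assert ref() is not None  # the cache still holds a strong reference
load.cache_clear()
assert ref() is None  # only after eviction/clearing can the object be collected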

kevinjqliu commented 1 week ago

#1185 reimplements the manifest cache as specific to the Snapshot instance, so the cache can be cleaned up once the Snapshot is GC'ed.

This also fits the feature request in #595, to be based on a specific Snapshot.
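
For illustration, a rough sketch of what a Snapshot-scoped cache could look like (this is not the actual #1185 change; the simplified Snapshot class and the _manifests_cache attribute here are hypothetical):

from typing import List, Optional

from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestFile, read_manifest_list


class Snapshot:
    """Simplified stand-in for pyiceberg.table.snapshots.Snapshot."""

    def __init__(self, manifest_list: Optional[str] = None) -> None:
        self.manifest_list = manifest_list
        self._manifests_cache: Optional[List[ManifestFile]] = None

    def manifests(self, io: FileIO) -> List[ManifestFile]:
        # Cache on the instance, so the entries are released when the Snapshot is GC'ed.
        if self._manifests_cache is None and self.manifest_list is not None:
            self._manifests_cache = list(read_manifest_list(io.new_input(self.manifest_list)))
        return self._manifests_cache or []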

sungwy commented 1 week ago

And since ManifestFile has an IO component, I think the file descriptors are kept alive and never cleaned up.

Hmmm, I don't think ManifestFile has an IO component. It's only used as an input parameter to one of the class methods:

https://github.com/apache/iceberg-python/blob/de47590c6ac4f507cb2337c20504a62c484339f9/pyiceberg/manifest.py#L555-L620

kevinjqliu commented 1 week ago

I should be more specific. Reading (and caching) a ManifestFile object (or a list of them) ultimately keeps a bunch of PyArrowFile objects alive (any time io.new_input is called).

In PyArrowFile, the open and create methods do not automatically clean up resources: https://github.com/apache/iceberg-python/blob/de47590c6ac4f507cb2337c20504a62c484339f9/pyiceberg/io/pyarrow.py#L258-L318

I think this was part of the issue, since I had to increase the system file descriptor limit.
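
As an aside, one way to inspect and raise the per-process open-file limit from Python is the standard resource module (the 4096 below is an arbitrary example value, not something suggested in this thread):

import resource

# Current soft/hard limits on open file descriptors for this process
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open-file limit: soft={soft}, hard={hard}")

# Raise the soft limit, bounded by the hard limit
resource.setrlimit(resource.RLIMIT_NOFILE, (min(4096, hard), hard))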

sungwy commented 1 week ago

That's a very interesting observation 👀 I'm so curious to understand what's happening. If that's in fact what's happening, I think your proposed solution sounds promising...

I'm still trying to understand this issue thoroughly.

In the lru-cached _manifests we are caching a list of manifest files, but in fact we are only reading a single manifest list and deserializing each of its records as ManifestFile records. So the only PyArrowFile that I'd expect to be left 'open' according to this theory would be the one for the manifest_list:

https://github.com/apache/iceberg-python/blob/de47590c6ac4f507cb2337c20504a62c484339f9/pyiceberg/manifest.py#L623-L639

kevinjqliu commented 1 week ago

I took a step back and realized the fundamental issue was the newly introduced cache.

Without the cache, everything works fine. With the cache, things break.

Going a layer deeper, this probably means the bug only shows up on cache hits, since cache misses just recompute. So the failure scenario is a cache hit that returns the wrong value.

Fundamentally, there are a couple of issues with the function definition:

@lru_cache
def _manifests(io: FileIO, manifest_list: str) -> List[ManifestFile]:
    """Return the manifests from the manifest list."""
    file = io.new_input(manifest_list)
    return list(read_manifest_list(file))

First, the cache key is both io and manifest_list, whereas we only want the key to be manifest_list. Second, the result is a list, which can be mutated, leading to a wrong cached result.

Here's an example to showcase the different cache keys:

from typing import List

from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestFile, read_manifest_list
from pyiceberg.table.snapshots import Snapshot

cache = {}

def _manifests(io: FileIO, manifest_list: str, snapshot: Snapshot) -> List[ManifestFile]:
    """Return the manifests from the manifest list."""
    # key = (manifest_list, )  # works
    # key = (manifest_list, io)  # fails
    key = (manifest_list, snapshot)  # works
    if key in cache:
        return cache[key]
    cache[key] = list(read_manifest_list(io.new_input(manifest_list)))
    return cache[key]
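
And a small self-contained illustration of the second issue (a mutable cached return value), independent of the Mac-specific failure:

from functools import lru_cache
from typing import List


@lru_cache
def cached_values(key: str) -> List[int]:
    return [1, 2, 3]


first = cached_values("a")
first.append(4)  # the caller mutates the cached list in place
assert cached_values("a") == [1, 2, 3, 4]  # later cache hits observe the mutation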

Without digging into exactly where it is breaking or why it only happens on M1 Macs, there are two potential solutions:

  1. Move the manifest cache to the Snapshot instance
  2. Use the cachetools library to specify manifest_list as the only cache key (see stack overflow); a sketch of this option follows below
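
A minimal sketch of option 2, assuming the cachetools API (cached, LRUCache, and cachetools.keys.hashkey); illustrative only, not necessarily the exact approach taken in #1187:

from typing import List

from cachetools import LRUCache, cached
from cachetools.keys import hashkey

from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestFile, read_manifest_list


@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list))
def _manifests(io: FileIO, manifest_list: str) -> List[ManifestFile]:
    """Return the manifests from the manifest list, cached by manifest_list only."""
    file = io.new_input(manifest_list)
    return list(read_manifest_list(file))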

kevinjqliu commented 1 week ago

#1185 uses a Snapshot-instance-specific cache.

#1187 uses a global cache via the cachetools library to specify the cache key.

I recommend #1187.