apache / iceberg-python

Apache PyIceberg
https://py.iceberg.apache.org/
Apache License 2.0

Is `pyiceberg.Table` thread-safe? #1305

Closed chengchengpei closed 1 week ago

chengchengpei commented 2 weeks ago

Question

Is `pyiceberg.Table` thread-safe? Can we call `table.append(table_data)` from multiple threads or processes to write to the same table in parallel?

kevinjqliu commented 2 weeks ago

The write process relies on the catalog's atomic swap to guarantee serializable isolation. Multiple threads can call `append`, but only one commit will succeed at a time; the others have to retry the commit. See https://iceberg.apache.org/spec/?h=concurrency#optimistic-concurrency
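
For example, a minimal retry loop might look like the sketch below (the function name, table identifier, and backoff values are placeholders, not part of the library; the exception class is `pyiceberg.exceptions.CommitFailedException`):

import time

from pyiceberg.exceptions import CommitFailedException

def append_with_retry(catalog, identifier, table_data, max_attempts=5):
    # Retry the append when another writer wins the catalog's atomic swap.
    for attempt in range(max_attempts):
        try:
            # Reload the table so the new commit is based on the latest metadata.
            table = catalog.load_table(identifier)
            table.append(table_data)
            return
        except CommitFailedException:
            if attempt == max_attempts - 1:
                raise
            time.sleep(2 ** attempt)  # simple backoff before retrying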

chengchengpei commented 2 weeks ago

@kevinjqliu

I tried to run the following code (multiple processes appending to the same Iceberg table):

import os
import time
from multiprocessing import Pool

import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, BinaryType
import base64
import boto3

from utils import list_files

def process_batch(batch):
    print('start processing batch')
    ids = []
    image_data = []
    for image_path, image_name in batch:
        with open(image_path, "rb") as f:
            image_data.append(base64.b64encode(f.read()))
            ids.append(image_name)
    table_data = pa.Table.from_pydict({"id": ids, "image_data": image_data})
    start = time.time()
    catalog = load_catalog("glue", **{
        "type": "glue",
        "region": "us-east-1",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": os.getenv("AWS_KEY_ID"),
        "s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "max-workers": 8
    })
    catalog.load_table("test.imagenet-object-localization-challenge-10000").append(table_data)
    end = time.time()
    print('uploaded {} in {} seconds'.format(len(ids), end - start))
    return len(ids), end - start

if __name__ == "__main__":
    # Create a schema for the Iceberg table
    schema = Schema(
        NestedField(1, "id", StringType()),
        NestedField(2, "image_data", BinaryType())
    )

    # Load the Iceberg catalog
    catalog = load_catalog("glue", **{
        "type": "glue",
        "region": "us-east-1",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": os.getenv("AWS_KEY_ID"),
        "s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "max-workers": 8
        # "write.parquet.compression-codec": "snappy"
    })

    catalog.create_namespace_if_not_exists("test")

    # Create an Iceberg table
    table = catalog.create_table_if_not_exists(
        identifier="test.imagenet-object-localization-challenge-10000",
        schema=schema,
        location="s3://test/iceberg-data/")

    # Load images and convert to base64
    images_list = list_files("/Users/ILSVRC/data/CLS-LOC/test/", extension=".JPEG")
    batch_size = 10000
    total_batches = 10
    processes = []
    batches = [images_list[i:i + batch_size] for i in
               range(0, min(len(images_list), total_batches * batch_size), batch_size)]

    with Pool(4) as pool:
        results = pool.map(process_batch, batches)

    for result in results:
        print('uploaded {} in {} seconds'.format(result[0], result[1]))

but got

Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
  File "/Users/test/write_images_to_iceberg.py", line 51, in process_batch
    catalog.load_table("test.imagenet-object-localization-challenge-10000").append(table_data)
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 1578, in append
    tx.append(df=df, snapshot_properties=snapshot_properties)
  File "/Users/tst/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 289, in __exit__
    self.commit_transaction()
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 712, in commit_transaction
    self._table._do_commit(  # pylint: disable=W0212
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 1638, in _do_commit
    response = self.catalog._commit_table(  # pylint: disable=W0212
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/catalog/glue.py", line 484, in _commit_table
    updated_staged_table = self._update_and_stage_table(current_table, table_request)
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/catalog/__init__.py", line 835, in _update_and_stage_table
    requirement.validate(current_table.metadata if current_table else None)
  File "/Users/tests/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 1262, in validate
    raise CommitFailedException(
pyiceberg.exceptions.CommitFailedException: Requirement failed: branch main has changed: expected id 2633742078255924117, found 3998254648540280684
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/test/write_images_to_iceberg.py", line 92, in <module>
    results = pool.map(process_batch, batches)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value
pyiceberg.exceptions.CommitFailedException: Requirement failed: branch main has changed: expected id 2633742078255924117, found 3998254648540280684

I have too many rows to write to the same Iceberg table... how can I speed it up?

Thanks

kevinjqliu commented 2 weeks ago

pyiceberg.exceptions.CommitFailedException

This is not a threading issue; it is expected behavior when you have concurrent writers. The writer should retry the commit. See #1084 for a similar issue, and #269 for the issue tracking this feature. See https://github.com/apache/iceberg-python/issues/269#issuecomment-2402969232 for a workaround.

I have too many rows to write to the same Iceberg table... how can I speed it up?

You can try writing the batches as Parquet files in the worker processes and then collecting the file paths in the main method. Use the `add_files` method to commit all of the Parquet files to the Iceberg table; this commits to the table only once.
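
Roughly, a sketch of that approach (not the exact code from the linked comment; the staging prefix and function name are placeholders, and it assumes pyarrow can resolve the `s3://` URI with AWS credentials from the environment):

import pyarrow.parquet as pq

def write_batch_to_parquet(args):
    batch_index, table_data = args
    # Worker: write the Arrow table as a plain Parquet file; no catalog commit here.
    path = f"s3://test/iceberg-staging/batch-{batch_index}.parquet"
    pq.write_table(table_data, path)
    return path

# In the main process, after the pool finishes:
#   file_paths = pool.map(write_batch_to_parquet, enumerate(arrow_tables))
#   table = catalog.load_table("test.imagenet-object-localization-challenge-10000")
#   table.add_files(file_paths)  # a single commit that registers every Parquet file

Note that `add_files` registers the files without rewriting them, so the Parquet schema has to match the table schema.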