Eventual-Inc / Daft

Distributed data engine for Python/SQL designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

Serialization error with S3Config's credential_provider #3322

Closed: desmondcheongzx closed this issue 1 day ago

desmondcheongzx commented 3 days ago

Describe the bug

In https://dist-data.slack.com/archives/C041NA2RBFD/p1731789019058459, a user encountered a serialization error when using a callable credential provider for S3.

Code:

import datetime

import boto3
import daft
from daft.io import IOConfig, S3Config

def get_credentials() -> daft.io.S3Credentials:
    session = boto3.Session()
    creds = session.get_credentials()
    return daft.io.S3Credentials(
        key_id=creds.access_key,
        access_key=creds.secret_key,
        session_token=creds.token,
        expiry=datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1),
    )

s3_config = S3Config(
    credentials_provider=get_credentials,
)

io_config = IOConfig(s3=s3_config)

source = daft.read_parquet(
    "s3://noetik-datalake-prod/branches/main/warehouse/curated__cosmx_cells/*",
    io_config=io_config,
)

Error:

(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337) thread '<unnamed>' panicked at src/daft-scan/src/python.rs:480:5: [repeated 30x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337) pyo3_runtime.PanicException: called `Result::unwrap()` on an `Err` value: Custom("AttributeError: type object 'S3Credentials' has no attribute 'expiry'") [repeated 90x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337) note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace [repeated 30x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337) Traceback (most recent call last): [repeated 60x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)   File "python/ray/_raylet.pyx", line 2254, in ray._raylet.task_execution_handler [repeated 60x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)   File "python/ray/_raylet.pyx", line 2150, in ray._raylet.execute_task_with_cancellation_handler [repeated 60x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)   File "python/ray/_raylet.pyx", line 1839, in ray._raylet.execute_task [repeated 300x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 460, in deserialize_objects [repeated 120x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)     return context.deserialize_objects(data_metadata_pairs, object_refs) [repeated 60x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)     obj = self._deserialize_object(data, metadata, object_ref) [repeated 60x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 317, in _deserialize_object [repeated 60x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)     return self._deserialize_msgpack_data(data, metadata_fields) [repeated 60x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 272, in _deserialize_msgpack_data [repeated 60x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)     python_objects = self._deserialize_pickle5_data(pickle5_data) [repeated 60x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 262, in _deserialize_pickle5_data [repeated 60x across cluster]
(ScanWithTask-Project-FanoutHash [Stage:2] pid=3867337)     obj = pickle.loads(in_band) [repeated 60x across cluster]

S3Credentials appears to be missing the expiry attribute, which is optional for the user to provide. We are dropping it somewhere, either in a constructor or in a serde path.
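The wording `type object 'S3Credentials' has no attribute ...` is worth noting: in CPython, that message comes from a lookup on the class object itself, while a lookup on an instance reads `'S3Credentials' object has no attribute ...` (the form in the later `session_token` error). A minimal sketch for narrowing down the constructor-vs-serde question: it uses a hypothetical pure-Python stand-in, `FakeS3Credentials` (not Daft's Rust-backed class), and round-trips it through `pickle`, the same `pickle.loads` call that appears in the traceback above, to check that the optional `expiry` survives. The helper names `roundtrip` and `attribute_error_message` are also hypothetical.

```python
import datetime
import pickle
from typing import Optional


class FakeS3Credentials:
    """Hypothetical stand-in for daft.io.S3Credentials; not Daft's actual class."""

    def __init__(
        self,
        key_id: str,
        access_key: str,
        session_token: Optional[str] = None,
        expiry: Optional[datetime.datetime] = None,
    ) -> None:
        self.key_id = key_id
        self.access_key = access_key
        self.session_token = session_token
        self.expiry = expiry

    def __reduce__(self):
        # Rebuild through the constructor, passing every field (optional ones
        # included), so pickling cannot silently drop session_token or expiry.
        return (
            FakeS3Credentials,
            (self.key_id, self.access_key, self.session_token, self.expiry),
        )


def roundtrip(obj):
    """Serialize and deserialize obj with pickle."""
    return pickle.loads(pickle.dumps(obj))


def attribute_error_message(target, name: str) -> Optional[str]:
    """Return the AttributeError text for getattr(target, name), or None if it succeeds."""
    try:
        getattr(target, name)
    except AttributeError as exc:
        return str(exc)
    return None


class Empty:
    """Class with no attributes, used to compare the two error wordings."""


if __name__ == "__main__":
    expiry = datetime.datetime(2024, 11, 18, 23, 0, tzinfo=datetime.timezone.utc)
    restored = roundtrip(FakeS3Credentials("example-key-id", "example-secret", "example-token", expiry))
    print(restored.expiry == expiry)
    print(attribute_error_message(Empty, "expiry"))    # lookup on the class
    print(attribute_error_message(Empty(), "expiry"))  # lookup on an instance
```

The same round-trip assertion, applied to the real object, would show whether the field is lost in serialization or was never set by the constructor.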

To Reproduce

No response

Expected behavior

No response

Component(s)

Other

Additional context

No response

tvanhens commented 3 days ago

If I remove the expiry argument, I get a new error:

(ScanWithTask-LocalLimit [Stage:2] pid=359147) thread '<unnamed>' panicked at src/daft-scan/src/python.rs:455:5:
(ScanWithTask-LocalLimit [Stage:2] pid=359147) called `Result::unwrap()` on an `Err` value: Custom("AttributeError: type object 'S3Credentials' has no attribute 'access_key'")
(ScanWithTask-LocalLimit [Stage:2] pid=359147) note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
(ScanWithTask-LocalLimit [Stage:2] pid=359147) Traceback (most recent call last):
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 2254, in ray._raylet.task_execution_handler
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 2150, in ray._raylet.execute_task_with_cancellation_handler
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 1805, in ray._raylet.execute_task
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 1806, in ray._raylet.execute_task
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 1809, in ray._raylet.execute_task
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 1837, in ray._raylet.execute_task
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 1839, in ray._raylet.execute_task
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/worker.py", line 841, in deserialize_objects
(ScanWithTask-LocalLimit [Stage:2] pid=359147)     return context.deserialize_objects(data_metadata_pairs, object_refs)
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 460, in deserialize_objects
(ScanWithTask-LocalLimit [Stage:2] pid=359147)     obj = self._deserialize_object(data, metadata, object_ref)
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 317, in _deserialize_object
(ScanWithTask-LocalLimit [Stage:2] pid=359147)     return self._deserialize_msgpack_data(data, metadata_fields)
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 272, in _deserialize_msgpack_data
(ScanWithTask-LocalLimit [Stage:2] pid=359147)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 262, in _deserialize_pickle5_data
(ScanWithTask-LocalLimit [Stage:2] pid=359147)     obj = pickle.loads(in_band)
(ScanWithTask-LocalLimit [Stage:2] pid=359147) pyo3_runtime.PanicException: called `Result::unwrap()` on an `Err` value: Custom("AttributeError: type object 'S3Credentials' has no attribute 'access_key'")
(ScanWithTask-LocalLimit [Stage:2] pid=359147) Exception ignored in: 'ray._raylet.task_execution_handler'
(ScanWithTask-LocalLimit [Stage:2] pid=359147) Traceback (most recent call last):
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 2254, in ray._raylet.task_execution_handler
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 2150, in ray._raylet.execute_task_with_cancellation_handler
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 1805, in ray._raylet.execute_task
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 1806, in ray._raylet.execute_task
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 1809, in ray._raylet.execute_task
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 1837, in ray._raylet.execute_task
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "python/ray/_raylet.pyx", line 1839, in ray._raylet.execute_task
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/worker.py", line 841, in deserialize_objects
(ScanWithTask-LocalLimit [Stage:2] pid=359147)     return context.deserialize_objects(data_metadata_pairs, object_refs)
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 460, in deserialize_objects
(ScanWithTask-LocalLimit [Stage:2] pid=359147)     obj = self._deserialize_object(data, metadata, object_ref)
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 317, in _deserialize_object
(ScanWithTask-LocalLimit [Stage:2] pid=359147)     return self._deserialize_msgpack_data(data, metadata_fields)
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 272, in _deserialize_msgpack_data
(ScanWithTask-LocalLimit [Stage:2] pid=359147)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(ScanWithTask-LocalLimit [Stage:2] pid=359147)   File "/home/noetik/twm/.venv/lib/python3.10/site-packages/ray/_private/serialization.py", line 262, in _deserialize_pickle5_data
(ScanWithTask-LocalLimit [Stage:2] pid=359147)     obj = pickle.loads(in_band)
(ScanWithTask-LocalLimit [Stage:2] pid=359147) pyo3_runtime.PanicException: called `Result::unwrap()` on an `Err` value: Custom("AttributeError: type object 'S3Credentials' has no attribute 'access_key'")
(ScanWithTask-LocalLimit [Stage:2] pid=359147) [2024-11-18 23:00:04,425 C 359147 359147] task_receiver.cc:213:  Check failed: objects_valid

tvanhens commented 3 days ago

Something might be amiss with the S3Credentials object itself: it appears to have no session_token property. Printing the resolved values for the access key and secret key works fine, but when I try to access the session token like so:

import datetime

import boto3
import daft

def get_credentials() -> daft.io.S3Credentials:
    session = boto3.Session()
    creds = session.get_credentials()
    return daft.io.S3Credentials(
        key_id=creds.access_key,
        access_key=creds.secret_key,
        session_token=creds.token,
        expiry=(datetime.datetime.now() + datetime.timedelta(seconds=1)),
    )

credentials = get_credentials()
print(credentials.session_token)

I get this error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File /home/noetik/twm/src/twm/data/ray/cosmx.py:71
     65     return daft.read_parquet(
     66         "s3://noetik-datalake-prod/branches/main/warehouse/curated__cosmx_cells/*",
     67         io_config=io_config,
     68     )
     70 setup_ray()
---> 71 source = read_cosmx_cells()
     73 source.show()

File /home/noetik/twm/src/twm/data/ray/cosmx.py:56
     54 def read_cosmx_cells() -> daft.DataFrame:
     55     credentials = get_credentials()
---> 56     print(credentials.session_token)
     57     s3_config = daft.io.S3Config(
     58         # Waiting on: https://github.com/Eventual-Inc/Daft/issues/3322
     59         credentials_provider=get_credentials,
   (...)
     62         # session_token=credentials.session_token,
     63     )
     64     io_config = daft.io.IOConfig(s3=s3_config)

AttributeError: 'builtins.S3Credentials' object has no attribute 'session_token'
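When checking which fields an object returned by a credentials provider actually exposes to Python, it can help to test them all at once rather than one `print` at a time. A minimal sketch; `unreadable_fields` is a hypothetical helper, and the field names are taken from the `S3Credentials(...)` call in the snippet above:

```python
from types import SimpleNamespace
from typing import Iterable, List

# Keyword arguments passed to daft.io.S3Credentials in the snippet above.
CREDENTIAL_FIELDS = ("key_id", "access_key", "session_token", "expiry")


def unreadable_fields(creds: object, fields: Iterable[str] = CREDENTIAL_FIELDS) -> List[str]:
    """Return the names in `fields` that cannot be read as attributes of `creds`."""
    # hasattr() returns False when the lookup raises AttributeError,
    # which is the failure reported in the traceback above.
    return [name for name in fields if not hasattr(creds, name)]


if __name__ == "__main__":
    partial = SimpleNamespace(key_id="example-key-id", access_key="example-secret")
    print(unreadable_fields(partial))  # ['session_token', 'expiry']
```

Calling `unreadable_fields(get_credentials())` on the real object would show, in one call, which of the four fields are not exposed on the Python side.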

kevinzwang commented 2 days ago

Hey @tvanhens, thank you for your patience.

To follow up from Slack: I have a PR out (#3339) that will fix the Ray serialization issue for writes. However, S3Config.credentials_provider will not work for writes yet, because our writers do not use it yet. I will make another PR to fix that. Once both are merged, I'd expect your get_credentials() workflow to work for writing to S3.
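Until the writers honor `credentials_provider`, one stopgap is to pass a static snapshot of the credentials to `S3Config` directly, as the commented-out `session_token=credentials.session_token` line in the traceback above hints. A minimal sketch, assuming `S3Config` accepts `key_id`, `access_key` and `session_token` keyword arguments and that the input has the fields of botocore's `ReadOnlyCredentials` (`access_key`, `secret_key`, `token`). `static_s3_config_kwargs` is a hypothetical helper, and a local `NamedTuple` stands in for botocore's type so the example runs without AWS:

```python
from typing import NamedTuple, Optional


class FrozenCredentials(NamedTuple):
    """Local stand-in mirroring the fields of botocore's ReadOnlyCredentials."""

    access_key: str
    secret_key: str
    token: Optional[str]


def static_s3_config_kwargs(frozen) -> dict:
    """Map a frozen credential snapshot onto (assumed) S3Config keyword names.

    Note the naming mismatch: S3Config's `access_key` is the AWS *secret* key,
    mirroring the S3Credentials(...) call in the original report.
    """
    kwargs = {"key_id": frozen.access_key, "access_key": frozen.secret_key}
    if frozen.token is not None:
        # Only temporary (e.g. STS) credentials carry a session token.
        kwargs["session_token"] = frozen.token
    return kwargs


if __name__ == "__main__":
    snapshot = FrozenCredentials("example-key-id", "example-secret", "example-token")
    print(static_s3_config_kwargs(snapshot))
    # With the real libraries (assumed, not run here):
    #   frozen = boto3.Session().get_credentials().get_frozen_credentials()
    #   io_config = IOConfig(s3=S3Config(**static_s3_config_kwargs(frozen)))
```

Freezing the credentials first gives one consistent key/secret/token triple. Unlike a provider, though, a static snapshot never refreshes, so a long-running write can outlive temporary credentials; that is why provider support in the writers still matters.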

kevinzwang commented 1 day ago

The serialization fixes have been merged into main, so I created a new issue to track S3 credentials provider support for writes: #3367