apache / polaris

Apache Polaris, the interoperable, open source catalog for Apache Iceberg
https://polaris.apache.org/
Apache License 2.0
1.17k stars 130 forks source link

[BUG] Some clients, PyIceberg, need more ADLS properties for Vended Credentials to work #418

Open sfc-gh-tbenroeck opened 3 weeks ago

sfc-gh-tbenroeck commented 3 weeks ago

Is this a possible security vulnerability?

Describe the bug

Polaris currently emits the ADLS SAS token as the key/value pair `adls.sas-token.{account_host}`: `{sas-token}`. The Spark client understands this host-suffixed key, but other clients such as PyIceberg do not (see apache/iceberg-python#1146).

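To support more clients, Polaris should also set the plain (non-host-suffixed) properties that the workaround below has to derive itself: `adls.sas-token`, `adls.account-name`, and `adls.account-host`.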

To Reproduce

  1. Use PyIceberg
from pyiceberg.catalog import load_catalog
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

# Open the Polaris REST catalog, asking the server to vend short-lived storage
# credentials, and read the table through PyIceberg's stock fsspec FileIO.
catalog = load_catalog(
    **{
        "type": "rest",
        "header.X-Iceberg-Access-Delegation": "vended-credentials",
        "uri": "https://{}.snowflakecomputing.com/polaris/api/catalog".format(account),
        "credential": "{}:{}".format(principal_client_id, principal_secret),
        "warehouse": catalog_name,
        "scope": role,
        "token-refresh-enabled": "true",
        "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
    }
)

table = catalog.load_table("{}.{}".format(catalog_namespace, catalog_namespace_tablename))
tablescan = table.scan()
df = tablescan.to_arrow()
Traceback (most recent call last):
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 375, in _fetch_access_token
    response.raise_for_status()
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/requests/models.py", line 1024, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 401 Client Error: Unauthorized for url: https://tv23016.west-us-2.azure.snowflakecomputing.com/polaris/api/catalog/v1/oauth/tokens

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tbenroeck/Documents/code/polaris_testing/simple_polaris.py", line 21, in <module>
    catalog = load_catalog(
              ^^^^^^^^^^^^^
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/__init__.py", line 261, in load_catalog
    return AVAILABLE_CATALOGS[catalog_type](name, cast(Dict[str, str], conf))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/__init__.py", line 136, in load_rest
    return RestCatalog(name, **conf)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 263, in __init__
    self._fetch_config()
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 386, in _fetch_config
    with self._create_session() as session:
         ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 280, in _create_session
    self._refresh_token(session, self.properties.get(TOKEN))
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 546, in _refresh_token
    self.properties[TOKEN] = self._fetch_access_token(session, self.properties[CREDENTIAL])
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 377, in _fetch_access_token
    self._handle_non_200_response(exc, {400: OAuthError, 401: OAuthError})
  File "/Users/tbenroeck/Documents/code/polaris_testing/.venv/lib/python3.12/site-packages/pyiceberg/catalog/rest.py", line 471, in _handle_non_200_response
    raise exception(response) from exc
pyiceberg.exceptions.OAuthError: unauthorized_client: The client is not authorized

Actual Behavior

No response

Expected Behavior

No response

Additional context

As a temporary workaround, I created a custom FileIO:

# Same catalog settings as the failing example, except that py-io-impl loads
# CustomFsspecFileIO from a local `custom_fsspec` module (shown below).
catalog = load_catalog(
    **{
        "type": "rest",
        "header.X-Iceberg-Access-Delegation": "vended-credentials",
        "uri": "https://{}.snowflakecomputing.com/polaris/api/catalog".format(account),
        "credential": "{}:{}".format(principal_client_id, principal_secret),
        "warehouse": catalog_name,
        "scope": role,
        "token-refresh-enabled": "true",
        "py-io-impl": "custom_fsspec.CustomFsspecFileIO",
    }
)
from pyiceberg.io.fsspec import FsspecFileIO, _adls
from urllib.parse import urlparse
from pyiceberg.io import (ADLS_ACCOUNT_NAME,ADLS_SAS_TOKEN, ADLFS_ACCOUNT_NAME, ADLFS_SAS_TOKEN)
from pyiceberg.utils.properties import get_first_property_value
from fsspec import AbstractFileSystem
from pyiceberg.typedef import Properties

class CustomFsspecFileIO(FsspecFileIO):
    """FsspecFileIO that accepts the ADLS properties Polaris vends.

    Short-term fix for https://github.com/apache/iceberg-python/issues/961 and
    https://github.com/apache/iceberg-python/issues/1146.

    - Polaris emits the SAS token under a host-suffixed key
      (``adls.sas-token.<account-host>``). When no plain SAS token is present,
      the first such key is copied to ``adls.sas-token``. The host it names
      fills ``adls.account-host``, and its first label fills
      ``adls.account-name``. Explicit values always win.
    - ``wasb://`` and ``wasbs://`` locations are rewritten to ``abfs://`` and
      ``abfss://``.
    """

    # Legacy WASB schemes and the ABFS scheme each one is rewritten to.
    _WASB_TO_ABFS = {"wasb": "abfs", "wasbs": "abfss"}
    # Schemes served by the module-level ``_adls`` filesystem factory.
    _ADLS_SCHEMES = ("abfs", "abfss", "wasb", "wasbs")

    def __init__(self, properties: Properties):
        # Work on a copy so the caller's dict is not mutated as a side effect.
        properties = dict(properties)
        base_location = properties.get('default-base-location')
        if base_location and base_location.startswith(("abfs", "wasb")):
            self._normalize_adls_properties(properties)
        super().__init__(properties)

    @staticmethod
    def _normalize_adls_properties(properties: Properties) -> None:
        """Fill plain ADLS keys from a host-suffixed SAS-token key, in place.

        Does nothing when a SAS token is already present under
        ``adls.sas-token`` or ``adlfs.sas-token``.
        """
        if get_first_property_value(properties, ADLS_SAS_TOKEN, ADLFS_SAS_TOKEN) is not None:
            return

        account_name = get_first_property_value(properties, ADLS_ACCOUNT_NAME, ADLFS_ACCOUNT_NAME)
        prefix = f"{ADLS_SAS_TOKEN}."
        # Iterate over a snapshot: keys are added to the dict inside the loop.
        for key, value in list(properties.items()):
            key = key.replace('adlfs.', 'adls.', 1)
            if not key.startswith(prefix):
                continue
            properties[ADLS_SAS_TOKEN] = value
            account_host = key.removeprefix(prefix)
            if account_host:
                # Record the host even when the account name was supplied,
                # so `_adls` always has it. An explicit value is kept.
                properties.setdefault('adls.account-host', account_host)
                if account_name is None:
                    properties[ADLS_ACCOUNT_NAME] = account_host.split('.')[0]
            break  # Use the first matching token only.

    @classmethod
    def _to_abfs_location(cls, location: str) -> str:
        """Rewrite ``wasb://`` / ``wasbs://`` to ``abfs://`` / ``abfss://``.

        Any other location is returned unchanged. Only the leading scheme is
        replaced, and scheme matching is case-insensitive.
        """
        scheme, sep, rest = location.partition(":")
        target = cls._WASB_TO_ABFS.get(scheme.lower()) if sep else None
        if target is None:
            return location
        return f"{target}:{rest}"

    def _get_fs(self, scheme: str) -> AbstractFileSystem:
        """Return the filesystem for ``scheme``.

        ADLS schemes use this module's ``_adls`` (looked up at call time).
        """
        if scheme in self._ADLS_SCHEMES:
            return _adls(self.properties)
        # Not ADLS: proceed with the original FsspecFileIO behavior.
        return super()._get_fs(scheme)

    def new_input(self, location: str):
        """Create an input file, first rewriting wasb(s):// to abfs(s)://."""
        return super().new_input(self._to_abfs_location(location))

    def new_output(self, location: str):
        """Create an output file, first rewriting wasb(s):// to abfs(s)://."""
        return super().new_output(self._to_abfs_location(location))

def _adls(properties: Properties) -> AbstractFileSystem:
    """Build an adlfs ``AzureBlobFileSystem`` from ADLS FileIO properties.

    This definition rebinds the ``_adls`` name imported from
    ``pyiceberg.io.fsspec`` earlier in this module. Because
    ``CustomFsspecFileIO._get_fs`` looks ``_adls`` up at call time, it uses
    this version.

    Both the ``adls.*`` and the legacy ``adlfs.*`` keys are accepted for the
    account name and SAS token. A missing key is passed as ``None`` instead of
    raising ``KeyError``.
    """
    from adlfs import AzureBlobFileSystem

    return AzureBlobFileSystem(
        # Optional. When None, the host is left to adlfs' default for the
        # account. NOTE(review): confirm against the installed adlfs version.
        account_host=properties.get('adls.account-host'),
        account_name=get_first_property_value(properties, ADLS_ACCOUNT_NAME, ADLFS_ACCOUNT_NAME),
        sas_token=get_first_property_value(properties, ADLS_SAS_TOKEN, ADLFS_SAS_TOKEN),
    )

System information

No response

eric-maynard commented 2 weeks ago

@sfc-gh-tbenroeck do you think we can just fix it in pyiceberg? From the linked thread, the community over there seems receptive to it