apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Unsupported credential type AzureIdentityCredentialAdapter when using AzureDataLakeStorageV2Hook with DefaultAzureCredential as authentication method (AKS workload identity) #39521

Open coy2k opened 4 months ago

coy2k commented 4 months ago

Apache Airflow Provider(s)

microsoft-azure

Versions of Apache Airflow Providers

apache-airflow-providers-microsoft-azure==9.0.1
azure-identity==1.15.0
azure-storage-blob==12.19.1
azure-storage-file-datalake==12.14.0

Apache Airflow version

Airflow v2.8.3 with Python 3.10.14

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Official Apache Airflow Helm Chart

Deployment details

k8s v1.27.3 (AKS with Microsoft Entra Workload ID enabled)

Airflow helm chart v1.13.1

values.yml

# Airflow Worker Config
workers:
  serviceAccount:
    annotations:
      azure.workload.identity/client-id: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
  podAnnotations:
    azure.workload.identity/skip-containers: worker-log-groomer;worker-kerberos;git-sync;git-sync-init;wait-for-airflow-migrations;volume-permissions
  labels:
    azure.workload.identity/use: "true"

Credentials injected by workload identity as environment variables

(airflow)env | grep AZURE
AZURE_AUTHORITY_HOST=https://login.microsoftonline.com/
AZURE_CLIENT_ID=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
AZURE_FEDERATED_TOKEN_FILE=/var/run/secrets/azure/tokens/azure-identity-token
AZURE_TENANT_ID=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
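
As a quick sanity check (my own sketch, not part of the deployment), with these variables present DefaultAzureCredential should resolve to workload identity and be able to obtain a storage token from inside the worker pod:

from azure.identity import DefaultAzureCredential

# AZURE_CLIENT_ID, AZURE_TENANT_ID and AZURE_FEDERATED_TOKEN_FILE are picked up
# automatically from the environment, so no arguments are needed here.
credential = DefaultAzureCredential()
token = credential.get_token("https://storage.azure.com/.default")
print(token.expires_on)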

Microsoft Azure Data Lake Storage Gen2 Connection environment variable

(airflow)env | grep ADLS
AIRFLOW_CONN_ADLS_DEFAULT=adls://<storage_name>
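
For reference (an illustrative check, with "mystorageaccount" standing in for the real storage account name), this URI parses into a connection that carries only the host, with no login or password, which is what leaves the hook to the DefaultAzureCredential path:

from airflow.models.connection import Connection

# "mystorageaccount" is a placeholder for the real <storage_name>.
conn = Connection(uri="adls://mystorageaccount")
print(conn.conn_type, conn.host, conn.login, conn.password)
# adls mystorageaccount None None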

What happened

[2024-05-09, 10:35:29 UTC] {operators.py:47} INFO - Getting list of file systems
[2024-05-09, 10:35:29 UTC] {base.py:83} INFO - Using connection ID 'adls_default' for task execution.
[2024-05-09, 10:35:29 UTC] {data_lake.py:368} INFO - account_url: https://<storage_name>.dfs.core.windows.net
[2024-05-09, 10:35:29 UTC] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/plugins/adls/operators.py", line 48, in execute
    return hook.list_file_system()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/microsoft/azure/hooks/data_lake.py", line 519, in list_file_system
    file_system = self.service_client.list_file_systems(
  File "/usr/local/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/microsoft/azure/hooks/data_lake.py", line 333, in service_client
    return self.get_conn()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/microsoft/azure/hooks/data_lake.py", line 370, in get_conn
    return DataLakeServiceClient(
  File "/home/airflow/.local/lib/python3.10/site-packages/azure/storage/filedatalake/_data_lake_service_client.py", line 96, in __init__
    self._blob_service_client = BlobServiceClient(blob_account_url, credential, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/azure/storage/blob/_blob_service_client.py", line 139, in __init__
    super(BlobServiceClient, self).__init__(parsed_url, service='blob', credential=credential, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/azure/storage/blob/_shared/base_client.py", line 110, in __init__
    self._config, self._pipeline = self._create_pipeline(self.credential, sdk_moniker=self._sdk_moniker, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/azure/storage/blob/_shared/base_client.py", line 234, in _create_pipeline
    raise TypeError(f"Unsupported credential: {type(credential)}")
TypeError: Unsupported credential: <class 'airflow.providers.microsoft.azure.utils.AzureIdentityCredentialAdapter'>
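
For context, the TypeError comes from the storage SDK's credential type check: the blob/datalake pipeline only accepts azure-core TokenCredential objects (or SAS/account-key strings), so the msrest-style AzureIdentityCredentialAdapter that the hook wraps around the credential is rejected. A minimal sketch (my own, outside Airflow) showing that the same call works when DefaultAzureCredential is passed to the client directly:

from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient

# account_url matches the one logged above; the real storage account name goes here.
client = DataLakeServiceClient(
    account_url="https://<storage_name>.dfs.core.windows.net",
    credential=DefaultAzureCredential(),
)
for fs in client.list_file_systems():
    print(fs.name)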

What you think should happen instead

The DefaultAzureCredential authentication method should work as expected when using workload identity.

How to reproduce

I've created a custom operator that lists the file systems of Azure Data Lake Storage Gen2 using the "list_file_system" method of the AzureDataLakeStorageV2Hook class.

from __future__ import annotations

from typing import Sequence

from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook
from airflow.utils.context import Context


class ADLSListFileSystemOperator(BaseOperator):
    """List the file systems of an Azure Data Lake Storage Gen2 account."""

    template_fields: Sequence[str] = ()
    ui_color = "#901dd2"

    def __init__(
        self,
        *,
        prefix: str | None = None,
        include_metadata: bool = False,
        adls_conn_id: str = "adls_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.prefix = prefix
        self.include_metadata = include_metadata
        self.adls_conn_id = adls_conn_id

    def execute(self, context: Context) -> list:
        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.adls_conn_id)
        self.log.info("Getting list of file systems")
        # Forward the optional filters to the hook.
        return hook.list_file_system(prefix=self.prefix, include_metadata=self.include_metadata)
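
A task definition like the following (DAG and task ids are just illustrative) triggers the failure:

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="adls_list_file_systems",
    start_date=datetime(2024, 5, 1),
    schedule=None,
    catchup=False,
) as dag:
    ADLSListFileSystemOperator(task_id="list_file_systems")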

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 4 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.