
Azure Datalake Storage V2 ObjectStoragePath connection issues #40410

Status: Open · fritz-astronomer opened this issue 4 days ago

fritz-astronomer commented 4 days ago

Apache Airflow version

2.9.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Connection parsing seems buggy in the Azure implementation for ObjectStoragePath - it requires specific extras in specific places that don't really make sense. It is also inconsistent with the connection parsing in AzureDataLakeStorageV2Hook.

Additionally, there is no documentation at all for the Azure implementation of ObjectStoragePath - we should make sure the provider ships a doc for it.

Furthermore - and this part is a Microsoft problem - having three solutions for the same thing, each with its own terminology and varying degrees of support or deprecation, is wicked confusing.
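For context, the code path in question is what resolves an ObjectStoragePath against this connection; a minimal sketch of the intended usage (the container name and path here are placeholders - this is roughly what a provider doc would presumably show):

from airflow.io.path import ObjectStoragePath

# "container" is a placeholder; resolving the path calls the Azure provider's
# get_fs() for the given conn_id under the hood
path = ObjectStoragePath("abfs://container/some/prefix/", conn_id="ADLS")
print([f.name for f in path.iterdir()])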

What you think should happen instead?

No response

How to reproduce

1) ✅ extras.connection_string - works for both the Hook and Object Storage, without issue:

import os
os.environ["AIRFLOW_CONN_ADLS"] = '{"conn_type": "adls", "extra": {"connection_string": "..."}}'

from airflow.providers.microsoft.azure.fs.adls import get_fs
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook

# Hook
print(list(AzureDataLakeStorageV2Hook(adls_conn_id="ADLS").get_conn().list_file_systems()))
# Object Storage
get_fs("ADLS")

2) ❌ host+login+password+extras.tenant_id - ✅ works for the Hook, ❌ DOES NOT WORK for Object Storage:

import os
os.environ["AIRFLOW_CONN_ADLS"] = '{"conn_type": "adls", "host": "myfilesystem", "login": "...", "password": "...", "extra": {"tenant_id": "..."}}'

from airflow.providers.microsoft.azure.fs.adls import get_fs
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook

# Hook
print(list(AzureDataLakeStorageV2Hook(adls_conn_id="ADLS").get_conn().list_file_systems()))
# Object Storage
get_fs("ADLS")
# ValueError: unable to connect to account for Must provide either a connection_string or account_name with credentials!!

The error comes from do_connect in adlfs.spec.
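That ValueError is raised by adlfs, not Airflow. Paraphrasing the gist of the guard (not the library's exact code): without a connection_string, adlfs insists on an account_name before it will apply any credential, so whatever get_fs builds from host/login/password never reaches the auth step:

# Paraphrased gist of the check in adlfs's AzureBlobFileSystem.do_connect,
# not the library's exact code; do_connect additionally wraps failures as
# "unable to connect to account for <original error>"
if not self.connection_string and not self.account_name:
    raise ValueError(
        "Must provide either a connection_string or account_name with credentials!!"
    )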

3) ❌ host+login+password+extras.tenant_id+extras.account_name (not documented) - appears to work for both, but fails on first use (see edit below):

import os
os.environ["AIRFLOW_CONN_ADLS"] = '{"conn_type": "adls", "host": "myfilesystem", "login": "...", "password": "...", "extra": {"tenant_id": "...", "account_name": "myfilesystem"}}'

from airflow.providers.microsoft.azure.fs.adls import get_fs
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook

# Hook
print(list(AzureDataLakeStorageV2Hook(adls_conn_id="ADLS").get_conn().list_file_systems()))
# Object Storage
get_fs("ADLS")

# test #2
get_fs("ADLS").ls("/")
# ClientAuthenticationError: Server failed to authenticate the request. Please refer to the information in the www-authenticate header.
# 'WWW-Authenticate': 'Bearer authorization_uri=https://login.microsoftonline.com/.../oauth2/authorize resource_id=https://storage.azure.com'

(Edit: I initially thought this was working, since get_fs returns successfully, but it fails as soon as I attempt to use it. I've tried a number of other combinations, such as including account_url and client_secret_auth_config in extra - none of them work.)
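A quick way to check what get_fs actually hands to adlfs is to intercept the constructor (a debugging sketch; it assumes get_fs references AzureBlobFileSystem as a module-level name inside fs/adls.py, which is what mock.patch relies on):

from unittest import mock

from airflow.providers.microsoft.azure.fs.adls import get_fs

# Replace the AzureBlobFileSystem constructor so we can inspect the kwargs
# that get_fs derived from the connection
with mock.patch("airflow.providers.microsoft.azure.fs.adls.AzureBlobFileSystem") as fake_fs:
    get_fs("ADLS")
    print(fake_fs.call_args)  # shows which connection fields survived parsing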

Operating System

Astronomer/Docker

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

fritz-astronomer commented 4 days ago

I can get this working with the client directly - the problem is 100% in the get_fs method.

from azure.identity.aio import ClientSecretCredential
from adlfs import AzureBlobFileSystem

# Building the filesystem by hand with the same service principal works fine
print(AzureBlobFileSystem(
    account_name="...",
    credential=ClientSecretCredential(
        tenant_id="...",
        client_id="...",
        client_secret="...",
    ),
).ls('/'))
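For completeness - if I'm reading adlfs's constructor options right, the same service principal can also be passed as plain kwargs (adlfs builds the credential itself), which is roughly what I'd expect get_fs to map the connection fields onto:

from adlfs import AzureBlobFileSystem

# Service-principal auth via adlfs's own kwargs; adlfs constructs the
# ClientSecretCredential internally (placeholders throughout)
print(AzureBlobFileSystem(
    account_name="...",
    tenant_id="...",
    client_id="...",
    client_secret="...",
).ls('/'))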