PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Error when using blob_storage_list #13106

Open ibombonato opened 1 year ago

ibombonato commented 1 year ago

I am receiving what seems to be a pydantic validation error when trying to list blobs from storage:

Crash detected! Execution was interrupted by an unexpected exception: KeyError: 0

If I use list_blobs from the official Azure Storage package, it works. The problem only happens when I use the prefect-azure package.
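Because list_blobs works when called directly, one possible workaround (an assumption on my part, not an official fix) is to call it yourself inside your own task and return only plain Python values, so no BlobProperties objects reach Prefect's result handling. The helper below, blobs_to_plain, is hypothetical: it copies a few fields out of each dict-like record, relying on BlobProperties supporting item access such as blob["name"].

```python
from typing import Any, Dict, Iterable, List, Sequence


def blobs_to_plain(
    blobs: Iterable[Any], fields: Sequence[str] = ("name", "size")
) -> List[Dict[str, Any]]:
    """Hypothetical helper: copy selected fields out of dict-like blob records.

    Assumes each record supports item access (record["name"]), which is how
    azure-storage-blob's BlobProperties behaves. The result contains only
    plain dicts of plain values, which are easy for Prefect to store.
    """
    wanted = tuple(fields)
    return [{field: blob[field] for field in wanted} for blob in blobs]
```

Inside a task you might call it as blobs_to_plain(ContainerClient.from_connection_string(connection_string, container_name).list_blobs()), using the synchronous ContainerClient from azure.storage.blob; that usage is a sketch and has not been tested against this bug.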

Steps to reproduce:

Python 3.8.10

Start a Docker container with Azurite to simulate Azure Blob Storage. FWIW, the problem also happens with a real Azure Storage account.

docker run --name azurite -d -p 10000:10000 -v blob_storage:/var/lib/azure-blob/data mcr.microsoft.com/azure-storage/azurite azurite-blob --blobHost 0.0.0.0 --loose

The flow below connects to Azurite, creates a container, uploads some files to it, lists them, and finally deletes the container.

import tempfile
import asyncio
from prefect import flow, get_run_logger
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_list, blob_storage_upload

@flow
def sample_blob_storage_upload(blob_storage_credentials, container):

    with tempfile.TemporaryFile("rb") as f:
        for i in range(2):
            blob_storage_upload(
                data=f.read(),
                container=container,
                blob=f"upload_blob_{i}.txt",
                blob_storage_credentials=blob_storage_credentials,
                overwrite=True,
            )

@flow
async def clean(blob_service_client, container_name):
    # remove container
    await blob_service_client.delete_container(container_name)

@flow
async def run_blob_files():
    logger = get_run_logger()
    logger.info("Connecting...")
    connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;"
    container_name = "my-container"
    blob_storage_credentials = AzureBlobStorageCredentials(
        connection_string=connection_string,
    )

    blob_service_client = blob_storage_credentials.get_client()

    # create container
    logger.info("Creating container...")
    try:
        await blob_service_client.create_container(container_name)
    except Exception:  # the container may already exist, e.g. on a re-run
        pass

    # upload files
    logger.info("Uploading files...")
    sample_blob_storage_upload(blob_storage_credentials, container_name)

    # list files
    logger.warning("Listing files...")

    #ERROR HAPPENS HERE!
    blobs = await blob_storage_list(
        container=container_name,
        blob_storage_credentials=blob_storage_credentials,
    )

    # blob_storage_list returns BlobProperties objects, not integers
    assert len(blobs) == 2

    # remove container
    logger.info("Removing container...")
    await clean(blob_service_client, container_name)

    logger.info("Closing connection...")
    await blob_service_client.close()

asyncio.run(run_blob_files())

Error:

python flows/azure_error.py 
17:09:49.874 | INFO    | prefect.engine - Created flow run 'naughty-taipan' for flow 'run-blob-files'
17:09:50.060 | INFO    | Flow run 'naughty-taipan' - Connecting...
17:09:50.063 | INFO    | Flow run 'naughty-taipan' - Creating container...
17:09:50.077 | INFO    | Flow run 'naughty-taipan' - Uploading files...
17:09:51.935 | INFO    | Flow run 'naughty-taipan' - Created subflow run 'delightful-octopus' for flow 'sample-blob-storage-upload'
17:09:52.040 | INFO    | Flow run 'delightful-octopus' - Created task run 'blob_storage_upload-43105911-0' for task 'blob_storage_upload'
17:09:52.041 | INFO    | Flow run 'delightful-octopus' - Executing 'blob_storage_upload-43105911-0' immediately...
17:09:52.096 | INFO    | Task run 'blob_storage_upload-43105911-0' - Uploading blob to container my-container with key upload_blob_0.txt
17:09:52.197 | INFO    | Task run 'blob_storage_upload-43105911-0' - Finished in state Completed()
17:09:52.254 | INFO    | Flow run 'delightful-octopus' - Created task run 'blob_storage_upload-43105911-1' for task 'blob_storage_upload'
17:09:52.258 | INFO    | Flow run 'delightful-octopus' - Executing 'blob_storage_upload-43105911-1' immediately...
17:09:52.303 | INFO    | Task run 'blob_storage_upload-43105911-1' - Uploading blob to container my-container with key upload_blob_1.txt
17:09:52.337 | INFO    | Task run 'blob_storage_upload-43105911-1' - Finished in state Completed()
17:09:52.375 | INFO    | Flow run 'delightful-octopus' - Finished in state Completed('All states completed.')
17:09:52.377 | WARNING | Flow run 'naughty-taipan' - Listing files...
17:09:52.413 | INFO    | Flow run 'naughty-taipan' - Created task run 'blob_storage_list-266e54b8-0' for task 'blob_storage_list'
17:09:52.414 | INFO    | Flow run 'naughty-taipan' - Executing 'blob_storage_list-266e54b8-0' immediately...
17:09:52.463 | INFO    | Task run 'blob_storage_list-266e54b8-0' - Listing blobs from container my-container
17:09:52.471 | ERROR   | Task run 'blob_storage_list-266e54b8-0' - Crash detected! Execution was interrupted by an unexpected exception:
KeyError: 0

17:09:52.500 | ERROR   | Flow run 'naughty-taipan' - Encountered exception during execution:
17:09:52.552 | ERROR   | Flow run 'naughty-taipan' - Finished in state Failed('Flow run encountered an exception. KeyError: 0\n')
Traceback (most recent call last):
  File "flows/azure_error.py", line 65, in <module>
    asyncio.run(run_blob_files())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/engine.py", line 237, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/engine.py", line 603, in orchestrate_flow_run
    result = await flow_call()
  File "flows/azure_error.py", line 50, in run_blob_files
    blobs = await blob_storage_list(
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/task_runners.py", line 204, in submit
    result = await call()
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1108, in begin_task_run
    return await orchestrate_task_run(
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1264, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/states.py", line 257, in return_value_to_state
    return Completed(data=await result_factory.create_result(retval))
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/states.py", line 430, in Completed
    return schemas.states.Completed(cls=cls, **kwargs)
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 233, in Completed
    return cls(type=StateType.COMPLETED, **kwargs)
  File "pydantic/main.py", line 340, in pydantic.main.BaseModel.__init__
  File "pydantic/main.py", line 1076, in pydantic.main.validate_model
  File "pydantic/fields.py", line 884, in pydantic.fields.ModelField.validate
  File "pydantic/fields.py", line 1094, in pydantic.fields.ModelField._validate_singleton
  File "pydantic/fields.py", line 892, in pydantic.fields.ModelField.validate
  File "pydantic/fields.py", line 1148, in pydantic.fields.ModelField._apply_validators
  File "pydantic/class_validators.py", line 318, in pydantic.class_validators._generic_validator_basic.lambda13
  File "pydantic/main.py", line 716, in pydantic.main.BaseModel.validate
  File "/home/ibombonato/repos/prefect-pipelines/.venv/lib/python3.8/site-packages/azure/storage/blob/_shared/models.py", line 197, in __getitem__
    return self.__dict__[key]
KeyError: 0
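The traceback ends inside pydantic's BaseModel.validate and then in azure-storage-blob's __getitem__, which looks up self.__dict__[key]. One explanation consistent with that (my reading of the traceback, not confirmed by the maintainers): for a value that is not already a dict, pydantic v1 falls back to dict(value) and only catches TypeError and ValueError. blob_storage_list returns a list of BlobProperties, and dict() treats each list element as a key/value pair. Since BlobProperties defines __getitem__ but no __iter__, Python iterates it via the legacy sequence protocol by calling item[0], which raises KeyError: 0 and escapes pydantic's handler. The stdlib-only sketch below reproduces this with a hypothetical stand-in class, DictLikeRecord, and a hypothetical helper, coerce_like_pydantic_v1, that imitates the fallback.

```python
class DictLikeRecord:
    """Hypothetical stand-in for azure-storage-blob's BlobProperties.

    It supports record["field"] by reading from self.__dict__ and exposes
    keys(), but it defines no __iter__.
    """

    def __init__(self, **fields):
        self.__dict__.update(fields)

    def __getitem__(self, key):
        # Same lookup as the azure frame in the traceback: self.__dict__[key]
        return self.__dict__[key]

    def keys(self):
        return [k for k in self.__dict__ if not k.startswith("_")]


def coerce_like_pydantic_v1(value):
    """Hypothetical imitation of pydantic v1's non-dict fallback:
    dict(value), catching only TypeError and ValueError."""
    try:
        return dict(value)
    except (TypeError, ValueError) as exc:
        raise ValueError("value is not a valid dict") from exc


record = DictLikeRecord(name="upload_blob_0.txt", size=0)

# A single record converts fine: dict() uses keys() plus record[key].
print(coerce_like_pydantic_v1(record))  # {'name': 'upload_blob_0.txt', 'size': 0}

# A list of records is read as a sequence of (key, value) pairs. Each record
# has __getitem__ but no __iter__, so Python iterates it by calling record[0];
# the resulting KeyError is not caught by the TypeError/ValueError handler.
try:
    coerce_like_pydantic_v1([record])
    list_error = None
except KeyError as exc:
    list_error = exc

print(f"KeyError: {list_error}")  # KeyError: 0
```

If this reading is right, returning plain values (such as blob names) instead of BlobProperties objects from the task should avoid the crash.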

requirements.txt

prefect
prefect-azure
adlfs

pip freeze:

adal==1.2.7
adlfs==2022.10.0
aiohttp==3.8.3
aiosignal==1.3.1
aiosqlite==0.17.0
alembic==1.8.1
anyio==3.6.2
apprise==1.2.0
asgi-lifespan==2.0.0
async-timeout==4.0.2
asyncpg==0.27.0
attrs==22.1.0
azure-common==1.1.28
azure-core==1.26.1
azure-datalake-store==0.0.52
azure-identity==1.12.0
azure-mgmt-containerinstance==10.0.0
azure-mgmt-core==1.3.2
azure-mgmt-resource==21.2.1
azure-storage-blob==12.14.1
cachetools==5.2.0
certifi==2022.9.24
cffi==1.15.1
charset-normalizer==2.1.1
click==8.1.3
cloudpickle==2.2.0
colorama==0.4.6
commonmark==0.9.1
coolname==2.0.0
croniter==1.3.7
cryptography==38.0.3
docker==6.0.1
fastapi==0.87.0
frozenlist==1.3.3
fsspec==2022.11.0
google-auth==2.14.1
greenlet==2.0.1
griffe==0.24.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==0.15.0
httpx==0.23.0
hyperframe==6.0.1
idna==3.4
importlib-metadata==5.0.0
importlib-resources==5.10.0
isodate==0.6.1
jsonpatch==1.32
jsonpointer==2.3
kubernetes==25.3.0
Mako==1.2.4
Markdown==3.4.1
MarkupSafe==2.1.1
msal==1.20.0
msal-extensions==1.0.0
msrest==0.7.1
multidict==6.0.2
oauthlib==3.2.2
orjson==3.8.1
packaging==21.3
pathspec==0.10.2
pendulum==2.1.2
portalocker==2.6.0
prefect==2.6.7
prefect-azure==0.2.2
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.21
pydantic==1.10.2
Pygments==2.13.0
PyJWT==2.6.0
pyparsing==3.0.9
python-dateutil==2.8.2
python-slugify==6.1.2
pytz==2022.6
pytzdata==2020.1
PyYAML==6.0
readchar==4.0.3
requests==2.28.1
requests-oauthlib==1.3.1
rfc3986==1.5.0
rich==12.6.0
rsa==4.9
six==1.16.0
sniffio==1.3.0
SQLAlchemy==1.4.44
starlette==0.21.0
text-unidecode==1.3
toml==0.10.2
typer==0.7.0
typing-extensions==4.4.0
urllib3==1.26.12
uvicorn==0.19.0
websocket-client==1.4.2
yarl==1.8.1
zipp==3.10.0
chicago-joe commented 1 year ago

I can report the same issue using Python 3.9.15.

env: requirements.txt

ahuang11 commented 1 year ago

Thanks! I can reproduce with:

from prefect import flow

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_list, blob_storage_upload

@flow
def example_blob_storage_list_flow():
    blob_storage_credentials = AzureBlobStorageCredentials.load("azure-connection-string")
    container = "test-container"
    blob_storage_upload(
        b"some-data",
        container=container,
        blob_storage_credentials=blob_storage_credentials,
        blob="test-data"
    )
    data = blob_storage_list(
        container=container,
        blob_storage_credentials=blob_storage_credentials,
    )
    return data

example_blob_storage_list_flow()