PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Deployment with `azure` block and `docker-container` infra cannot retrieve the flow from the storage #6503

Closed: trymzet closed this issue 2 years ago

trymzet commented 2 years ago

Bug summary

It seems like Prefect is trying to use the local file system as if it were ADLS, and it is also ignoring the path defined in the storage block, resulting in an IsADirectoryError (/home/viadot is the working directory in my container -- I would expect Prefect to be looking for the flow under the basepath defined in my Azure storage block).

Reproduction

prefect deployment build fake_ingest.py:run \
  -n test_deployment \
  -t dev \
  -q dev \
  -sb azure/my-azure-block \
  -ib docker-container/my-container-block \
  -o deployments/test_deployment.yaml
prefect deployment apply deployments/test_deployment.yaml
prefect agent start -q dev
prefect deployment run <FLOW_NAME>/test_deployment
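
For reference, the deployment entrypoint above is fake_ingest.py:run; the file itself isn't part of this report, but a minimal flow matching that entrypoint could look like this sketch (hypothetical, the body is made up):

from prefect import flow, get_run_logger

@flow
def run():
    # Placeholder body -- the real flow ingests fake data; this sketch only
    # needs to exist so that the entrypoint fake_ingest.py:run resolves.
    get_run_logger().info("Fake ingest flow started")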

Error

14:48:52.178 | INFO    | prefect.infrastructure.docker-container - Docker container 'organic-bear' has status 'running'
12:48:54.903 | ERROR   | Flow run 'organic-bear' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 246, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 105, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 55, in load_flow_from_flow_run
    await storage_block.get_directory(from_path=None, local_path=".")
  File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 555, in get_directory
    return await self.filesystem.get_directory(
  File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 251, in get_directory
    return self.filesystem.get(from_path, local_path, recursive=True)
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 111, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 96, in sync
    raise return_result
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 53, in _runner
    result[0] = await coro
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 546, in _get
    return await _run_coros_in_chunks(
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 267, in _run_coros_in_chunks
    await asyncio.gather(*chunk, return_exceptions=return_exceptions),
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 408, in wait_for
    return await fut
  File "/usr/local/lib/python3.10/site-packages/adlfs/spec.py", line 1629, in _get_file
    with open(lpath, "wb") as my_blob:
IsADirectoryError: [Errno 21] Is a directory: '/home/viadot/.'
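
The final frame shows adlfs opening the local target '.' for writing as if it were a file; opening any directory path in write mode raises exactly this error, which is easy to confirm in isolation (minimal sketch):

# On Linux, opening a directory path in binary write mode fails the same way:
open(".", "wb")  # IsADirectoryError: [Errno 21] Is a directory: '.'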

Versions

Version:             2.1.1
API version:         0.8.0
Python version:      3.10.6
Git commit:          dc2ba222
Built:               Thu, Aug 18, 2022 10:18 AM
OS/Arch:             linux/x86_64
Profile:             ***
Server type:         hosted

Additional context

For the full picture, I'm also getting the following warning when running prefect deployment build. I'll ask on the forums how to fix it, as creating a new block didn't help.

/usr/local/lib/python3.10/site-packages/prefect/blocks/core.py:573: UserWarning: Block document has schema checksum sha256:cc5fd7bf7e129ced2bf2b25cb0fac04e0c0596464bb5a001c699ddf43d05f752 which does not match the schema checksum for class 'DockerContainer'. This indicates the schema has changed and this block may not load.
  return cls._from_block_document(block_document)
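
One thing that sometimes clears this kind of checksum mismatch (an assumption on my part, not verified for this case) is re-registering the built-in block types so the schema stored on the server matches the installed client version:

prefect block register -m prefect.infrastructure
prefect block register -m prefect.filesystems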

cicdw commented 2 years ago

Hi @Trymzet - could you share the following information so I can investigate a bit deeper:

Thank you!

trymzet commented 2 years ago

Hi, yes, I still get this with Prefect 2.3.2

###
### A complete description of a Prefect Deployment for flow 'Ingest Fake C4C Data'
###
name: c4c_contact
description: |-
  Extract & Load flow for the Cloud for Customers source.

  The parameters are made up as we generate fake data for demoing/development purposes.

  Args:
      schema_name (str): The name of the schema from which to pull the table.
      table_name (str): The name of the table to be replicated.
      other_param (str, optional): Some other parameter. Parameters are defined on a per-source basis. Defaults to None.
version: a1cabc481f6e82d0a52d4531abc156e1
# The work queue that will handle this deployment's runs
work_queue_name: dev
tags:
- dev
- ingestion
parameters: {}
schedule: null
infra_overrides: {}

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: Ingest Fake C4C Data
manifest_path: null
infrastructure:
  type: docker-container
  env:
    TEST_ENV: test_env_value
  labels: {}
  name: null
  command:
  - python
  - -m
  - prefect.engine
  image: viadot:orion
  image_pull_policy: null
  image_registry: null
  networks: []
  network_mode: null
  auto_remove: false
  volumes:
  - /home/trymzet/dyvenia/viadot/.config/credentials.json:/home/viadot/.config/credentials.json
  - /home/trymzet/.databricks-connect:/home/viadot/.databricks-connect
  stream_output: true
  _block_document_id: 25394993-6a73-49be-a805-b260b9c510ea
  _block_document_name: viadot-orion
  _is_anonymous: false
  _block_type_slug: docker-container
storage:
  bucket_path: tests/data_platform/prefect/flows
  azure_storage_connection_string: null
  azure_storage_account_name: '**********'
  azure_storage_account_key: '**********'
  _block_document_id: ab389c14-e6f1-4389-ae6d-4eefc4e10b34
  _block_document_name: adls-dyvenia
  _is_anonymous: false
  _block_type_slug: azure
path: ''
entrypoint: cloud_for_customer.py:run
parameter_openapi_schema:
  title: Parameters
  type: object
  properties:
    schema_name:
      title: schema_name
      type: string
    table_name:
      title: table_name
      type: string
    other_param:
      title: other_param
      type: string
  required:
  - schema_name
  - table_name
  definitions: null

Everything looks OK; I'm just not sure what the bucket_path parameter means, because 1) in Azure you have containers, not buckets, and 2) they don't really have paths AFAIK, but rather names. The bucket (container) in my example is called tests; however, I'd like to load the files from the tests/data_platform/prefect/flows path, not from directly under the container.
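
As far as I can tell (an assumption based on adlfs rather than the Prefect docs), the first path segment is interpreted as the container and the rest as a prefix inside it:

import adlfs

# Sketch: with adlfs, the container is simply the first path component.
fs = adlfs.AzureBlobFileSystem(connection_string="<redacted>")
fs.ls("tests")                              # lists the "tests" container
fs.ls("tests/data_platform/prefect/flows")  # lists a prefix inside it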

The only change is that I'm now getting two of the "Block document has schema checksum" warnings instead of one, for both the Azure and the DockerContainer blocks.

trybjo commented 2 years ago

Hi, I'm experiencing the same issue, but the Prefect agent (Docker) is able to load the flow script if it is stored directly in the blob container (named prefect in this case). The issue might be related to the way folder functionality is implemented in Azure Data Lake Storage Gen2 compared to AWS S3. Also, the bucket_path argument for Azure storage is a bit misleading, as mentioned above.

Setup:

Deployment with flow script located at root of Azure container (runs just fine)

###
### A complete description of a Prefect Deployment for flow 'healthcheck-root'
###
name: health
description: null
version: 5a276c8302490e18630c5f7d8be77666
# The work queue that will handle this deployment's runs
work_queue_name: dev-queue
tags: []
parameters: {}
schedule: null
infra_overrides: {}
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command:
  - python
  - -m
  - prefect.engine
  stream_output: true
  _block_type_slug: process

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: healthcheck-root
manifest_path: null
storage:
  bucket_path: prefect
  azure_storage_connection_string: '**********'
  azure_storage_account_name: null
  azure_storage_account_key: null
  _block_document_id: ba1910b3-a57c-4498-932a-33ddb82ea562
  _block_document_name: prefect
  _is_anonymous: false
  _block_type_slug: azure
path: ''
entrypoint: healthcheck.py:healthcheck_root
parameter_openapi_schema:
  title: Parameters
  type: object
  properties: {}
  required: null
  definitions: null

Deployment with flow script located in flows folder of Azure container (fails every time)

###
### A complete description of a Prefect Deployment for flow 'healthcheck'
###
name: health
description: null
version: bc67df0fd3d93103af327c9151571ebb
# The work queue that will handle this deployment's runs
work_queue_name: dev-queue
tags: []
parameters: {}
schedule: null
infra_overrides: {}

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: healthcheck
manifest_path: null
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command:
  - python
  - -m
  - prefect.engine
  stream_output: true
  _block_document_id: f1e005e8-1257-4c0a-a123-58546f265f99
  _block_document_name: anonymous-a8313519-6cdb-4b04-9219-d521ae3da5a0
  _is_anonymous: true
  _block_type_slug: process
storage:
  bucket_path: prefect
  azure_storage_connection_string: '**********'
  azure_storage_account_name: null
  azure_storage_account_key: null
  _block_document_id: ba1910b3-a57c-4498-932a-33ddb82ea562
  _block_document_name: prefect
  _is_anonymous: false
  _block_type_slug: azure
path: ''
entrypoint: flows\healthcheck.py:healthcheck
parameter_openapi_schema:
  title: Parameters
  type: object
  properties: {}
  required: null
  definitions: null

trymzet commented 2 years ago

OK, I tried specifying just the container name (tests) in the Azure block, as you did, and passing the path via prefect deployment build --path (data_platform/prefect/flows). This gives me a different error now (the file is uploaded to the correct ADLS path, as expected). I also get the same error when I replicate what @trybjo did (providing the container name as bucket_path and then not specifying the path in prefect deployment build). The commands I use:

prefect deployment build cloud_for_customer.py:run \
    -n c4c_contact \
    -t dev \
    -t ingestion \
    -q dev \
    -ib docker-container/viadot-orion \
    -sb azure/adls-dyvenia \
    --path tests/data_platform/prefect/flows \
    -o deployments/c4c_contact.yaml \
    --skip-upload
prefect deployment apply --upload deployments/c4c_contact.yaml

Testing without the path arg:

prefect deployment build cloud_for_customer.py:run \
    -n c4c_contact \
    -t dev \
    -t ingestion \
    -q dev \
    -ib docker-container/viadot-orion \
    -sb azure/adls-dyvenia \
    -o deployments/c4c_contact.yaml \
    --skip-upload
prefect deployment apply --upload deployments/c4c_contact.yaml
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 254, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 55, in load_flow_from_flow_run
    await storage_block.get_directory(from_path=None, local_path=".")
  File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 574, in get_directory
    return await self.filesystem.get_directory(
  File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 260, in get_directory
    return self.filesystem.get(from_path, local_path, recursive=True)
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 111, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 96, in sync
    raise return_result
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 53, in _runner
    result[0] = await coro
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 546, in _get
    return await _run_coros_in_chunks(
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 267, in _run_coros_in_chunks
    await asyncio.gather(*chunk, return_exceptions=return_exceptions),
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 408, in wait_for
    return await fut
  File "/usr/local/lib/python3.10/site-packages/adlfs/spec.py", line 1626, in _get_file
    async with self.service_client.get_blob_client(
  File "/usr/local/lib/python3.10/site-packages/azure/storage/blob/aio/_blob_service_client_async.py", line 671, in get_blob_client
    return BlobClient( # type: ignore
  File "/usr/local/lib/python3.10/site-packages/azure/storage/blob/aio/_blob_client_async.py", line 115, in __init__
    super(BlobClient, self).__init__(
  File "/usr/local/lib/python3.10/site-packages/azure/storage/blob/_blob_client.py", line 158, in __init__
    raise ValueError("Please specify a container name and blob name.")
ValueError: Please specify a container name and blob name.

Based on this log

File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 55, in load_flow_from_flow_run
    await storage_block.get_directory(from_path=None, local_path=".")

you can see that deployment.path ends up as None; that might be a good place to start investigating: https://github.com/PrefectHQ/prefect/blob/4ab989b1755f9c395fb191325fe2d4e2678caa1c/src/prefect/deployments.py#L54
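
In other words (a simplified sketch inferred from the traceback and the linked line, not the exact source), the retrieval step boils down to something like:

from prefect.filesystems import Azure

async def fetch_flow_files(storage_block: Azure, deployment_path) -> None:
    # Sketch of what load_flow_from_flow_run does: pull the deployment's
    # directory into the container's working directory. When deployment_path
    # is None (as in the traceback), adlfs receives from_path=None and cannot
    # derive a container + blob name from it.
    await storage_block.get_directory(from_path=deployment_path, local_path=".")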

anna-geller commented 2 years ago

I just tried to reproduce on the latest version with the default Azure Blob Storage configuration, and while there are some serialization warnings, the flow executes properly. [screenshots omitted]

Docker block:

from prefect.infrastructure import DockerContainer

docker_block = DockerContainer(
    image="prefecthq/prefect:2.4.0-python3.9",  # this will always use the latest Prefect version
    env={"EXTRA_PIP_PACKAGES": "adlfs"},
    image_pull_policy="ALWAYS",  # to always pull the latest Prefect image
)
docker_block.save("az", overwrite=True)

AZ block:

from prefect.filesystems import Azure

az = Azure(bucket_path="prefect/deployments", azure_storage_connection_string="xxx")
az.save("prod", overwrite=True)

flow:

import platform
import prefect
from prefect import task, flow, get_run_logger
from prefect.orion.api.server import ORION_API_VERSION
import sys

@task
def log_platform_info():
    logger = get_run_logger()
    logger.info("Host's network name = %s", platform.node())
    logger.info("Python version = %s", platform.python_version())
    logger.info("Platform information (instance type) = %s ", platform.platform())
    logger.info("OS/Arch = %s/%s", sys.platform, platform.machine())
    logger.info("Prefect Version = %s 🚀", prefect.__version__)
    logger.info("Prefect API Version = %s", ORION_API_VERSION)

@flow
def healthcheck():
    log_platform_info()

if __name__ == "__main__":
    healthcheck()
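
With the blocks and flow above, the deployment would then be built along these lines (a sketch; the flow file name and output path are assumed):

prefect deployment build healthcheck.py:healthcheck \
    -n health \
    -q dev-queue \
    -sb azure/prod \
    -ib docker-container/az \
    -o deployments/health.yaml
prefect deployment apply deployments/health.yaml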

Version:

Version:             2.4.0
API version:         0.8.0
Python version:      3.9.12
Git commit:          513639e8
Built:               Tue, Sep 13, 2022 2:15 PM
OS/Arch:             darwin/arm64
Profile:             dev
Server type:         cloud

anna-geller commented 2 years ago

I guess I was using v2

anna-geller commented 2 years ago

I wouldn't focus as much here on a comparison with S3, but rather on the generic implementation from fsspec (https://github.com/fsspec/adlfs), which unifies those backends and abstracts the differences away: whether a service calls it a bucket or a container, it's the same concept of a grouping mechanism for objects.
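
To illustrate (a sketch using fsspec's generic interface; credentials are placeholders), the same call shape works against both backends:

import fsspec

# Same generic API for Azure (via adlfs) and S3 (via s3fs); only the protocol
# and credentials differ. The first path segment is the container/bucket.
az = fsspec.filesystem("abfs", connection_string="<redacted>")
s3 = fsspec.filesystem("s3", key="<key>", secret="<secret>")
az.get("container/prefix", "./local", recursive=True)
s3.get("bucket/prefix", "./local", recursive=True)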

trymzet commented 2 years ago

I did some digging and found that this was an issue with adlfs, fixed in version 2022.02: https://github.com/fsspec/adlfs/pull/285. Installing that version fixes the IsADirectoryError from storage_block.get_directory() for me. Prefect pins a higher version in its requirements.txt, so this was down to my environment.
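
For anyone hitting this, making sure the runtime environment has a recent adlfs should be enough, e.g. (version floor assumed from the PR above):

pip install "adlfs>=2022.2.0"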

anna-geller commented 2 years ago

That's great to hear -- feel free to keep the issue open if something is still not working. Thanks for digging deeper into this; pinning down the exact version that fixes the issue is extremely helpful.

trymzet commented 2 years ago

Thanks, I'll close this since the issue is solved and everything is working as expected. :)

trybjo commented 2 years ago

Thanks guys!