astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0

[Bug] Cannot use S3 as a remote manifest path #1126

Open ogescalante opened 1 month ago

ogescalante commented 1 month ago

Astronomer Cosmos Version

Other Astronomer Cosmos version (please specify below)

If "Other Astronomer Cosmos version" selected, which one?

1.6.0a3

dbt-core version

1.8.1

Versions of dbt adapters

No response

LoadMode

CUSTOM

ExecutionMode

KUBERNETES

InvocationMode

None

airflow version

2.9.2

Operating System

Debian GNU/Linux 12 (bookworm)

If you think it's a UI issue, which browsers are you seeing the problem on?

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened?

I cannot see how to use S3 as a remote location for my manifest file; I keep getting this error:

[image: screenshot of the error]

My dag code:

from datetime import datetime
from airflow.models import Variable
from airflow import DAG
from cosmos import (
    ProjectConfig,
    ExecutionConfig,
    DbtTaskGroup,
    ExecutionMode,
    RenderConfig,
    LoadMode,
    ProfileConfig
)

import include.constants as ct

with DAG(
    dag_id="artemis_dag",
    start_date=datetime(2023, 7, 10),
    schedule_interval=None,
    default_args=ct.DEFAULT_ARGS,
    catchup=False,
    tags=["Artemis Pipeline"],
    description="Orchestrating the DBT models of the artemis-pipeline project",
    max_active_runs="{{ var.value.get('artemis_active_runs', 1 }}",
    max_active_tasks="{{ var.value.get('artemis_max_active_tasks', 32 }}",
) as dag:
    run_artemis_pipeline = DbtTaskGroup(
        group_id="artemis_pipeline",
        render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST),
        project_config=ProjectConfig(
            dbt_project_path=ct.ARTEMIS_PROJECT_DIR,
            manifest_path="s3://***/manifest.json"
        ),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.KUBERNETES,
        ),
        operator_args={
            "image": Variable.get("DBT_IMAGE"),
            "namespace": Variable.get("NAMESPACE"),
            "get_logs": True,
            "is_delete_operator_pod": True,
        },
    )

    run_artemis_pipeline

I've installed astronomer-cosmos[amazon], but the problem persists. If I try to use manifest_conn_id, the UI says this argument does not exist.

Relevant log output

No response

How to reproduce

Try creating a DAG that passes an S3 URI as the manifest path location.

Anything else :)?

No response

Are you willing to submit PR?

Contact Details

No response

dosubot[bot] commented 1 month ago

Hello @ogescalante! I'm here to assist you with any bugs, questions, or contributions you have. Let's work together to resolve the issue you're encountering.

To resolve the CosmosValueError caused by a missing manifest file at the specified S3 path, ensure the manifest file exists at the specified path and that the path is correctly set. Here are the steps to follow:

  1. Verify the S3 Path: Ensure the manifest_path is correctly set to the S3 path where the manifest file is stored.
  2. Check File Existence: Confirm that the manifest file actually exists at the specified S3 path (see the sketch below).
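
For step 2, one quick way to confirm the key actually exists (this snippet is not part of the original reply; it assumes an aws_default Airflow connection and the placeholder bucket/key names used below) is S3Hook.check_for_key:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Placeholder connection, bucket and key -- replace with your own values
hook = S3Hook(aws_conn_id="aws_default")
exists = hook.check_for_key(key="path/to/manifest.json", bucket_name="your-bucket")
print("manifest present:", exists)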

Here's an example of how to set the manifest_path:

# Example of setting the manifest path -- keep the URI as a plain string,
# since wrapping it in pathlib.Path would collapse "s3://" into "s3:/"
manifest_path = "s3://your-bucket/path/to/manifest.json"

# Ensure the path is set correctly in your configuration
project_config = ProjectConfig(
    dbt_project_path=ct.ARTEMIS_PROJECT_DIR,
    manifest_path=manifest_path,
    project_name="artemis_pipeline"
)

# Validate the project to check for the existence of the manifest file
project_config.validate_project()

Additionally, ensure that the astronomer-cosmos[amazon] package handles S3 paths correctly. The open_s3_file function uses the S3Hook from the airflow.providers.amazon.aws.hooks.s3 module to read the content of a file from an S3 bucket. Make sure the conn_id (AWS connection ID) is correctly configured in your Airflow connections.

Here's the relevant code for handling S3 paths:

from typing import Optional, Tuple

def bucket_and_key(path: str) -> Tuple[str, str]:
    from urllib.parse import urlsplit
    parsed_url = urlsplit(path)
    bucket = parsed_url.netloc
    key = parsed_url.path.lstrip("/")
    return bucket, key

def open_s3_file(path: str, conn_id: Optional[str]) -> str:
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from botocore.exceptions import ClientError

    if conn_id is None:
        conn_id = S3Hook.default_conn_name

    hook = S3Hook(aws_conn_id=conn_id)
    bucket, key = bucket_and_key(path)
    try:
        content = hook.read_key(key=key, bucket_name=bucket)
    except ClientError as e:
        if e.response.get("Error", {}).get("Code", "") == "NoSuchKey":
            raise FileNotFoundError(f"{path} does not exist")
        raise e
    return content

If the manifest_conn_id argument is not recognized in the UI, it could be due to a typo or mismatch in the connection ID used in the DAG and the one configured in Airflow. Ensure that the connection ID is correctly configured in the Airflow connections and matches the one used in the DAG configuration.

For example:

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="example_conn",
        profile_args={"schema": "public"},
    ),
)

Make sure the connection ID example_conn is correctly set up in Airflow.
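
As a quick sanity check (not from the thread; example_conn is just the placeholder id from the snippet above), you can confirm the connection is registered on the Airflow host:

from airflow.hooks.base import BaseHook

# Raises AirflowNotFoundException if "example_conn" is not configured
conn = BaseHook.get_connection("example_conn")
print(conn.conn_type, conn.host)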

By following these steps, you should be able to resolve the issue with using S3 as a remote location for your manifest file in Astronomer Cosmos version 1.6.0a3 [1][2][3].

To continue talking to Dosu, mention @dosu.

pankajkoti commented 1 month ago

hi @ogescalante, we have not yet included PR https://github.com/astronomer/astronomer-cosmos/pull/1109, which adds support for remote manifest paths, in any of our pre-releases. With 1.6.0a3, Cosmos still treats your remote path as a local path and does not accept the manifest_conn_id parameter, since that was only introduced in PR #1109, which is yet to be released/pre-released. I will discuss with the team whether we can create a pre-release soon that includes the PR so you can test it.
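
For reference, a minimal sketch of how the configuration is expected to look once a release containing PR #1109 is available (the manifest_conn_id parameter comes from that PR and is not available in 1.6.0a3; aws_default is an assumed connection id):

from cosmos import ProjectConfig

project_config = ProjectConfig(
    dbt_project_path=ct.ARTEMIS_PROJECT_DIR,
    manifest_path="s3://***/manifest.json",  # remote manifest URI (bucket elided as in the issue)
    manifest_conn_id="aws_default",          # Airflow connection used to read the remote path
)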

ogescalante commented 1 month ago

Hi @pankajkoti, thanks for getting back on this. Please consider removing it from the official docs in the meantime, since it is pretty frustrating not to be able to do this. https://astronomer.github.io/astronomer-cosmos/configuration/parsing-methods.html

ogescalante commented 1 month ago

For those with the same problem, my workaround was implementing the S3 download myself:

from datetime import datetime
from airflow.models import Variable
from airflow import DAG
from cosmos import (
    ProjectConfig,
    ExecutionConfig,
    DbtTaskGroup,
    ExecutionMode,
    RenderConfig,
    LoadMode,
    ProfileConfig
)
from airflow.providers.amazon.aws.hooks.s3 import S3Hook  # airflow.hooks.S3_hook was removed in Airflow 2.x

import include.constants as ct

manifest_path = ct.ARTEMIS_PROJECT_DIR / "target" / "manifest.json"

def download_file_from_s3():
    s3 = S3Hook()
    bucket_name = "***"  # bucket name elided in the issue
    file_key = 'manifest.json'

    # download_file expects a local filename string
    s3.get_key(file_key, bucket_name).download_file(str(manifest_path))

# Fetch the manifest at DAG parse time so Cosmos can load it from the local path
download_file_from_s3()

with DAG(
    dag_id="artemis_dag",
    start_date=datetime(2023, 7, 10),
    schedule_interval=None,
    default_args=ct.DEFAULT_ARGS,
    catchup=False,
    tags=["Artemis Pipeline"],
    description="Orchestrating the DBT models of the artemis-pipeline project",
    max_active_runs="{{ var.value.get('artemis_active_runs', 1 }}",
    max_active_tasks="{{ var.value.get('artemis_max_active_tasks', 32 }}",
) as dag:
    run_artemis_pipeline = DbtTaskGroup(
        group_id="artemis_pipeline",
        render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST),
        project_config=ProjectConfig(
            dbt_project_path=ct.ARTEMIS_PROJECT_DIR,
            manifest_path=manifest_path
        ),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.KUBERNETES,
        ),
        operator_args={
            "image": Variable.get("DBT_IMAGE"),
            "namespace": Variable.get("NAMESPACE"),
            "get_logs": True,
            "is_delete_operator_pod": True,
        },
    )

    run_artemis_pipeline

pankajastro commented 1 month ago

Yes, the docs are confusing for users at the moment. I have created an issue to fix this: https://github.com/astronomer/astronomer-cosmos/issues/1128

pankajkoti commented 1 month ago

@ogescalante Thanks for voicing this. Apologies for the frustration caused by the docs being rendered from the main branch. I just took another look at the docs, and they do mention that this will only be available from Cosmos 1.6 onwards, which is yet to be released.

Thanks @pankajastro for creating an issue to fix the docs.

pankajkoti commented 1 month ago

hi @ogescalante, we have just created a pre-release, 1.6.0a4 (https://pypi.org/project/astronomer-cosmos/1.6.0a4/), that includes the relevant PR for remote manifest loading. We would appreciate it if you could test it and provide feedback! :)