apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Allow jinja templating connection ids for all third party operators #35259

Open kimminw00 opened 9 months ago

kimminw00 commented 9 months ago

Description

We use private staging and prod S3 deployments (Ceph clusters, for example) in our office, so there are often cases where DAGs run with only the connection ids changed. We prefer to use Param rather than hardcoded connection ids so that our code stays reusable. I only gave an example for an Amazon operator, but templating connection ids is needed for other operators too.

Why is it needed? Code reusability

Use case/motivation

from airflow import DAG
from airflow.models.param import Param
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

with DAG(
    dag_id="example_s3",
    params={
        "aws_conn_id": Param("", type="string"),
    },
    ...
) as dag:

    create_object = S3CreateObjectOperator(
        task_id="create_object",
        s3_bucket=bucket_name,
        s3_key=key,
        data=DATA,
        replace=True,
        aws_conn_id="{{ params.aws_conn_id }}",  # Params enable us to provide runtime configuration
    )
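
For context, the intent is that the connection id would then be chosen at trigger time. A minimal, hypothetical sketch of supplying it, assuming a recent Airflow 2.x where DAG.test() accepts run_conf (the connection id "staging_s3" is made up for illustration):

if __name__ == "__main__":
    # Feed the Param defined above at runtime; with templating support,
    # the operator would use the "staging_s3" connection instead of a hardcoded id.
    dag.test(run_conf={"aws_conn_id": "staging_s3"})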

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

Taragolis commented 9 months ago

This might be blocked by https://github.com/apache/airflow/issues/29069.

In general, you could extend S3CreateObjectOperator.template_fields by creating a custom operator with the required template_fields:

from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

class AwesomeS3CreateObjectOperator(S3CreateObjectOperator):
    # Add aws_conn_id on top of the fields the base operator already templates
    template_fields = ("aws_conn_id", *S3CreateObjectOperator.template_fields)
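
A usage sketch of that workaround, reusing the names from the example in the issue description; since aws_conn_id is now listed in template_fields, the Jinja expression is rendered before the hook is created:

create_object = AwesomeS3CreateObjectOperator(
    task_id="create_object",
    s3_bucket=bucket_name,
    s3_key=key,
    data=DATA,
    replace=True,
    aws_conn_id="{{ params.aws_conn_id }}",  # rendered at runtime via template_fields
)
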
Taragolis commented 6 months ago

I think it might be a good first issue. We also need to collect all connection IDs from the existing operators and list them, so that anyone could pick this up and make the changes.

Marking as good first issue, so maybe someone could volunteer some free time to find at least most of the non-templated connection ids.
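
As a starting point for that collection work, here is a rough sketch (not an official tool) of how one could audit operators for constructor arguments ending in _conn_id that are missing from template_fields; the operator list below is just an example, and the signature check may miss connection ids that are only accepted through **kwargs:

import inspect

from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateObjectOperator,
    S3DeleteObjectsOperator,
)

# In practice this list would be generated per provider package.
OPERATORS_TO_CHECK = [S3CreateObjectOperator, S3DeleteObjectsOperator]


def non_templated_conn_ids(op_cls):
    """Return '*_conn_id' constructor args that are not declared in template_fields."""
    params = inspect.signature(op_cls.__init__).parameters
    conn_args = {name for name in params if name.endswith("_conn_id")}
    return sorted(conn_args - set(op_cls.template_fields))


for op_cls in OPERATORS_TO_CHECK:
    missing = non_templated_conn_ids(op_cls)
    if missing:
        print(f"{op_cls.__name__}: {missing}")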

kimminw00 commented 5 months ago

I collected all third party operators which have connection IDs.

Third party operators which have connection IDs

[ ] Airbyte
[ ] Alibaba
[ ] Amazon
[ ] Apache Beam
[ ] Apache Cassandra
[ ] Apache Drill
[ ] Apache Druid
[ ] Apache Flink
[ ] Apache HDFS
[ ] Apache Hive
[ ] Apache Impala
[ ] Apache Kafka
[ ] Apache Kylin
[ ] Apache Livy
[ ] Apache Pig
[ ] Apache Pinot
[ ] Apache Spark
[ ] Apprise
[ ] ArangoDB
[ ] Asana
[ ] Atlassian Jira
[ ] Cloudant
[ ] CNCF Kubernetes
[ ] Cohere
[ ] Common IO
[ ] Common SQL
[ ] Databricks
[ ] Datadog
[ ] dbt Cloud
[ ] Dingding
[ ] Discord
[ ] Docker
[ ] Elasticsearch
[ ] Exasol
[ ] Facebook
[ ] File Transfer Protocol (FTP)
[ ] GitHub
[ ] Google
[ ] gRPC
[ ] Hashicorp
[ ] Hypertext Transfer Protocol (HTTP)
[ ] IBM Cloudant
[ ] Influx DB
[ ] Internet Message Access Protocol (IMAP)
[ ] Java Database Connectivity (JDBC)
[ ] Jenkins
[ ] Microsoft Azure
[ ] Microsoft SQL Server (MSSQL)
[ ] Microsoft PowerShell Remoting Protocol (PSRP)
[ ] Microsoft Windows Remote Management (WinRM)
[ ] MongoDB
[ ] MySQL
[ ] Neo4j
[ ] ODBC
[ ] OpenAI
[ ] OpenFaaS
[ ] OpenLineage
[ ] Open Search
[ ] Opsgenie
[ ] Oracle
[ ] Pagerduty
[ ] Papermill
[ ] PgVector
[ ] Pinecone
[ ] PostgreSQL
[ ] Presto
[ ] Qdrant
[ ] Redis
[ ] Salesforce
[ ] Samba
[ ] Segment
[ ] Sendgrid
[ ] SFTP
[ ] Slack
[ ] SMTP
[ ] Snowflake
[ ] SQLite
[ ] SSH
[ ] Tableau
[ ] Tabular
[ ] Telegram
[ ] Teradata
[ ] Trino
[ ] Vertica
[ ] Weaviate
[ ] Yandex
[ ] Zendesk
geraj1010 commented 3 days ago

I think we would just add the connection id parameter to template_fields for all existing provider operators? However, that would be cumbersome, and how would we apply that convention moving forward? I'd be happy to make the updates to the operators, but I'm concerned about inconsistency.
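
One hypothetical way to keep the convention consistent going forward (an illustration, not an agreed-upon design) would be to centralize it rather than editing every operator by hand, e.g. a small mixin that automatically appends any *_conn_id constructor argument to template_fields when an operator class is defined:

import inspect

from airflow.models.baseoperator import BaseOperator


class ConnIdTemplatingMixin:
    """Illustrative only: auto-register '*_conn_id' constructor args as template fields."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        conn_args = tuple(
            name
            for name in inspect.signature(cls.__init__).parameters
            if name.endswith("_conn_id")
        )
        # Merge while preserving order and dropping duplicates.
        cls.template_fields = tuple(dict.fromkeys((*cls.template_fields, *conn_args)))


class MyS3Operator(ConnIdTemplatingMixin, BaseOperator):
    # Hypothetical operator used only to demonstrate the mixin.
    template_fields = ("s3_key",)

    def __init__(self, *, s3_key, aws_conn_id="aws_default", **kwargs):
        super().__init__(**kwargs)
        self.s3_key = s3_key
        self.aws_conn_id = aws_conn_id


assert "aws_conn_id" in MyS3Operator.template_fields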