getElementsByName / tmp-doc

0 stars 0 forks source link

airflow dag sample #18

Open getElementsByName opened 3 years ago

getElementsByName commented 3 years ago
from datetime import timedelta
from dateutil.tz import tz

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago
from airflow.utils.timezone import datetime

# Arguments applied to every task in this DAG. Any of them can be overridden
# for a single task by passing the same keyword to that operator's constructor.
# Other per-task options that could go here (not set in this sample): queue,
# pool, priority_weight, end_date, wait_for_downstream, sla, execution_timeout,
# trigger_rule, and the on_failure / on_success / on_retry callbacks.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2020, 9, 8, 0, 0),
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(seconds=1),
)

# One run per day. With catchup=True, the scheduler also creates runs for past
# intervals since start_date that have not been run yet.
dag = DAG(
    dag_id='sample',
    description='A simple tutorial DAG',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=True,
)

# Each of the three tasks below (t1, t2, t3) is created by instantiating an
# operator and attaching it to `dag`.
# print_date: runs the shell command `date`.
t1 = BashOperator(
    dag=dag,
    task_id='print_date',
    bash_command='date',
)

def sample_python_callable(**kwargs):
    """Print the task context and the execution date, also in local time.

    Used as the ``python_callable`` of a PythonOperator with
    ``provide_context=True``, so Airflow passes the task context as keyword
    arguments.

    Args:
        **kwargs: Task context. Must contain ``execution_date``.
            NOTE(review): assumed to be a timezone-aware datetime, as Airflow
            supplies; confirm if calling this outside Airflow.

    Returns:
        None.

    Raises:
        KeyError: If ``execution_date`` is not present in ``kwargs``.
    """
    # Dump the whole context to the task log.
    print(kwargs)
    execution_date = kwargs['execution_date']
    # Re-express the execution date in the worker's local timezone
    # (dateutil's tzlocal) so both forms appear side by side in the log.
    execution_date_with_timezone = execution_date.astimezone(tz=tz.tzlocal())
    print(execution_date)
    print(execution_date_with_timezone)

# python: calls sample_python_callable with the Airflow context passed as
# keyword arguments (provide_context=True); the task is given one minute.
t2 = PythonOperator(
    task_id='python',
    python_callable=sample_python_callable,
    provide_context=True,
    op_kwargs={},
    depends_on_past=False,
    retries=1,
    execution_timeout=timedelta(minutes=1),
    dag=dag,
)
# Markdown shown for the DAG in the UI. This file defines no module docstring,
# so `__doc__` is None here; fall back to an explicit summary so the DAG page
# is not left without documentation.
dag.doc_md = __doc__ or """\
#### Sample DAG
`print_date` runs `date`; `python` prints the task context and execution date
(also in the worker's local timezone); `templated` echoes Jinja-rendered dates
and a parameter. Scheduled daily with catchup enabled.
"""

# Markdown shown on the print_date task's Task Instance Details page.
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""

# Jinja-templated bash script that Airflow renders before the task runs.
# `macros.ds_add(ds, 7)` shifts the `ds` date string by 7 days. Other macros
# can be used the same way, e.g. `macros.ds_format(ds, "%Y-%m-%d", "%Y")`.
# `et_tz` is the execution date converted to `params.local_tz`. The `.add()`
# call assumes it is a pendulum DateTime (NOTE(review): true for Airflow's
# template context; confirm for your Airflow/pendulum version).
# The last date line prints the day before et_tz, so it is labelled
# "previous day" to keep it distinguishable from the "day" line above it.
templated_command = """
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% set et_tz = execution_date.astimezone(tz=params.local_tz) %}
echo "year {{ et_tz.strftime('%Y') }}"
echo "month {{ et_tz.strftime('%m') }}"
echo "day {{ et_tz.strftime('%d') }}"
echo "previous day {{ et_tz.add(days=-1).strftime('%d') }}"
echo "{{ params.my_param }}"
"""

# templated: Airflow renders templated_command with Jinja before running it.
# `params` exposes my_param and the timezone local to the machine that parses
# this file (dateutil's tzlocal).
t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params=dict(my_param='Parameter I passed in', local_tz=tz.tzlocal()),
    depends_on_past=False,
    dag=dag,
)

# print_date runs first; python and templated both run downstream of it.
t1.set_downstream([t2, t3])