# Import
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (PythonOperator, PythonVirtualenvOperator, BranchPythonOperator)
from airflow.models import Variable
from pprint import pprint
# Helper that builds the empty start/end marker tasks
def gen_emp(id, rule="all_success"):
    """Build a no-op ``EmptyOperator`` used as a start/end marker task.

    Args:
        id: task_id of the new operator. (The name shadows the builtin
            ``id``; it is kept as-is so keyword callers keep working.)
        rule: Airflow trigger rule for the task; ``"all_success"`` by default.

    Returns:
        The newly constructed ``EmptyOperator``.
    """
    return EmptyOperator(trigger_rule=rule, task_id=id)
# DAG definition: task callables, operators and dependency graph
with DAG(
    'Movie',
    default_args={
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(seconds=3),
    },
    description='About movie',
    schedule="10 2 * * *",
    start_date=datetime(2024, 7, 24),
    catchup=True,
    tags=['movie'],
) as dag:
    # ---- Task callables ----
    # get_data / save_data are executed by PythonVirtualenvOperator in a
    # separate virtualenv (see `requirements` below), so they import the
    # `movie` package inside the function body.
    def get_data(ds_nodash):
        """Fetch movie data for ``ds_nodash`` with ``movie.api.call.save2df``
        and print the first five rows of the returned frame."""
        from movie.api.call import save2df
        df = save2df(ds_nodash)
        print(df.head(5))

    def save_data(ds_nodash):
        """Load the day's data with ``movie.api.call.apply_type2df``, print a
        preview and the column dtypes, then print the total audience count
        (``audiCnt``) per release date (``openDt``)."""
        from movie.api.call import apply_type2df
        df = apply_type2df(load_dt=ds_nodash)
        print("*" * 33)
        print(df.head(10))
        print("*" * 33)
        print(df.dtypes)
        # Group by openDt (release date), sum audiCnt (audience count) per
        # group; reset_index() turns the group key back into a column.
        g = df.groupby('openDt')
        sum_df = g.agg({'audiCnt': 'sum'}).reset_index()
        print(sum_df)

    def branch_fun(ds_nodash):
        """Pick the next task(s) based on whether the partition directory
        ``~/tmp/test_parquet/load_dt=<ds_nodash>`` already exists.

        Returns:
            ``"rm.dir"`` if the directory exists (remove it, then re-fetch);
            otherwise ``["get.data", "echo.task"]``.
        """
        import os
        home_dir = os.path.expanduser('~')
        path = os.path.join(home_dir, f"tmp/test_parquet/load_dt={ds_nodash}")
        if os.path.exists(path):
            return "rm.dir"
        # BranchPythonOperator expects a single task_id or a list of task_ids.
        return ["get.data", "echo.task"]

    # ---- Operators ----
    # Chooses between "remove existing directory" and "fetch directly".
    branch_op = BranchPythonOperator(
        task_id="branch.op",
        python_callable=branch_fun,
    )

    # Fetches the data inside an isolated virtualenv.
    task_get = PythonVirtualenvOperator(
        task_id='get.data',
        python_callable=get_data,
        requirements=["git+https://github.com/EstherCho-7/movie1.git@0.4/api"],
        system_site_packages=False,
        # all_done: must still run when the branch skipped rm.dir.
        trigger_rule="all_done",
        # venv_cache_path="/home/esthercho/tmp2/airflow_venv/get_data"
    )

    # Removes the existing partition directory (only on the rm.dir branch).
    rm_dir = BashOperator(
        task_id='rm.dir',
        bash_command='rm -rf ~/tmp/test_parquet/load_dt={{ds_nodash}}',
    )

    # Named task_save (not save_data) so the save_data callable defined above
    # is not rebound to this operator object.
    task_save = PythonVirtualenvOperator(
        task_id='save.data',
        python_callable=save_data,
        requirements=["git+https://github.com/EstherCho-7/movie1.git@0.4/api"],
        system_site_packages=False,
        # one_success fires as soon as ANY upstream succeeds, so every
        # upstream of this task must imply that get.data has finished.
        trigger_rule="one_success",
        # venv_cache_path="/home/esthercho/tmp2/airflow_venv/get_data"
    )

    # Marker task on the "no directory" branch.
    echo_task = BashOperator(
        task_id="echo.task",
        bash_command="echo 'task'",
    )

    task_end = gen_emp('end', 'all_done')
    task_start = gen_emp('start')

    # NOTE(review): "exit 1" makes this task always fail (after its retry);
    # save.data only tolerates that because of its one_success rule.
    task_join = BashOperator(
        task_id='join',
        bash_command="exit 1",
        trigger_rule="all_done",
    )

    # ---- Pipeline ----
    task_start >> branch_op >> rm_dir >> task_get
    task_start >> task_join >> task_save
    branch_op >> task_get
    # echo.task feeds task_end, not save.data: under one_success a quick
    # echo.task success would start save.data before get.data had finished.
    branch_op >> echo_task >> task_end
    task_get >> task_save >> task_end