# (Open) EstherCho-7 opened this 1 month ago.
from datetime import datetime, timedelta
from pprint import pprint as pp
from textwrap import dedent

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (
    BranchPythonOperator,
    PythonOperator,
    PythonVirtualenvOperator,
)

# Daily movie-summary pipeline:
#   start -> merge.df -> de.dup -> apply.type -> summary.df -> end
# Only merge.df does real work so far; the other Python tasks are
# placeholders that print what they receive.
with DAG(
    'Movie_summary',
    default_args={
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(seconds=3),
    },
    # Optional concurrency limits, currently disabled:
    # max_active_runs=1,
    # max_active_tasks=3,
    description='About movie',
    # Cron: every day at 02:10 (in the DAG's timezone; UTC unless configured
    # otherwise -- confirm for this deployment).
    schedule="10 2 * * *",
    start_date=datetime(2024, 7, 24),
    # With catchup enabled the scheduler also creates runs for every missed
    # interval since start_date.
    catchup=True,
    tags=['movie', 'api', 'amt'],
) as dag:
    # Packages pip-installs into each task's isolated virtualenv.
    # NOTE(review): the text after "@" ("0.5/agg") is the git revision pip
    # checks out -- presumably a branch/tag of mov_agg; confirm it exists.
    REQUIREMENTS = [
        "git+https://github.com/EstherCho-7/mov_agg@0.5/agg",
    ]

    # FUNCTION
    def gen_empty(*ids):
        """Create one EmptyOperator per task id.

        Args:
            *ids: task_id strings, one per operator to create.

        Returns:
            A tuple of the created operators, in the same order as ``ids``.
        """
        # `task_id` (not `id`) so the builtin is not shadowed.
        return tuple(EmptyOperator(task_id=task_id) for task_id in ids)

    def gen_vpython(*, id, fun_obj, op_kw=None):
        """Create a PythonVirtualenvOperator that runs ``fun_obj``.

        The virtualenv is built from REQUIREMENTS without system site
        packages. Per Airflow's PythonVirtualenvOperator docs, the callable
        runs from its source in a separate interpreter. It must therefore be
        self-contained: do its own imports and use no module-level globals.

        Args:
            id: task_id of the new task. The name ``id`` is kept so the
                existing keyword call sites keep working.
            fun_obj: the Python callable to execute.
            op_kw: keyword arguments forwarded to ``fun_obj``. Defaults to
                none.

        Returns:
            The created operator.
        """
        return PythonVirtualenvOperator(
            task_id=id,
            python_callable=fun_obj,
            system_site_packages=False,
            requirements=REQUIREMENTS,
            op_kwargs=op_kw,
        )

    def pro_data(**params):
        """Placeholder task body: print the task name and all received kwargs.

        ``params`` holds the op_kwargs passed to gen_vpython (including
        'task_name') plus whatever Airflow context values the operator
        forwards to the callable.
        """
        print("@" * 100)
        print(params['task_name'])
        print(params)
        print("@" * 100)

    def pro_merge(task_name, **params):
        """Merge movie data for the run's ``ds_nodash`` date and print it.

        Args:
            task_name: supplied via op_kwargs; not used by this function.
            **params: must contain 'ds_nodash', which is passed to
                ``mov_agg.u.merge``.
        """
        load_dt = params['ds_nodash']
        # Imported here because this runs inside the task's virtualenv,
        # where mov_agg is installed from REQUIREMENTS.
        from mov_agg.u import merge

        df = merge(load_dt)  # presumably a DataFrame -- printed as-is
        print("*" * 33)
        print(df)

    # The former pro_data3/pro_data4 had bodies identical to pro_data. The
    # names stay as aliases so any external reference to them keeps working.
    pro_data3 = pro_data
    pro_data4 = pro_data

    # OPERATOR
    start, end = gen_empty('start', 'end')

    apply_type = gen_vpython(
        id='apply.type',
        fun_obj=pro_data,
        op_kw={"task_name": "apply_type"},
    )
    merge_df = gen_vpython(
        id='merge.df',
        fun_obj=pro_merge,
        op_kw={"task_name": "merge_df"},
    )
    de_dup = gen_vpython(
        id='de.dup',
        fun_obj=pro_data,
        op_kw={"task_name": "de_dup"},
    )
    summary_df = gen_vpython(
        id='summary.df',
        fun_obj=pro_data,
        op_kw={"task_name": "summary_df"},
    )

    # Same edges as before, written as one chain.
    start >> merge_df >> de_dup >> apply_type >> summary_df >> end