WHATDO-TEAM / whatdo

1 stars 0 forks source link

[FEATURE] Airflow 구축 #47

Open seokjunKing opened 3 weeks ago

seokjunKing commented 3 weeks ago
seokjunKing commented 3 weeks ago

Airflow 구축

1. 인스턴스 사양 선택

2. 운영체제 선택

3. 네트워크 설정

4. Airflow 설치 및 구성


고려사항

seokjunKing commented 3 weeks ago

CSV 복사 DAG 작성

  1. csv 파일이 로컬에 업로드 되어있다는 가정
  2. 수동으로 dag 트리거 한다는 가정
  3. 중복 실행 방지 설정
  4. 경로, 버킷이름 등등 임의로 설정

위 기준으로 임의의 dag 작성(gpt)

copy_csv_to_gcs_dag.py

from airflow import DAG
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.operators.python import PythonOperator
import pendulum
import os

# 로컬 경로 및 파일 기록 경로 설정
local_dir = '/home/wangseokjun333/data/upload/'
completed_files_path = '/home/wangseokjun333/data/upload/copied_files.txt'
destination_bucket = 'whatdo'

# 복사할 새로운 파일 확인 함수
def find_new_csv():
    files_to_copy = []
    if os.path.exists(completed_files_path):
        with open(completed_files_path, 'r') as f:
            copied_files = f.read().splitlines()
    else:
        copied_files = []

    for filename in os.listdir(local_dir):
        if filename.endswith('.csv') and filename not in copied_files:
            files_to_copy.append(filename)

    return files_to_copy

# 복사 완료된 파일 기록 함수
def log_copied_file(filename):
    with open(completed_files_path, 'a') as f:
        f.write(f"{filename}\n")

# DAG 설정
with DAG(
    dag_id='manual_csv_copy',
    schedule=None,  # 수동 트리거
    start_date=pendulum.today('UTC').add(days=-1),
    catchup=False
) as dag:

    # 1. 새로운 CSV 파일을 확인하는 Task
    find_new_files_task = PythonOperator(
        task_id='find_new_csv_files',
        python_callable=find_new_csv,
    )

    # 2. 로컬 파일을 whatdo 버킷으로 업로드하는 Task
    def upload_to_gcs_task(**kwargs):
        files_to_copy = kwargs['ti'].xcom_pull(task_ids='find_new_csv_files')
        for file in files_to_copy:
            upload_task = LocalFilesystemToGCSOperator(
                task_id=f'upload_{file}_to_whatdo',
                src=f"{local_dir}{file}",  # 로컬에서 파일의 전체 경로
                dst=f"upload/{file}",  # GCS 버킷 내 저장 경로
                bucket=destination_bucket
            )
            upload_task.execute(context=kwargs)
            log_copied_file(file)  # 업로드 후 파일을 기록하여 중복 방지

    upload_to_whatdo = PythonOperator(
        task_id='upload_to_whatdo',
        python_callable=upload_to_gcs_task,
    )

    # Task 순서 지정
    find_new_files_task >> upload_to_whatdo

참고자료

  1. 로컬에서 gcs로 파일 업로드 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/transfer/local_to_gcs.html

  2. 복사 완료 파일 목록 기록 copied_files.txt라는 파일을 로컬에 생성하여 복사 완료된 파일명을 기록 DAG 실행 시 이 파일을 확인하고 이미 복사된 파일은 건너뛰도록 설정


고려사항

  1. 스토리지 생성

  2. csv 업로드 방식

    • csv파일이 업로드되는 버킷을 만들어 활용 (gcs_to_gcs)
    • csv파일이 로컬에 업로드됨 (local_to_gcs)
  3. dag 실행 방식

    • 파일 업로드 감지로 복사
    • 주기적으로 dag 실행
    • 수동 트리거
    • 이벤트 기반 트리거

위 사항을 고려하여 설계할 필요가 있음

seokjunKing commented 1 week ago

파이프라인의 가용성을 높이고 데이터 손실 및 에러 상황을 효과적으로 관리

데이터 적재의 안정성:

데이터 적재 중 네트워크 오류나 DB 연결 문제가 발생할 경우, 해당 데이터를 임시 저장하고 네트워크 안정화 후 재처리를 자동화하는 로직을 추가

데이터 분할 및 병렬 처리:

큰 데이터를 한 번에 처리하는 대신 Airflow의 병렬 처리를 사용해 데이터를 효율적으로 나누어 처리함으로써 작업 속도를 최적화하고 자원을 절약할 수 있습니다. 예를 들어, 일별 또는 시간 단위로 파일을 분할하여 병렬로 적재하면, 전체 적재 시간이 단축되고 안정성을 높일 수 있다.

데이터 백업 및 복구 전략:

Airflow에서 실패한 작업이나 데이터 유실에 대비해 주요 단계마다 데이터를 백업하고, 필요 시 백업된 데이터로부터 복구할 수 있도록 파이프라인을 구성

Task 간의 의존성 관리와 트랜잭션 처리:

DAG 내의 중요한 작업에 대해 원자성(Atomicity)을 확보하여, 특정 단계가 실패했을 때 모든 작업이 자동으로 취소되거나 재처리되도록 설정하면 파이프라인의 신뢰성을 높일 수 있다