salamandra2508 opened this issue 1 month ago
Thanks for opening your first issue in the Marquez project! Please be sure to follow the issue template!
It looks like your configuration is good since you've got jobs in the system.
Are you using the correct Airflow operators to extract lineage metadata (i.e., not the Python or K8s operator)? Have you tried turning on the debug logs in Airflow so that you can see whether the inputs and outputs are indeed populated with schemas? Are there namespaces created for your datasets?
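For example, with Airflow debug logging enabled (e.g. `AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG`) you can inspect the emitted OpenLineage events in the task logs, and you can query the Marquez REST API to see which namespaces and datasets were actually registered. This is just a quick sanity-check sketch; `MARQUEZ_URL` is an assumption, point it at your own deployment:

```python
# Quick sanity-check sketch: query the Marquez REST API to see which
# namespaces and datasets have been registered.
import requests

MARQUEZ_URL = "http://localhost:5000"  # assumption -- use your Marquez service URL

# Dataset namespaces (e.g. one per storage location) should show up here,
# alongside your Airflow namespace, once dataset metadata is emitted.
namespaces = requests.get(f"{MARQUEZ_URL}/api/v1/namespaces").json()
print([ns["name"] for ns in namespaces.get("namespaces", [])])

# Datasets registered under a given namespace (replace 'default' as needed).
datasets = requests.get(f"{MARQUEZ_URL}/api/v1/namespaces/default/datasets").json()
print([ds["name"] for ds in datasets.get("datasets", [])])
```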
I tried to create a DAG based on the example (btw, the example from the repo (https://github.com/MarquezProject/marquez/blob/main/examples/airflow/airflow.md) works fine with the Postgres operator), but when I tried to create the same DAG via the SparkSubmitOperator, I can see jobs but no datasets. My Spark script:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import random
import sys


def run_stage(stage):
    spark = SparkSession.builder.appName(f"CombinedDataJob{stage}").getOrCreate()

    s3_output_table1 = "s3a://test-busket/marquez/table1.csv"
    s3_output_table2 = "s3a://test-busket/marquez/table2.csv"
    s3_output_merged = "s3a://test-busket/marquez/merged_table.csv"
    s3_output_final = "s3a://test-busket/marquez/final_dataset.csv"

    if stage == 'generate':
        # Stage 1: Generate 2 tables with some data and save them
        table1_data = [(1, 'A'), (2, 'B')]
        table2_data = [(1, 'First'), (2, 'Second')]
        df_table1 = spark.createDataFrame(table1_data, ['id', 'value'])
        df_table1.write.mode('overwrite').csv(s3_output_table1, header=True)
        df_table2 = spark.createDataFrame(table2_data, ['id', 'description'])
        df_table2.write.mode('overwrite').csv(s3_output_table2, header=True)
    elif stage == 'merge':
        # Stage 2: Merge these tables into a third table
        df_table1 = spark.read.csv(s3_output_table1, header=True, inferSchema=True)
        df_table2 = spark.read.csv(s3_output_table2, header=True, inferSchema=True)
        df_merged = df_table1.join(df_table2, 'id')
        df_merged.write.mode('overwrite').csv(s3_output_merged, header=True)
    elif stage == 'process':
        # Stage 3: Create a dataset for the output of the merged table
        df_merged = spark.read.csv(s3_output_merged, header=True, inferSchema=True)
        df_final = df_merged.withColumn('processed_flag', lit(True))
        df_final.write.mode('overwrite').csv(s3_output_final, header=True)

    spark.stop()


if __name__ == "__main__":
    stage = sys.argv[1]  # The stage to run is passed as the first argument
    run_stage(stage)
```
My DAG:

```python
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago
from airflow.datasets import Dataset  # Importing the Dataset class
from datetime import datetime, timedelta

from draif_common.common_env import get_default_spark_conf, LIB_FOLDER

default_args = {
    'owner': 'datascience',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'email': ['datascience@example.com'],
    'execution_timeout': timedelta(minutes=300),
}

dag = DAG(
    dag_id='complex_data_pipeline_dag',
    schedule_interval=None,
    catchup=False,
    default_args=default_args,
    description='A complex data pipeline DAG',
)

conf = get_default_spark_conf()

generate_tables = SparkSubmitOperator(task_id='generate_tables', application=f'{LIB_FOLDER}/combined_data_job.py', conn_id='SparkConnection', total_executor_cores=2, executor_cores=1, executor_memory='1g', driver_memory='1g', conf=conf, application_args=['generate'], dag=dag)

merge_tables = SparkSubmitOperator(task_id='merge_tables', application=f'{LIB_FOLDER}/combined_data_job.py', conn_id='SparkConnection', total_executor_cores=2, executor_cores=1, executor_memory='1g', driver_memory='1g', conf=conf, application_args=['merge'], dag=dag)

process_final_dataset = SparkSubmitOperator(
    task_id='process_final_dataset',
    application=f'{LIB_FOLDER}/combined_data_job.py',
    conn_id='SparkConnection',
    total_executor_cores=2,
    executor_cores=1,
    executor_memory='1g',
    driver_memory='1g',
    conf=conf,
    application_args=['process'],
    outlets=[
        Dataset('s3a://TEST-BUCKET/marquez/final_dataset.csv')  # Track final dataset
    ],
    dag=dag,
)

generate_tables >> merge_tables >> process_final_dataset
```
@salamandra2508 I see you're trying to manually define outlets in the operator. How about turning on the Spark OpenLineage listener and getting the lineage automatically from the Spark job? https://openlineage.io/docs/integrations/spark/
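For reference, here is a minimal sketch of the Spark properties that enable the OpenLineage listener, merged into the `conf` the DAG already passes to `SparkSubmitOperator`. The package version, Marquez URL, and namespace below are assumptions to adjust for your environment:

```python
# Minimal sketch: add the OpenLineage listener settings to the Spark conf
# already passed to SparkSubmitOperator. Version, URL, and namespace are
# placeholders -- adjust them for your setup.
openlineage_conf = {
    # Pull the OpenLineage Spark agent onto the driver/executors (version assumed).
    "spark.jars.packages": "io.openlineage:openlineage-spark_2.12:1.24.2",
    # Register the listener that captures the Spark logical plan as lineage.
    "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
    # Send OpenLineage events to Marquez over HTTP (URL assumed).
    "spark.openlineage.transport.type": "http",
    "spark.openlineage.transport.url": "http://marquez:5000",
    # Namespace under which the Spark jobs and datasets will appear in Marquez.
    "spark.openlineage.namespace": "spark_jobs",
}

conf = {**get_default_spark_conf(), **openlineage_conf}
```

Once the listener is emitting events, the s3a inputs and outputs the job reads and writes should show up as datasets without manually declaring `outlets`.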
After installing and setting up Marquez in my environment (AWS EKS + Helm + Argo CD), I can see the Airflow DAGs in the Marquez UI but do not see any datasets.
![Empty Datasets view in the Marquez UI](https://github.com/user-attachments/assets/4bf50c17-4fe8-4764-925c-a488a14f80d5)
How can I achieve this? Or can someone provide a DAG example to check it with? I'm pretty new to this stuff.