open-metadata / openmetadata-spark-agent


Cannot create pipeline with Spark job #19

Open buithuhasvtech opened 1 month ago

buithuhasvtech commented 1 month ago

Hi there, I am trying to create a pipeline with a Spark job (my code is based on this tutorial: https://docs.open-metadata.org/v1.3.x/connectors/ingestion/lineage/spark-lineage#spark-lineage-ingestion). The tables used in the Spark job all exist in OM, the Spark job finished successfully, and a pipeline service named My_pipeline_service has been created (see screenshot: Screenshot 2024-05-21 141945).

But I didn't see any pipeline. What can I do to get the pipeline created from the Spark job? I have only added metadata ingestion in the DB service; do I need to add any other type of ingestion to create the pipeline?

This is my code:

```python
from pyspark.sql import SparkSession
import sys
import calendar
import time
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# SingleStore connection settings (placeholders)
ss_conf = {
    'database': 'test_db',
    'password': 'my_pwd',
    'port': '3306',
    'host': 'my_host',
    'ssl': 'false',
    'username': 'my_username',
}

# OpenMetadata JWT token (placeholder)
OM_JWT = "my_om_jwt"

spark = (
    SparkSession.builder.master("local")
    .appName("localTestApp")
    .config(
        "spark.jars",
        "/thuvien/driver/singlestore-spark-connector_2.12-4.1.3-spark-3.3.0.jar,"
        "/thuvien/driver/mariadb-java-client-3.1.4.jar,"
        "/thuvien/driver/singlestore-jdbc-client-1.2.0.jar,"
        "/thuvien/driver/commons-dbcp2-2.9.0.jar,"
        "/thuvien/driver/commons-pool2-2.11.1.jar,"
        "/thuvien/driver/spray-json_2.10-1.2.5.jar,"
        "/thuvien/driver/openmetadata-spark-agent-1.0-beta.jar",
    )
    # Register the OpenMetadata Spark listener and its transport configuration
    .config(
        "spark.extraListeners",
        "org.openmetadata.spark.agent.OpenMetadataSparkListener",
    )
    .config("spark.openmetadata.transport.hostPort", "my_hostPort")
    .config("spark.openmetadata.transport.type", "openmetadata")
    .config("spark.openmetadata.transport.jwtToken", OM_JWT)
    .config("spark.openmetadata.transport.pipelineServiceName", "My_pipeline_service")
    .config("spark.openmetadata.transport.pipelineName", "My_pipeline_name")
    .config("spark.openmetadata.transport.pipelineDescription", "My ETL Pipeline")
    .config("spark.openmetadata.transport.databaseServiceNames", "analytic")
    .config("spark.openmetadata.transport.timeout", "30")
    .config("spark.openlineage.facets.disabled", "[spark_unknown;spark.logicalPlan]")
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("INFO")

# Pass the SingleStore connection settings to the session
spark.conf.set("spark.datasource.singlestore.clientEndpoint", ss_conf['host'])
spark.conf.set("spark.datasource.singlestore.user", ss_conf['username'])
spark.conf.set("spark.datasource.singlestore.password", ss_conf['password'])

# Read the source table from SingleStore
df1 = (
    spark.read.format("singlestore")
    .option("spark.datasource.singlestore.disablePushdown", "true")
    .option("enableParallelRead", "automatic")
    .option("parallelRead.Features", "readFromAggregatorsMaterialized,readFromAggregators")
    .option("parallelRead.repartition", "true")
    .option("parallelRead.maxNumPartitions", 20)
    .option("parallelRead.repartition.columns", "isdn_key")
    .load("test_db.uc16_02_tkc_20240229")
)

print(df1.count())

# Fill nulls and write the result to a new SingleStore table
(
    df1.na.fill(0)
    .na.fill("")
    .write.mode("overwrite")
    .format("singlestore")
    .save("test_db.uc16_02_tkc_20240229_new")
)

spark.stop()
```
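As a way to double-check whether the agent actually registered anything, here is a minimal sketch (assuming the OpenMetadata Python SDK from openmetadata-ingestion is available) that looks up the Pipeline entity by its fully qualified name `<pipelineServiceName>.<pipelineName>`. The hostPort, JWT token, and FQN below are placeholders reusing the names from the Spark config above, not values from the original report.

```python
# Sketch: verify whether the Pipeline entity exists on the OpenMetadata server.
# hostPort, OM_JWT, and the FQN are placeholders / assumptions.
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata

OM_JWT = "my_om_jwt"  # same JWT token used in the Spark config

server_config = OpenMetadataConnection(
    hostPort="http://my_hostPort/api",  # placeholder, same server as the Spark config
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(jwtToken=OM_JWT),
)
metadata = OpenMetadata(server_config)

# Fully qualified name is <pipelineServiceName>.<pipelineName> from the Spark config
pipeline = metadata.get_by_name(
    entity=Pipeline, fqn="My_pipeline_service.My_pipeline_name"
)
print(pipeline)
```

A `None` result would mean the pipeline entity does not exist on the server, i.e. the listener never created it.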