spark-redshift-community / spark-redshift

Performant Redshift data source for Apache Spark
Apache License 2.0

Manifest.json not being created while writing pyspark streaming dataframe to redshift #158

Closed · Keerthi9711 closed this issue 3 weeks ago

Keerthi9711 commented 7 months ago

Environment setup: AWS EMR Serverless 6.9.0. The PySpark ETL job runs multiple streaming queries; each streaming query writes to an Iceberg table and a Redshift table in microbatches, with a 60-second processing-time trigger.

Issue: In the Redshift temp S3 location I see the folders with .avro files, but the manifest.json files are not getting created, so the data is not being copied to Redshift.

In the EMR driver logs (stderr), I see the full sequence below only a few times:

24/05/01 17:41:58 INFO RedshiftWriter: Loading new Redshift data to: <<<<<>>>>>>>>>
24/05/01 17:41:58 INFO RedshiftWriter: CREATE TABLE IF NOT EXISTS <<<<>>>>>>>>
24/05/01 17:41:58 INFO RedshiftWriter: COPY <> FROM 's3://redshift-consumption-alldomains-cache/d7c1f59e-a637-472d-b7c0-254f72e97539/manifest.json' CREDENTIALS 'aws_iam_role=arn:aws:iam::12345678:role/AmazonRedshiftAllCommandsFullAccess' FORMAT AS AVRO 'auto' manifest

But most of the time my logs show only:

24/05/01 17:46:23 INFO RedshiftWriter: Loading new Redshift data to: <<<<<>>>>>>>>>
24/05/01 17:46:23 INFO RedshiftWriter: CREATE TABLE IF NOT EXISTS <<<<>>>>>>>>

The COPY command does not follow, so I am assuming that in those cases the manifest.json file is not being created.
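One way to narrow this down would be to log the row count of each microbatch before calling the sink function. This is only a diagnostic sketch: the wrapper below and its logging are illustrative additions, and the assumption that folders missing manifest.json correspond to empty microbatches is unverified.

def write_to_multiple_sinks_with_logging(df, batch_id, **kwargs):
    # Diagnostic wrapper around write_to_multiple_sinks (defined in the snippet below).
    # Assumption to verify: batches whose tempdir folder lacks manifest.json were empty.
    row_count = df.count()
    print(f"batch_id={batch_id} rows={row_count}")
    if row_count == 0:
        # Skip the sinks for empty batches (assumption: there is nothing to load).
        return
    write_to_multiple_sinks(df=df, batch_id=batch_id, **kwargs)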

CODE SNIPPET:

def df_to_write(template,spark,Data1Renamed,Data2EnrichedRenamed,Data3,Data1_s3_path,Data1_s3_checkpoints,Data2_s3_path,Data2_s3_checkpoints,Data3_s3_path,Data3_s3_checkpoints,iceberg_table_dishsubs,iceberg_table_roamingsubs,iceberg_table_nondishsubs,Data1_rtbl,Data2_rtbl,nonData1_rtbl,redshift_url,redshift_temp,redshift_exec_role):
    cur_time = datetime.now(timezone.utc)
    stop_time = (cur_time + timedelta(days=1)).replace(
        hour=7, minute=50, second=0, microsecond=0
    )
    waiting_duration = (stop_time - cur_time).total_seconds()

    Data1_s3_query=Data1Renamed.writeStream.queryName("data1")\
                            .foreachBatch(
                                            lambda df,batch_id:write_to_multiple_sinks
                                                (df=df,batch_id=batch_id,s3_path=Data1_s3_path,s3_path_chekpoints=Data1_s3_checkpoints,                                                 
                                                 iceberg_table=iceberg_table_dishsubs,template=template,spark=spark,subscriber='XXXX',
                                                 rtbl_name=Data1_rtbl,
                                                 redshift_exec_role=redshift_exec_role,redshift_url=redshift_url,
                                                 redshift_temp=f"{redshift_temp}ookla_insourced/{template}/Data1"
                                                )
                                        ).option("checkpointLocation",Data1_s3_checkpoints)\
                                        .trigger(processingTime='60 seconds').start()
    Data2_s3_query=Data2EnrichedRenamed.writeStream.queryName("data2")\
                            .foreachBatch(
                                            lambda df,batch_id:write_to_multiple_sinks
                                                (df=df,batch_id=batch_id,s3_path=Data2_s3_path,s3_path_chekpoints=Data2_s3_checkpoints,
                                                 iceberg_table = iceberg_table_roamingsubs,template=template,spark=spark,
                                                 subscriber='XXXXX',
                                                 rtbl_name=Data2_rtbl,
                                                 redshift_exec_role=redshift_exec_role,redshift_url=redshift_url,
                                                 redshift_temp=f"{redshift_temp}ookla_insourced/{template}/Data2"
                                                )
                                        ).option("checkpointLocation",Data2_s3_checkpoints)\
                                        .trigger(processingTime='60 seconds').start()
    Data3_s3_query=Data3.writeStream.queryName("data3")\
                            .foreachBatch(
                                            lambda df,batch_id:write_to_multiple_sinks
                                                (df=df,batch_id=batch_id,s3_path=Data3_s3_path,s3_path_chekpoints=Data3_s3_checkpoints,
                                                 iceberg_table=iceberg_table_nondishsubs,template=template,spark=spark,subscriber='XXXXX',
                                                 rtbl_name=nonData1_rtbl,
                                                 redshift_exec_role=redshift_exec_role,redshift_url=redshift_url,
                                                 redshift_temp=f"{redshift_temp}ookla_insourced/{template}/nonData1"
                                                )
                                        ).option("checkpointLocation",Data3_s3_checkpoints)\
                                        .trigger(processingTime='60 seconds').start()

    Data1_s3_query.awaitTermination(waiting_duration)
    Data2_s3_query.awaitTermination(waiting_duration)
    Data3_s3_query.awaitTermination(waiting_duration)

def write_to_multiple_sinks(spark,df,batch_id,s3_path,s3_path_chekpoints,iceberg_table,template,subscriber,rtbl_name,
                            redshift_exec_role,redshift_url,redshift_temp):
    ruser,rpwd=get_secret()
    r_url = f"{redshift_url}?user={ruser}&password={rpwd}"
    glue_catalog_name = "AwsDataCatalog"
    database_name = "XXXXXX"
    table_name = iceberg_table
    tableIdentifier = s3_path
    tbl_properties = f"PARTITIONED BY (dl_year,dl_month,dl_day) location '{tableIdentifier}' TBLPROPERTIES('table_type' = 'ICEBERG','write.format'='parquet','write.parquet.compression-codec'='snappy','write.target-file-size-bytes'='536870912','write.metadata.delete-after-commit.enabled'='true','write.metadata.previous-versions-max'='10','commit.manifest-merge.enabled'='true','write.delete.mode'='copy-on-write','write.update.mode'='copy-on-write')"

    if template=='abc' :
        join_condition= 'XXXX'
        if subscriber=='XXXX':
          table_schema = schema_1
          streaming_cols = cols_1
        elif subscriber=='XXXX':
          table_schema = schema_2
          streaming_cols = cols_2
        elif subscriber=='XXXX':
          table_schema = schema_3
          streaming_cols = cols_3
    elif template=='def':
        join_condition= 'XXXX'
        if subscriber=='XXXX':
          table_schema = schema_4
          streaming_cols = cols_4
        elif subscriber=='XXXXX':
          table_schema = schema_5
          streaming_cols = cols_5
        elif subscriber=='XXXX':
          table_schema = schema_6
          streaming_cols = cols_6
    try:
        spark.sql(f"use {glue_catalog_name}.{database_name}")
        spark.sql(f"CREATE TABLE IF NOT EXISTS {glue_catalog_name}.{database_name}.{table_name} {table_schema} {tbl_properties}")          
        df.createOrReplaceGlobalTempView(f"temp_view_{subscriber}") 

        # data deduplication using a left anti join against recent Iceberg data
        result_rows = spark.sql(f"""
            WITH earliestvalidtimestamp AS
                    (  
                      SELECT *
                      FROM   {glue_catalog_name}.{database_name}.{table_name}
                      WHERE  result_date > CURRENT_TIMESTAMP() - interval 1 day 
                    ) 
               ,newdataset AS 
                  (
                      SELECT df.*
                      FROM  global_temp.temp_view_{subscriber} df
                      LEFT ANTI JOIN  earliestvalidtimestamp tbl
                      ON  df.{join_condition}=tbl.{join_condition} 
                    ) 
            SELECT {streaming_cols} FROM newdataset
            """)

        result_rows.writeTo(f"{glue_catalog_name}.{database_name}.{table_name}").append()

        ##### writing to redshift tables ###########
        result_rows.write.format("io.github.spark_redshift_community.spark.redshift") \
            .option("url", r_url) \
            .option("dbtable", rtbl_name) \
            .option("tempdir", redshift_temp) \
            .option("aws_iam_role", redshift_exec_role).mode('append').save()

    finally:
        # drop the per-batch global temp view so it does not linger between batches
        spark.catalog.dropGlobalTempView(f"temp_view_{subscriber}")
bsharifi commented 4 months ago

@Keerthi9711 Apologies for the delayed response. When the COPY fails, can you please check the temporary S3 bucket to confirm whether the manifest.json file is missing? Are there any other errors or exceptions in the driver node logs? You can also check the logs on the other nodes in the cluster for exceptions or errors.
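A quick way to do that check is to list the tempdir prefix and flag batch folders that contain .avro files but no manifest.json. This is an illustrative sketch only; the bucket and prefix names are placeholders, not values from the job above.

import boto3
from collections import defaultdict

s3 = boto3.client("s3")
bucket = "my-redshift-temp-bucket"  # placeholder for the tempdir bucket
prefix = "ookla_insourced/"         # placeholder for the tempdir prefix

# Count .avro files and record manifest.json presence per batch folder.
folders = defaultdict(lambda: {"avro": 0, "manifest": False})
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        folder = key.rsplit("/", 1)[0]
        if key.endswith(".avro"):
            folders[folder]["avro"] += 1
        elif key.endswith("manifest.json"):
            folders[folder]["manifest"] = True

for folder, info in sorted(folders.items()):
    if info["avro"] > 0 and not info["manifest"]:
        print(f"missing manifest: {folder} ({info['avro']} avro files)")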