spark-redshift-community / spark-redshift

Performant Redshift data source for Apache Spark
Apache License 2.0

Support primary-key overwrite (upsert) when writing data #155

Closed melin closed 5 months ago

melin commented 6 months ago

Support overwriting (upserting) data by primary key when writing.

bsharifi commented 6 months ago

@melin Thank you for the feature request. Can you please provide more details on what feature you are requesting with perhaps a small example?

melin commented 6 months ago

AWS Glue ETL can write to Redshift with a primary key configured, and it generates the code below: the job writes to a staging table, then deletes the target rows that match the primary key and inserts the staged rows. We plan to use spark-redshift directly on EMR Serverless, replacing Glue ETL, so it would be more convenient if spark-redshift supported upsert directly.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    # Register each input DynamicFrame as a temp view, run the SQL query
    # (via the module-level `spark` session defined below), and wrap the
    # result back into a DynamicFrame.
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Script generated for node Amazon Redshift
AmazonRedshift_node1709708931387 = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options={
        "redshiftTmpDir": "s3://aws-glue-assets-753463419839-ap-southeast-1/temporary/",
        "useConnectionProperties": "true",
        "dbtable": "luckysg_iupushaid.t_short_url_access_statistics",
        "connectionName": "redshift_conn_ods",
    },
    transformation_ctx="AmazonRedshift_node1709708931387",
)

# Script generated for node SQL Query
SqlQuery0 = '''
select * from myDataSource
'''
SQLQuery_node1709708949850 = sparkSqlQuery(
    glueContext,
    query=SqlQuery0,
    mapping={"myDataSource": AmazonRedshift_node1709708931387},
    transformation_ctx="SQLQuery_node1709708949850",
)

# Script generated for node Amazon Redshift. Upsert pattern: the preactions
# create the target table if needed and an empty staging table; the write
# lands in the staging table; the postactions delete the target rows that
# match the primary key (pv, statistics_date), insert the staged rows, and
# drop the staging table.
AmazonRedshift_node1709708962959 = glueContext.write_dynamic_frame.from_options(
    frame=SQLQuery_node1709708949850,
    connection_type="redshift",
    connection_options={
        "preactions": (
            "CREATE TABLE IF NOT EXISTS dw_dwd.dwd_t_ord_order_item_d_inc ("
            "id BIGINT, short_url VARCHAR, statistics_date VARCHAR, pv INTEGER, "
            "uv INTEGER, version INTEGER, create_time TIMESTAMP, update_time TIMESTAMP); "
            "DROP TABLE IF EXISTS dw_dwd.dwd_t_ord_order_item_d_inc_temp_4pkpui; "
            "CREATE TABLE dw_dwd.dwd_t_ord_order_item_d_inc_temp_4pkpui AS "
            "SELECT * FROM dw_dwd.dwd_t_ord_order_item_d_inc WHERE 1=2;"
        ),
        "postactions": (
            "BEGIN; "
            "DELETE FROM dw_dwd.dwd_t_ord_order_item_d_inc "
            "USING dw_dwd.dwd_t_ord_order_item_d_inc_temp_4pkpui "
            "WHERE dw_dwd.dwd_t_ord_order_item_d_inc_temp_4pkpui.pv = dw_dwd.dwd_t_ord_order_item_d_inc.pv "
            "AND dw_dwd.dwd_t_ord_order_item_d_inc_temp_4pkpui.statistics_date = dw_dwd.dwd_t_ord_order_item_d_inc.statistics_date; "
            "INSERT INTO dw_dwd.dwd_t_ord_order_item_d_inc "
            "SELECT * FROM dw_dwd.dwd_t_ord_order_item_d_inc_temp_4pkpui; "
            "DROP TABLE dw_dwd.dwd_t_ord_order_item_d_inc_temp_4pkpui; END;"
        ),
        "redshiftTmpDir": "s3://aws-glue-assets-753463419839-ap-southeast-1/temporary/",
        "useConnectionProperties": "true",
        "dbtable": "dw_dwd.dwd_t_ord_order_item_d_inc_temp_4pkpui",
        "connectionName": "redshift_conn_dwd",
    },
    transformation_ctx="AmazonRedshift_node1709708962959",
)

job.commit()
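
For reference, the same staging-table upsert can already be expressed with spark-redshift's documented preactions and postactions options (both are ";"-separated lists of SQL statements run before and after the COPY). The sketch below is a minimal illustration under assumptions, not tested code: the staging table name, JDBC URL, S3 bucket, and credential settings are placeholders, and the primary-key columns (pv, statistics_date) are taken from the Glue-generated postactions above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("redshift-upsert-sketch").getOrCreate()

target = "dw_dwd.dwd_t_ord_order_item_d_inc"
staging = "dw_dwd.dwd_t_ord_order_item_d_inc_stage"  # hypothetical staging name
url = "jdbc:redshift://<host>:5439/<db>?user=<user>&password=<password>"
tempdir = "s3://<bucket>/temporary/"

# Source rows to upsert; any DataFrame with the target's schema works.
df = (spark.read
      .format("io.github.spark_redshift_community.spark.redshift")
      .option("url", url)
      .option("dbtable", "luckysg_iupushaid.t_short_url_access_statistics")
      .option("tempdir", tempdir)
      .option("forward_spark_s3_credentials", "true")
      .load())

(df.write
   .format("io.github.spark_redshift_community.spark.redshift")
   .option("url", url)
   .option("dbtable", staging)  # the COPY lands in the staging table
   .option("tempdir", tempdir)
   .option("forward_spark_s3_credentials", "true")
   # Before the COPY: recreate an empty staging table with the target's schema.
   .option("preactions",
           f"DROP TABLE IF EXISTS {staging}; "
           f"CREATE TABLE {staging} AS SELECT * FROM {target} WHERE 1=2")
   # After a successful COPY: delete target rows matching the primary key,
   # move the staged rows in, and drop the staging table.
   .option("postactions",
           f"DELETE FROM {target} USING {staging} "
           f"WHERE {staging}.pv = {target}.pv "
           f"AND {staging}.statistics_date = {target}.statistics_date; "
           f"INSERT INTO {target} SELECT * FROM {staging}; "
           f"DROP TABLE {staging}")
   .mode("append")
   .save())

Native upsert support in spark-redshift would essentially amount to generating these pre/post actions from a declared primary key, which is what the Glue-generated script does.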
melin commented 5 months ago

cc @bsharifi Is this feature supported?