apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] How to use DeltaStreamer on AWS Glue for Community Video and Blogs #10752

Closed: soumilshah1995 closed this issue 4 months ago

soumilshah1995 commented 4 months ago

Greetings,

I am developing a community video to show users the advantages of running DeltaStreamer on AWS Glue rather than on EMR. Since Glue is serverless and lightweight, it offers significant benefits for data processing jobs.

Step 1: Download Dataset and upload to S3

Link : https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link
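
For example, after downloading the dataset locally, it can be uploaded with a standard AWS CLI copy; the local folder, bucket, and prefix below are placeholders:

aws s3 cp ./dataset/ s3://<your-bucket>/test/ --recursive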

Step 2: Create Scala Job


import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.api.java.JavaSparkContext
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
import org.apache.hudi.utilities.deltastreamer.SchedulerConfGenerator
import org.apache.hudi.utilities.UtilHelpers

object GlueApp {
  def main(sysArgs: Array[String]) {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    // DeltaStreamer arguments: read parquet files from S3 and upsert them into a COPY_ON_WRITE table
    val config = Array(
      "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
      "--source-ordering-field", "replicadmstimestamp",
      "--target-base-path", "s3://XX/test_silver/",
      "--target-table", "invoice",
      "--table-type" , "COPY_ON_WRITE",
      "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator",
      "--hoodie-conf", "hoodie.datasource.write.recordkey.field=invoiceid",
      "--hoodie-conf", "hoodie.datasource.write.partitionpath.field=destinationstate",
      "--hoodie-conf", "hoodie.streamer.source.dfs.root=s3://XX/test/",
      "--hoodie-conf", "hoodie.datasource.write.precombine.field=replicadmstimestamp"
    )

    // Build the streamer config, derive Spark scheduling configs from it, and create the context
    val cfg = HoodieDeltaStreamer.getConfig(config)
    val additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg)
    val jssc = UtilHelpers.buildSparkContext("delta-streamer-test", "jes", additionalSparkConfigs)
    val spark = jssc.sc

    val glueContext: GlueContext = new GlueContext(spark)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    try {
        new HoodieDeltaStreamer(cfg, jssc).sync()
    } finally {
        jssc.stop()
    }

    Job.commit()
  }
}
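
For context, a Glue Scala job like this is normally wired up through Glue's special job parameters; a sketch matching the script above, where the entry class name comes from the object defined in the script:

--job-language | scala
--class | GlueApp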

I've attempted to implement the provided code, but I'm encountering difficulties in getting it to function correctly. Here are the approaches I've tried:

Approach 1: Use Glue 4.0 with flags


--datalake-formats | hudi

--conf | spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.sql.legacy.pathOptionBehavior.enabled=true --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension

Unfortunately, this approach did not yield the desired outcome.

I also attempted to upload custom JAR files to S3 and utilize them, but encountered similar issues.


I would greatly appreciate any assistance or insights regarding the correct utilization of DeltaStreamer on AWS Glue.

Thank you for your attention and support.

ad1happy2go commented 4 months ago

@soumilshah1995 I made a few modifications and got it working with the code below. Starting with 0.14.x, the class name changed to HoodieStreamer.

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.api.java.JavaSparkContext
import org.apache.hudi.utilities.streamer.HoodieStreamer
import org.apache.hudi.utilities.streamer.SchedulerConfGenerator
import org.apache.hudi.utilities.UtilHelpers
import com.beust.jcommander.JCommander
import com.beust.jcommander.Parameter

object GlueApp {

  def main(sysArgs: Array[String]) {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    val BUCKET = "<bucket>" // placeholder: substitute your S3 bucket name

    // HoodieStreamer arguments: read JSON files from S3 and upsert them into a COPY_ON_WRITE table
    val config = Array(
      "--source-class", "org.apache.hudi.utilities.sources.JsonDFSSource",
      "--source-ordering-field", "ts",
      "--target-base-path", "s3://<BUCKET>/sandbox/aditya_sandbox/testds_glue/invoice" ,
      "--target-table", "invoice",
      "--table-type" , "COPY_ON_WRITE",
      "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator",
      "--hoodie-conf", "hoodie.datasource.write.recordkey.field=symbol",
      "--hoodie-conf", "hoodie.datasource.write.partitionpath.field=date",
      "--hoodie-conf", s"hoodie.streamer.source.dfs.root=s3://${BUCKET}/testcases/stocks/data/source",
      "--hoodie-conf", "hoodie.datasource.write.precombine.field=ts",
      "--schemaprovider-class","org.apache.hudi.utilities.schema.FilebasedSchemaProvider",
      "--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=false",
      "--hoodie-conf", s"hoodie.deltastreamer.schemaprovider.source.schema.file=s3://${BUCKET}/testcases/stocks/data/schema.avsc",
      "--hoodie-conf", s"hoodie.deltastreamer.schemaprovider.target.schema.file=s3://${BUCKET}/testcases/stocks/data/schema.avsc",
      "--payload-class", s"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
      "--props", s"s3://${BUCKET}/testcases/stocks/configs/hoodie.properties"
    )

    val cfg = HoodieStreamer.getConfig(config)
    val additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg)
    val jssc = UtilHelpers.buildSparkContext("delta-streamer-test", "jes", additionalSparkConfigs)
    val spark = jssc.sc

    val glueContext: GlueContext = new GlueContext(spark)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    try {
        new HoodieStreamer(cfg, jssc).sync()
    } finally {
        jssc.stop()
    }

    Job.commit()
  }
}
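
The FilebasedSchemaProvider configured above reads plain Avro schema files from the hoodie.deltastreamer.schemaprovider.*.schema.file paths. A minimal sketch of what schema.avsc could contain for this stocks dataset; only symbol, date, and ts are implied by the record key, partition path, and ordering configs, and any other fields are assumptions:

{
  "type": "record",
  "name": "stock_ticks",
  "fields": [
    {"name": "symbol", "type": "string"},
    {"name": "date", "type": "string"},
    {"name": "ts", "type": "string"},
    {"name": "close", "type": ["null", "double"], "default": null}
  ]
}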

I used these jars: hudi-utilities-slim-bundle_2.12-0.14.1.jar, hudi-spark3.3-bundle_2.12-0.14.1.jar, jcommander-1.78.jar
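
Jars staged on S3 are typically attached to a Glue job through the --extra-jars special job parameter, in the same key | value form as the flags earlier in this thread; the S3 paths below are placeholders:

--extra-jars | s3://<bucket>/jars/hudi-utilities-slim-bundle_2.12-0.14.1.jar,s3://<bucket>/jars/hudi-spark3.3-bundle_2.12-0.14.1.jar,s3://<bucket>/jars/jcommander-1.78.jar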

soumilshah1995 commented 4 months ago

Roger that, testing it now on my AWS account.

soumilshah1995 commented 4 months ago


@ad1happy2go Thanks for the help. I will make sure you are credited in the videos.