awslabs / amazon-s3-tagging-spark-util


Apache Spark based Amazon S3 object tagging

Architecture

This is a library built on top of Apache Spark for tagging Amazon S3 objects as they are written. It lets you tag objects at the table level or at the partition level. The output file format is selected through the data source name, such as s3.parquet in the samples below.

Requirements

The library depends on Java 8 and is known to work with Apache Spark 2.4, 3.1, 3.3, and 3.4.

How to build the library?

The project is compiled using SBT and produces a separate artifact for each supported Spark version (Spark 2.4, 3.1, 3.3, and 3.4), as sketched below.
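A plausible build sketch, assuming sbt subprojects named spark_24, spark_31, spark_33, and spark_34 (matching the artifact paths used later in this README) and the sbt-assembly plugin; the actual task names in this repository may differ:

# Hypothetical invocations; subproject names inferred from the artifact paths below
sbt "project spark_24" assembly   # Spark 2.4 / Scala 2.11
sbt "project spark_31" assembly   # Spark 3.1 / Scala 2.12
sbt "project spark_33" assembly   # Spark 3.3 / Scala 2.12
sbt "project spark_34" assembly   # Spark 3.4 / Scala 2.12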

The JAR bundles spark-avro and commons-lang3 along with their dependencies. It needs to be placed on Spark's extra classpath.
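For a plain spark-submit deployment, one way to do this is via the extraClassPath properties (a sketch; the local path and application JAR name are placeholders):

# The library JAR must already exist at this path on every node for extraClassPath to work
spark-submit \
  --conf spark.driver.extraClassPath=/path/to/amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar \
  --conf spark.executor.extraClassPath=/path/to/amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar \
  your-application.jar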

Note: The released JARs are available on the releases page.

Configure AWS Glue ETL Job

Copy the JAR for the corresponding Scala and Spark version into an Amazon S3 bucket:

aws s3 cp spark_24/target/scala-2.11/amazon-s3-tagging-spark-util-spark_24-scala-2.11-lib-2.0.jar s3://$BUCKET/$PREFIX

Create a Glue ETL job with the following special parameters. For more details, see the AWS Glue Special Parameters documentation.

Glue 2.0 Configuration:

"--extra-jars" : "s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-spark_24-scala-2.11-lib-2.0.jar" // change the jar for spark version

Glue 3.0 Configuration:

"--extra-jars" : "s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-spark_31-scala-2.12-lib-2.0.jar" // change the jar for spark version
"-enable-s3-parquet-optimized-committer": "false"

Glue 4.0 Configuration:

"--extra-jars" : "s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar" // change the jar for spark version
"-enable-s3-parquet-optimized-committer": "false"

Configure Amazon EMR ETL Job

Copy the JAR for the corresponding Scala and Spark version into an Amazon S3 bucket (for EMR 6.8.x through 6.11.x, the Spark 3.3 build):

aws s3 cp spark_33/target/scala-2.12/amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar s3://$BUCKET/$PREFIX

EMR Job Spark Configuration:

"spark.jars": "s3://$BUCKET/$PREFIX/amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar"
"spark.sql.parquet.fs.optimized.committer.optimization-enabled": "false"
"spark.sql.sources.commitProtocolClass": "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol"
"spark.hadoop.mapreduce.output.fs.optimized.committer.enabled": "false"

Update the JAR file based on the EMR version, as shown in the table below.

Supported Glue and EMR Versions and Library

Glue Version   Supported Library
Glue 2.0       amazon-s3-tagging-spark-util-spark_24-scala-2.11-lib-2.0.jar
Glue 3.0       amazon-s3-tagging-spark-util-spark_31-scala-2.12-lib-2.0.jar
Glue 4.0       amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar

EMR Version    Supported Library
EMR 6.8.0      amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.8.1      amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.9.0      amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.9.1      amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.10.0     amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.10.1     amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.11.0     amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.11.1     amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar
EMR 6.12.0     amazon-s3-tagging-spark-util-spark_34-scala-2.12-lib-2.0.jar
EMR 6.13.0     amazon-s3-tagging-spark-util-spark_34-scala-2.12-lib-2.0.jar
EMR 6.14.0     amazon-s3-tagging-spark-util-spark_34-scala-2.12-lib-2.0.jar

Sample Scala Spark Job

For this example, assume we are working with customer data, where every record has a customer id, name, street, city, and country.

case class Customer(id: Long, name: String, street: String, city: String, country: String)

Our library is built on Apache Spark and is designed for very large datasets that typically live in a distributed filesystem. For simplicity, this example generates just a few records.

val rdd = spark.sparkContext.parallelize(Seq(
  Customer(1, "James Butt", "627 Walford Ave", "Dallas", "Dallas"),
  Customer(2, "Gearldine Gellinger", "4 Bloomfield Ave", "Irving", "Dallas"),
  Customer(3, "Ozell Shealy", "8 Industry Ln", "New York", "New York"),
  Customer(4, "Haydee Denooyer", "25346 New Rd", "New York", "New York"),
  Customer(5, "Mirta Mallett", "7 S San Marcos Rd", "New York", "New York")))

val customerDataFrame = spark.createDataFrame(rdd)

Store the data in S3 and tag the objects using additional write options.

// Option 1: Static Tagging
customerDataFrame
  .write
  .format("s3.parquet")
  .option("tags", "{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}")
  .save("s3://$DATA_BUCKET/$TABLE_NAME")

// Option 2: Dynamic Tagging using partition value
customerDataFrame
  .write
  .partitionBy("country")
  .format("s3.parquet")
  .option("tags", "{\"ProjectTeam\": \"Team-A\", \"Country\":\"${country}\"}")
  .save("s3://$DATA_BUCKET/$TABLE_NAME")
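To spot-check the result, the applied tags can be read back with the AWS CLI. A sketch; the part-file name below is a placeholder, since actual file names are generated by Spark:

# List the written objects first to find a real key, e.g.:
#   aws s3 ls s3://$DATA_BUCKET/$TABLE_NAME/country=Dallas/
aws s3api get-object-tagging \
  --bucket $DATA_BUCKET \
  --key "$TABLE_NAME/country=Dallas/part-00000.snappy.parquet"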

Sample PySpark Job

As in the Scala example above, we generate just a few records for simplicity.


from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
    Row(id=1, name="James Butt", street="627 Walford Ave", city="Dallas", country="USA"),
    Row(id=2, name="Gearldine Gellinger", street="4 Bloomfield Ave", city="Irving", country="USA"),
    Row(id=3, name="Ozell Shealy", street="8 Industry Ln", city="New York", country="USA"),
    Row(id=4, name="Haydee Denooyer", street="25346 New Rd", city="New York", country="USA"),
    Row(id=5, name="Mirta Mallett", street="7 S San Marcos Rd", city="New York", country="USA")
])

# Create a DataFrame from the RDD
customer_data_frame = spark.createDataFrame(rdd)

Store the data in S3 and tag the objects using additional write options.

# Option 1: Static Tagging
customer_data_frame \
    .write \
    .format("s3.parquet") \
    .option("tags", "{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}") \
    .save("s3://$DATA_BUCKET/$TABLE_NAME")

# Option 2: Dynamic Tagging using partition value
customer_data_frame \
    .write \
    .partitionBy("country") \
    .format("s3.parquet") \
    .option("tags", "{\"ProjectTeam\": \"Team-A\", \"Country\":\"${country}\"}") \
    .save("s3://$DATA_BUCKET/$TABLE_NAME")

Contributing Guidelines

See CONTRIBUTING for more information.

License

This library is licensed under the Apache 2.0 License.