delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[BUG] Multi-cluster writes to Delta Lake Storage in S3 #1498

Closed soumilshah1995 closed 1 year ago

soumilshah1995 commented 2 years ago

Trying out

https://delta.io/blog/2022-05-18-multi-cluster-writes-to-delta-lake-storage-in-s3/ https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#

DynamoDB tables

[screenshot]

jar files

[screenshot]

jar file path provided

[screenshot]

When I run my code I don't see any entries in the DynamoDB table. Here is the code:

try:
    import os
    import sys

    import pyspark
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, asc, desc
    from pyspark.sql.functions import *
    from delta.tables import *
    from delta.tables import DeltaTable

    print("All modules are loaded .....")

except Exception as e:
    print("Some modules are missing {} ".format(e))

class DeltaLakeHelper(object):
    """
    Delta Lakes Python helper class that aids in basic operations such as inserting, updating, deleting, merging, removing older files and versions, and generating Athena manifest files.
    """

    def __init__(self, delta_lake_path: str):

        self.spark = self.__create_spark_session()
        self.delta_lake_path = delta_lake_path
        self.delta_df = None

    def generate_manifest_files(self):
        """

        Generates Manifest file for Athena

        :return: Bool
        """
        self.delta_df.generate("symlink_format_manifest")
        return True

    def __generate_delta_df(self):
        try:
            if self.delta_df is None:
                self.delta_df = DeltaTable.forPath(self.spark, self.delta_lake_path)
        except Exception as e:
            pass

    def compact_table(self, num_of_files=10):
        """

        Compacts smaller parquet files into larger files

        :param num_of_files: Int
        :return: Bool

        """
        # .save() returns None, so there is nothing useful to assign here
        self.spark.read.format("delta") \
            .load(self.delta_lake_path) \
            .repartition(num_of_files) \
            .write.option("dataChange", "false") \
            .format("delta") \
            .mode("overwrite") \
            .save(self.delta_lake_path)
        return True

    def delete_older_files_versions(self):
        """

        Deletes older versions by calling vacuum(0)

        :return: Bool
        """
        self.__generate_delta_df()
        self.delta_df.vacuum(0)
        return True

    def insert_overwrite_records_delta_lake(self, spark_df, max_record_per_file='10000'):
        """
        Inserts into Delta Lake

        :param spark_df: Pyspark Dataframe
        :param max_record_per_file: str ie max_record_per_file= "10000"
        :return:Bool
        """
        spark_df.write.format("delta") \
            .mode("overwrite") \
            .option("maxRecordsPerFile", max_record_per_file) \
            .save(self.delta_lake_path)

        return True

    def append_records_delta_lake(self, spark_df, max_record_per_file="10000"):
        """

        Appends data to the Delta table

        :param spark_df: Pyspark Dataframe
        :param max_record_per_file: str ie max_record_per_file= "10000"
        :return: Bool
        """
        spark_df.write.format("delta") \
            .mode('append') \
            .option("maxRecordsPerFile", max_record_per_file) \
            .save(self.delta_lake_path)
        return True

    def update_records_delta_lake(self, condition="", value_to_set={}):
        """

        Updates records in the Delta table that match the condition

        :param condition : Str Example:  condition="emp_id = '3'"
        :param value_to_set: Dict IE  value_to_set={"employee_name": "'THIS WAS UPDATE ON DELTA LAKE'"}
        :return: Bool
        """
        self.__generate_delta_df()
        self.delta_df.update(condition, value_to_set)
        return True

    def upsert_records_delta_lake(self, old_data_key, new_data_key, new_spark_df):
        """

        Upserts (merges) records into the Delta table.
        If a matching record is found it is updated; otherwise it is inserted.
        See the examples below for how to use this.

        :param old_data_key: Column in the existing Delta table on which to merge or upsert
        :param new_data_key: Column in the new DataFrame on which to merge or upsert
        :param new_spark_df: Spark DataFrame
        :return: Bool
        """
        self.__generate_delta_df()
        dfUpdates = new_spark_df

        self.delta_df.alias('oldData') \
            .merge(dfUpdates.alias('newData'), f'oldData.{old_data_key} = newData.{new_data_key}') \
            .whenMatchedUpdateAll() \
            .whenNotMatchedInsertAll() \
            .execute()

        return True

    def delete_records_delta_lake(self, condition=""):
        """

        Deletes records from the Delta table that match the condition

        :param condition:Str IE condition="emp_id = '4'"
        :return:Bool
        """
        self.__generate_delta_df()
        self.delta_df.delete(condition)
        return True

    def read_delta_lake(self):
        """

        Reads the Delta table

        :return: Spark DF
        """
        df_read = self.spark.read.format("delta").load(self.delta_lake_path)
        return df_read

    def __create_spark_session(self):
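        # Build a SparkSession with the Delta SQL extensions and the DynamoDB-backed S3 LogStore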
        self.spark = SparkSession \
            .builder \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.databricks.delta.retentionDurationCheck.enabled", "false") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .config("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore") \
            .getOrCreate()
        return self.spark

def main():

    try:
        from awsglue.job import Job
        from awsglue.utils import getResolvedOptions
        from awsglue.dynamicframe import DynamicFrame
        from awsglue.context import GlueContext
    except Exception as e:
        pass

    helper = DeltaLakeHelper(delta_lake_path="s3a://glue-learn-begineers/deltalake/delta_table")

    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    spark = helper.spark
    sc = spark.sparkContext
    glueContext = GlueContext(sc)
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)

    # ====================================================
    """Create Spark Data Frame """
    # ====================================================
    data = [
        (1, "this is insert 1 ", "Sales", "RJ", 81000, 30, 23000, 827307999),
        (2, "this is insert 2", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
        (3, "this is insert 3", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
        (4, "this is insert 3", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
    ]
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
    df_write = spark.createDataFrame(data=data, schema=columns)
    helper.insert_overwrite_records_delta_lake(spark_df=df_write)

    # ====================================================
    """Appending  """
    # ====================================================
    data = [
        (5, "this is append", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
    ]
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
    df_append = spark.createDataFrame(data=data, schema=columns)
    helper.append_records_delta_lake(spark_df=df_append)

    # ====================================================
    """READ FROM DELTA LAKE  """
    # ====================================================
    df_read = helper.read_delta_lake()
    print("READ", df_read.show())

    # ====================================================
    """UPDATE DELTA LAKE"""
    # ====================================================
    helper.update_records_delta_lake(condition="emp_id = '3'",
                                     value_to_set={"employee_name": "'THIS WAS UPDATE ON DELTA LAKE'"})

    # ====================================================
    """ DELETE DELTA LAKE"""
    # ====================================================
    helper.delete_records_delta_lake(condition="emp_id = '4'")

    # ====================================================
    """ FIND ONE AND UPDATE OR UPSERT DELTA LAKE """
    # ====================================================
    new_data = [
        (2, "this is update on delta lake ", "Sales", "RJ", 81000, 30, 23000, 827307999),
        (11, "This should be append ", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
    ]

    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
    usr_up_df = spark.createDataFrame(data=new_data, schema=columns)

    helper.upsert_records_delta_lake(old_data_key='emp_id',
                                     new_data_key='emp_id',
                                     new_spark_df=usr_up_df)

    # ====================================================
    """ Compaction DELTA Prune Older Version and Create larger Files """
    # ====================================================
    helper.compact_table(num_of_files=2)
    helper.delete_older_files_versions()

    # ====================================================
    """ Create Manifest File for Athena """
    # ====================================================
    helper.generate_manifest_files()

    job.commit()

main()

Please advise.

soumilshah1995 commented 2 years ago

Hi, are there any updates?

tdas commented 2 years ago

Can you get the log4j logs of your application by configuring log4j.properties? Here are some links I found that may help in setting it up.

With the INFO-level logs, we will be able to see what the io.delta.storage.S3DynamoDBLogStore class is doing and whether it is being loaded correctly or not.

soumilshah1995 commented 2 years ago

I can try to go to CloudWatch and get the logs if that's what you want. As mentioned, the job succeeds and does not throw any errors.

soumilshah1995 commented 2 years ago

@tdas

2022-11-26T06:13:22.523-05:00

+------+-----------------+-----------+-----+------+---+-----+----------+
|emp_id|    employee_name| department|state|salary|age|bonus|        ts|
+------+-----------------+-----------+-----+------+---+-----+----------+
|     4| this is insert 3|Engineering|   RJ| 79000| 53|15000|1627694678|
|     2| this is insert 2|Engineering|   RJ| 79000| 53|15000|1627694678|
|     3| this is insert 3|Engineering|   RJ| 79000| 53|15000|1627694678|
|     5|   this is append|Engineering|   RJ| 79000| 53|15000|1627694678|
|     1|this is insert 1 |      Sales|   RJ| 81000| 30|23000| 827307999|
+------+-----------------+-----------+-----+------+---+-----+----------+

READ None

More logs:

2022-11-26 11:12:45,696 main WARN JNDI lookup class is not available because this JRE does not support JNDI. JNDI string lookups will not be available, continuing configuration. java.lang.ClassNotFoundException: org.apache.logging.log4j.core.lookup.JndiLookup
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.logging.log4j.util.LoaderUtil.loadClass(LoaderUtil.java:173)
    at org.apache.logging.log4j.util.LoaderUtil.newInstanceOf(LoaderUtil.java:211)
    at org.apache.logging.log4j.util.LoaderUtil.newCheckedInstanceOf(LoaderUtil.java:232)
    at org.apache.logging.log4j.core.util.Loader.newCheckedInstanceOf(Loader.java:301)
    at org.apache.logging.log4j.core.lookup.Interpolator.<init>(Interpolator.java:95)
    at org.apache.logging.log4j.core.config.AbstractConfiguration.<init>(AbstractConfiguration.java:114)
    at org.apache.logging.log4j.core.config.DefaultConfiguration.<init>(DefaultConfiguration.java:55)
    at org.apache.logging.log4j.core.layout.PatternLayout$Builder.build(PatternLayout.java:430)
    at org.apache.logging.log4j.core.layout.PatternLayout.createDefaultLayout(PatternLayout.java:324)
    at org.apache.logging.log4j.core.appender.ConsoleAppender$Builder.<init>(ConsoleAppender.java:121)
    at org.apache.logging.log4j.core.appender.ConsoleAppender.newBuilder(ConsoleAppender.java:111)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.createBuilder(PluginBuilder.java:158)
    at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.build(PluginBuilder.java:119)
    at org.apache.logging.log4j.core.config.AbstractConfiguration.createPluginObject(AbstractConfiguration.java:813)
    at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:753)
    at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:745)
    at org.apache.logging.log4j.core.config.AbstractConfiguration.doConfigure(AbstractConfiguration.java:389)
    at org.apache.logging.log4j.core.config.AbstractConfiguration.initialize(AbstractConfiguration.java:169)
    at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:181)
    at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:446)
    at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:520)
    at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:536)
    at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:214)
    at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:146)
    at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:41)
    at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194)
    at org.apache.logging.log4j.LogManager.getLogger(LogManager.java:597)
    at org.apache.spark.metrics.sink.MetricsConfigUtils.<clinit>(MetricsConfigUtils.java:12)
    at org.apache.spark.metrics.sink.MetricsProxyInfo.fromConfig(MetricsProxyInfo.java:17)
    at com.amazonaws.services.glue.cloudwatch.CloudWatchLogsAppenderCommon.<init>(CloudWatchLogsAppenderCommon.java:62)
    at com.amazonaws.services.glue.cloudwatch.CloudWatchLogsAppenderCommon$CloudWatchLogsAppenderCommonBuilder.build(CloudWatchLogsAppenderCommon.java:79)
    at com.amazonaws.services.glue.cloudwatch.CloudWatchAppender.activateOptions(CloudWatchAppender.java:73)
    at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
    at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
    at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
    at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
    at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
    at org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:648)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:514)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
    at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
    at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
    at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
    at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
    at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
    at org.slf4j.LoggerFactory.bind(LoggerFactory.java:150)
    at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:124)
    at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:412)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:357)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
    at org.apache.spark.network.util.JavaUtils.<clinit>(JavaUtils.java:41)
    at org.apache.spark.internal.config.ConfigHelpers$.byteFromString(ConfigBuilder.scala:67)
    at org.apache.spark.internal.config.ConfigBuilder.$anonfun$bytesConf$1(ConfigBuilder.scala:259)
    at org.apache.spark.internal.config.ConfigBuilder.$anonfun$bytesConf$1$adapted(ConfigBuilder.scala:259)
    at org.apache.spark.internal.config.TypedConfigBuilder.$anonfun$transform$1(ConfigBuilder.scala:101)
    at org.apache.spark.internal.config.TypedConfigBuilder.createWithDefault(ConfigBuilder.scala:144)
    at org.apache.spark.internal.config.package$.<init>(package.scala:345)
    at org.apache.spark.internal.config.package$.<clinit>(package.scala)
    at org.apache.spark.SparkConf$.<init>(SparkConf.scala:654)
    at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
    at org.apache.spark.SparkConf.set(SparkConf.scala:94)
    at org.apache.spark.SparkConf.$anonfun$loadFromSystemProperties$3(SparkConf.scala:76)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:788)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:787)
    at org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:75)
    at org.apache.spark.SparkConf.<init>(SparkConf.scala:70)
    at org.apache.spark.SparkConf.<init>(SparkConf.scala:59)
    at com.amazonaws.services.glue.SparkProcessLauncherPlugin.getSparkConf(ProcessLauncher.scala:41)
    at com.amazonaws.services.glue.SparkProcessLauncherPlugin.getSparkConf$(ProcessLauncher.scala:40)
    at com.amazonaws.services.glue.ProcessLauncher$$anon$1.getSparkConf(ProcessLauncher.scala:78)
    at com.amazonaws.services.glue.ProcessLauncher.<init>(ProcessLauncher.scala:84)
    at com.amazonaws.services.glue.ProcessLauncher.<init>(ProcessLauncher.scala:78)
    at com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:29)
    at com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)
soumilshah1995 commented 2 years ago

Any updates?

scottsand-db commented 2 years ago

It seems you're having some issues getting the logs, right? Given the error message above, would this link help? https://stackoverflow.com/questions/70383503/after-log4j-changes-hive-e-returns-additional-warning-which-has-impact-on-the-s

tdas commented 2 years ago

Hey @soumilshah1995, are you sure this is the only error? This is just a WARNING printed by the log4j library, not an ERROR. We want to see INFO-level logs that mention "DynamoDBLogStore". Are you getting any logs with "INFO" in them? If not, you can set the log level in your code with sparkContext.setLogLevel("INFO") before any Delta operation. Then you should see info-level logs; grep them for "LogStore" and share the filtered log with us for debugging.
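
For example, a minimal sketch of where this could go in the Glue script above (names follow that script; this is illustrative only, not a required change):

# Sketch: enable INFO logging before any Delta operation
helper = DeltaLakeHelper(delta_lake_path="s3a://glue-learn-begineers/deltalake/delta_table")
spark = helper.spark
spark.sparkContext.setLogLevel("INFO")  # surfaces io.delta.storage / LogStore INFO messages
# ... perform the Delta writes/reads, then filter the CloudWatch driver log for "LogStore"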

soumilshah1995 commented 2 years ago

This is all I have. Please let me know if you want me to include anything specific in the code above.

soumilshah1995 commented 1 year ago

Any updates?

scottsand-db commented 1 year ago

Hi @soumilshah1995 - did you follow TD's suggestion to include sparkContext.setLogLevel("INFO") in your Spark setup?

Also, if you are having specific log4j issues, I would suggest posting in a Spark forum, as Spark has a larger community and someone there has likely already come across your issue.

soumilshah1995 commented 1 year ago

Thanks Scott. We switched to Apache Hudi instead of Delta Lake due to these limitations. I appreciate the help.

scottsand-db commented 1 year ago

Hi @soumilshah1995 I just realized that you are using Delta Lake 1.1.0.

In Delta Lake 1.2, we switched to using the delta-storage module ... to allow custom log stores ... like the DynamoDB log store ...

https://docs.delta.io/latest/porting.html#delta-lake-1-1-or-below-to-delta-lake-1-2-or-above

So, multi-cluster writes aren't working because you aren't using a new enough version of Delta Lake! Please use version 1.2 or above.
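
For reference, a minimal sketch of a SparkSession configured for multi-cluster S3 writes on Delta Lake 1.2+, assuming the delta-storage-s3-dynamodb jar is on the classpath; the scheme key, table name, and region below follow the Delta Lake S3 multi-cluster documentation and the setup in this thread, and should be adapted to your environment:

from pyspark.sql import SparkSession

# Sketch only: config keys follow the Delta Lake S3 multi-cluster docs; values are placeholders
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Register the LogStore for the scheme used in the table path (s3a:// in the script above)
    .config("spark.delta.logStore.s3a.impl", "io.delta.storage.S3DynamoDBLogStore")
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log")  # placeholder table name
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1")     # placeholder region
    .getOrCreate()
)

# Writes to Delta tables on s3a:// paths now coordinate commits through the DynamoDB table above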