apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Missing Data with Amazon Athena in Glue Table with Hudi 0.10.1 #7191

Closed: aniketnanna closed this issue 1 year ago

aniketnanna commented 2 years ago

Highlight of Issues Facing:

  1. Missing Data
  2. DDL changes in Hudi Tables
  3. Upgrade to Newer Version

Detailed Description of Issues:

1. Missing Data
   a. For around 20 tables, a few records are randomly missing compared to the main AWS RDS DB: 100-200 records out of millions are not returned by Athena, while Spark SQL shows the correct counts.
   b. For 1 or 2 tables, only a single record is missing out of ~170 million records.
   c. In 1 table, 2,800 records are missing out of 600,000 records.

2. DDL Changes:
   a. Add a column to an existing table and drop a column from an existing table.
   b. Change a table/column name.
   c. Not able to add or delete a column from spark.sql (ALTER TABLE ADD COLUMNS), e.g. spark.sql('alter table db.table_name add columns(check_status string)').
   (Screenshot attached: 2022-11-08 00-44-48)

3. Upgrade to a newer version:
   a. Upgrade to a newer version of Hudi in the AWS work environment (current Hudi version: 0.10.1) without reprocessing the complete data.

Work Requirement :

  1. The complete production data lake has been built with ~8 TB of data across 120+ tables.
  2. We cannot afford to miss a single record, given the business-critical queries that run over these tables; if a missing record is identified, we need to be able to insert it with a one-off script call (a sketch of such a backfill follows this list).
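
For the one-off backfill in point 2, a minimal reconciliation sketch is shown below. It assumes the RDS table is also available as a Parquet snapshot in S3 and reuses the record key and precombine field (`id`, `update_ts_dms`) from the Hudi configs shown later in this issue; all paths and names are placeholders, not the actual job code, and the same key-generator/partition configs used to build the table would also need to be passed.

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
             .getOrCreate())

    # Placeholders -- adjust per table.
    source_path = 's3://dms-output-bucket/public/my_table/'   # DMS full-load parquet for the table
    hudi_path = 's3://processed-bucket/hudi/my_table/'        # existing Hudi table path
    primary_key = 'id'

    source_df = spark.read.parquet(source_path)
    hudi_df = spark.read.format('org.apache.hudi').load(hudi_path)

    # Records present in the source snapshot but missing from the Hudi table.
    missing_df = source_df.join(hudi_df.select(primary_key), on=primary_key, how='left_anti')

    # One-off backfill: upsert only the missing records into the existing table.
    backfill_conf = {
        'hoodie.table.name': 'my_table',
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.datasource.write.recordkey.field': primary_key,
        'hoodie.datasource.write.precombine.field': 'update_ts_dms',
    }
    missing_df.write.format('org.apache.hudi').options(**backfill_conf).mode('Append').save(hudi_path)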

Details of Work Environment:

Migrating Postgres RDS to S3 datalake using AWS DMS:

Postgres Version: 12.8

  1. Migration from Postgres RDS to S3 using Amazon DMS
     a. Source endpoint: source engine Postgres.
     b. Target endpoint: endpoint engine Amazon S3; output file format Parquet (uncompressed).
     c. Initial load: for the initial/full load, the number of output files is one or more depending on data size; output format Parquet (uncompressed); file sizes vary from KB to GB based on the number of records.
     d. CDC: CDC record files are written to the S3 bucket whenever data changes, at an interval of at least 60s; output format Parquet (uncompressed).

Development Environment:

Script-1 Details:

**Configurations/Libraries used in the Script:**
    Glue Version: 2.0 (Spark 2.4, Scala 2, Python 3)
    Use Glue data catalog as the Hive metastore
    Hudi Version: 0.10.1
    Dependent JARs path: s3://hudi-files-for-gluejob/hudi-spark-bundle_2.11-0.10.1.jar,s3://hudi-files-for-gluejob/spark-avro_2.11-2.4.4.jar

    Libraries:
    import sys
    import os
    import json
    from dateutil import parser, tz
    from datetime import datetime
    import math
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from pyspark.sql.functions import concat, col, lit, to_timestamp, year, month,lower,ceil,row_number,max,from_utc_timestamp, date_trunc, dayofmonth
    from pyspark.sql.types import BooleanType
    from pyspark.sql import SQLContext
    from pyspark.sql.window import Window

    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.types import StringType, TimestampType, IntegerType
    import boto3
    import pytz
    import traceback
    from botocore.exceptions import ClientError
    import requests
    spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
    glueContext = GlueContext(spark.sparkContext)

**Flow of the Program:**
1. Check whether the table exists in the Glue Data Catalog.
2. Get the primary key.
4. Create a dynamic frame:
    inputDyf = glueContext.create_dynamic_frame_from_options(connection_type = 's3', connection_options = {'paths': [s3_path], 'groupFiles': 'none', 'recurse':True}, format = 'parquet',transformation_ctx = t_ctx)
5. Convert dynamic_frame to data_frame
    inputdframe=inputDyf.toDF()
6. Remove duplicates
    inputdframe = (inputdframe
        .withColumn("row_number", row_number().over(
            Window.partitionBy(inputdframe[primaryKey], inputdframe.update_ts_dms)
                  .orderBy(inputdframe[primaryKey], inputdframe.update_ts_dms)))
        .withColumn('max_row_number', max('row_number').over(
            Window.partitionBy(inputdframe[primaryKey], inputdframe.update_ts_dms)))
        .where(col('row_number') == col('max_row_number'))
        .drop('row_number')
        .drop('max_row_number'))
7. Convert all timestamp columns from UTC to IST.
8. Convert specific columns to boolean.
    During migration, DMS casts boolean columns to string; these columns are converted back to boolean.
9. For tables to be partitioned, create the partition columns year, month and day from the created_date column.
    (Steps 7-9, and the 'Op' split used in steps 11-13, are sketched after this list.)
10. Create the Hudi table on the full/initial load:
    commonConfig = {'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'className' : 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc':'false', 'hoodie.datasource.write.precombine.field': 'update_ts_dms', 'hoodie.datasource.write.recordkey.field': primaryKey, 'hoodie.table.name': tableName1, 'hoodie.consistency.check.enabled': 'false', 'hoodie.datasource.hive_sync.database': database_name, 'hoodie.datasource.hive_sync.table': tableName1, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.parquet.max.file.size':125829120, 'hoodie.parquet.min.file.size':94371840}
    partitionDataConfig = {'hoodie.datasource.write.partitionpath.field': partitionKey, 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor','hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator', 'hoodie.datasource.hive_sync.partition_fields': partitionKey}
    unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor', 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
    initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': shufflecount, 'hoodie.datasource.write.operation': 'upsert'}
    where shufflecount = total size of the input files in bytes / (1048576 * 500), i.e. roughly one shuffle partition per 500 MB of input.
    10.1 Hudi configurations used for initial load for partitioned table (i.e. when table does not exist)
    combinedConf = {**commonConfig, **partitionDataConfig, **initLoadConfig}        
    10.2 Hudi configurations used for initial load for Un-partitioned table (i.e. when table does not exist)
    combinedConf = {**commonConfig, **unpartitionDataConfig, **initLoadConfig}
    10.3 Writing data frame syntax used for both partitioned and unpartitioned tables
        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
11. To differentiate between insert, update and delete records in the CDC data, a column named 'Op' is added:
    for Inserts, 'Op' = 'I'
    for Updates, 'Op' = 'U'
    for Deletes, 'Op' = 'D'
12. For inserts and updates, i.e. when 'Op' != 'D' (this is when the table already exists):
      commonConfig = {'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'className' : 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc':'false', 'hoodie.datasource.write.precombine.field': 'update_ts_dms', 'hoodie.datasource.write.recordkey.field': primaryKey, 'hoodie.table.name': tableName1, 'hoodie.consistency.check.enabled': 'false', 'hoodie.datasource.hive_sync.database': database_name, 'hoodie.datasource.hive_sync.table': tableName1, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.parquet.max.file.size':125829120, 'hoodie.parquet.min.file.size':94371840}
      partitionDataConfig = {'hoodie.datasource.write.partitionpath.field': partitionKey, 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor','hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator', 'hoodie.datasource.hive_sync.partition_fields': partitionKey}
      unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor', 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
      incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 1, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 10}
    Hudi configurations used for CDC partitioned tables
        combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
    Hudi Configuration used for CDC unpartitioned tables
        combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig}
    Writing data frame syntax used for both partitioned and unpartitioned tables
        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
13. For deletes, i.e. when 'Op' = 'D':
    Hudi configurations used:
    combinedConf = {**commonConfig, **unpartitionDataConfig, **initLoadConfig}
    where,
    commonConfig = {'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'className' : 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc':'false', 'hoodie.datasource.write.precombine.field': 'update_ts_dms', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.table.name': tableName1, 'hoodie.consistency.check.enabled': 'false', 'hoodie.datasource.hive_sync.database': delete_database_name, 'hoodie.datasource.hive_sync.table': tableName1, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.parquet.max.file.size':125829120, 'hoodie.parquet.min.file.size':94371840}
    unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor', 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
    initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 5, 'hoodie.datasource.write.operation': 'upsert'}
    Writing data frame syntax used:
        outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
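
Steps 7-9 and the 'Op' split in steps 11-13 are described above without code. The sketch below shows one way they could be implemented, continuing from `inputdframe` in step 5; the boolean column list is an assumption, since the issue does not name the actual columns.

    from pyspark.sql.functions import col, from_utc_timestamp, year, month, dayofmonth
    from pyspark.sql.types import BooleanType, TimestampType

    # Step 7: convert every timestamp column from UTC to IST.
    ts_cols = [f.name for f in inputdframe.schema.fields if isinstance(f.dataType, TimestampType)]
    for c in ts_cols:
        inputdframe = inputdframe.withColumn(c, from_utc_timestamp(col(c), 'Asia/Kolkata'))

    # Step 8: cast back the boolean columns that DMS wrote out as strings.
    boolean_cols = ['is_active']  # placeholder; the real list depends on the table
    for c in boolean_cols:
        inputdframe = inputdframe.withColumn(c, col(c).cast(BooleanType()))

    # Step 9: derive partition columns from created_date for tables that are partitioned.
    inputdframe = (inputdframe
        .withColumn('year', year(col('created_date')))
        .withColumn('month', month(col('created_date')))
        .withColumn('day', dayofmonth(col('created_date'))))

    # Steps 11-13: split the CDC batch on the DMS 'Op' flag.
    outputDf = inputdframe.where(col('Op') != 'D')          # inserts and updates
    outputDf_deleted = inputdframe.where(col('Op') == 'D')  # deletes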

Script-2 Details:

**Configurations/Libraries used in the Script:**
    Glue Version: 2.0 (Spark 2.4, Scala 2, Python 3)
    Use Glue data catalog as the Hive metastore
    Hudi Version: 0.10.1
    Dependent JARs path: s3://hudi-files-for-gluejob/hudi-spark-bundle_2.11-0.10.1.jar,s3://hudi-files-for-gluejob/spark-avro_2.11-2.4.4.jar
    Job parameters: key: --conf Value: spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension

    Libraries:
    import sys
    import json
    import boto3
    import time
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.session import SparkSession
    from pyspark.sql.functions import *
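
The job parameters above set the Spark configuration at submission time; for reference, the equivalent in-code session setup (continuing from the imports above) would be roughly the sketch below, which is not taken from the original script. Note that spark.sql.extensions must be set before the first SparkSession is created.

    spark = (SparkSession.builder
             .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
             .config('spark.sql.hive.convertMetastoreParquet', 'false')
             .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
             .getOrCreate())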

**Flow of the Program:**
1. Using spark.sql, query the delete records from the delete catalog created in Script-1.
2. Hudi Configurations:
    commonConfig = {'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'className' : 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc':'false', 'hoodie.datasource.write.precombine.field': 'update_ts_dms', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.table.name': table_name, 'hoodie.consistency.check.enabled': 'true', 'hoodie.datasource.hive_sync.database': databasename, 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.parquet.max.file.size':125829120, 'hoodie.parquet.min.file.size':94371840}
    partitionDataConfig = {'hoodie.datasource.write.partitionpath.field': partitionKey, 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor','hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator', 'hoodie.datasource.hive_sync.partition_fields': partitionKey}
    unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor', 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
    incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 1, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 10}
    deleteDataConfig = {'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload'}
3. for partitioned tables:
    combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig, **deleteDataConfig}
4. for unpartitioned tables:
    combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig, **deleteDataConfig}
5. Writing dataframe syntax:
    outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
6. Delete records from delete_catalog:
    combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig, **deleteDataConfig}
    outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
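
Putting the Script-2 pieces together, the delete pass could look roughly like the sketch below. The database/table name and targetPath are placeholders, and the config dictionaries are the ones defined in step 2.

    targetPath = 's3://processed-bucket/hudi/my_table/'  # placeholder for the target Hudi table path

    # Records captured as deletes ('Op' = 'D') by Script-1 into the delete catalog.
    outputDf_deleted = spark.sql("SELECT * FROM delete_db.my_table WHERE Op = 'D'")

    # EmptyHoodieRecordPayload turns the upsert into a hard delete for the matched record keys.
    combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig, **deleteDataConfig}
    outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)

    # Optional sanity check: the deleted keys should no longer resolve in the target table.
    remaining = (spark.read.format('org.apache.hudi').load(targetPath)
                 .join(outputDf_deleted.select('id'), on='id', how='inner')
                 .count())
    print(f'record keys still present after the delete pass: {remaining}')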
xushiyan commented 2 years ago

@aniketnanna These are all AWS managed services involved. Have you filed an AWS support case?

aniketnanna commented 2 years ago

@xushiyan We have 3 issues to solve as mentioned above:

  1. Missing data
  2. DDL Change
  3. Upgrade New version

1. Missing data:

a.  This issue is related to Athena; we have contacted AWS support about it.
b.  The support engineer found an error: "can not create year partitions from string".
    This error was found only for a few records and a few tables.
c.  Used the following parameter from the Hudi documentation in the Glue job:
    --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool
    (reference: https://hudi.apache.org/docs/syncing_aws_glue_data_catalog/)
d.  Current status:
    Even with the parameter mentioned in (c), Athena/Glue partitions are not being added to the table, even though the data is written to S3 under the respective partitions.

2. DDL Change:

    a.  Hudi 0.11.0 explicitly states that DDL such as ALTER TABLE can be run through Spark SQL queries starting with 0.11.0.
    b.  The Hudi 0.10.1 documentation does not explicitly state that DDL like ALTER TABLE can be run through Spark SQL queries, although DDL changes can be handled in PySpark code.
        However, it does provide a 'How To' documentation page showing ALTER TABLE run with Spark SQL queries.
    c.  It is therefore unclear whether Hudi 0.10.1 can perform ALTER TABLE DDL queries with Spark SQL.
    d.  Adding a column from Spark SQL threw an error; the error screenshot is attached in the case above.
    e.  We need your guidance to perform mainly two DDL changes: add column and drop column (see the sketch after point 3).

3. Upgrade New version:

    a.  If we want to upgrade to a newer version, it is not feasible to reprocess all the data.
    b.  We need your help to upgrade the Hudi version without affecting the existing Hudi table data, assuming a Hudi version upgrade can solve some of these issues and AWS is compatible with Hudi versions above 0.10.1.
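
For reference on point 2, the Spark SQL DDL documented by Hudi looks like the sketch below, assuming a session created with spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension (as in Script-2's job parameters); the table name is a placeholder. Whether this works on 0.10.1 rather than 0.11.0+ is exactly the open question above.

    # Assumes `spark` was created with the Hudi Spark SQL extension enabled.
    spark.sql('ALTER TABLE db.table_name ADD COLUMNS (check_status string)')

    # Dropping a column depends on newer schema-evolution support; on older versions,
    # rewriting the table without the column may be the only option.
    # spark.sql('ALTER TABLE db.table_name DROP COLUMN check_status')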
aniketnanna commented 2 years ago

1. Missing Data Issue

a.  Regarding missing data in some of the tables: the data is present in the processed bucket, but Athena is not able to read it.
b.  Found the following errors in the Glue job error logs:

    (Screenshot attached: 2022-11-15 09-44-04)
c.  The above issue was solved by adding the missing partitions using ALTER TABLE ADD PARTITION in Athena (a scripted version is sketched below).
d.  Not able to find the root cause of why the Glue and Hudi scripts missed a few partitions randomly.
e.  The exact same setup is followed for other tables, where it worked fine and added all partitions; only this table shows the anomaly.
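
The manual repair in (c) can also be scripted; below is a sketch using boto3's Athena client. The region, database, table, partition values, and S3 locations are placeholders, not values from the actual environment.

    import boto3

    athena = boto3.client('athena', region_name='ap-south-1')  # region is a placeholder

    # Register a partition that the Glue/Hudi sync missed (values and location are placeholders).
    ddl = """
    ALTER TABLE my_database.my_hudi_table ADD IF NOT EXISTS
    PARTITION (year='2022', month='11', day='08')
    LOCATION 's3://my-bucket/hudi/my_hudi_table/2022/11/08/'
    """

    response = athena.start_query_execution(
        QueryString=ddl,
        QueryExecutionContext={'Database': 'my_database'},
        ResultConfiguration={'OutputLocation': 's3://my-bucket/athena-query-results/'},
    )
    print(response['QueryExecutionId'])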

aniketnanna commented 2 years ago

New issue, in addition to the above:

4. Unpartitioned data records:
   a. A few records from a table are unpartitioned and are stored in S3 under the partition name "default", i.e. year/month/day --> default/default/default.
   b. These records are not reflected in Athena but can be queried from Spark SQL.

Please help with including this unpartitioned data in Athena query results. What configuration or approach would help here?

codope commented 1 year ago

> a. Few records from a table are unpartitioned and are stored in S3 with partition name "default", i.e. year/month/day --> default/default/default. b. These records are not reflected in Athena but can be queried from Spark SQL.

Hudi used to write records whose partitionpath field was null into a literal default partition in partitioned tables. That value is not compatible with Hive-based engines, so we switched to the Hive sentinel value __HIVE_DEFAULT_PARTITION__ in https://github.com/apache/hudi/pull/5954

There is a check during a Hudi upgrade that fails if a partition with the old default value is present in the table. When the upgrade fails, hudi-cli can be used to repair the table.
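
Before upgrading, the affected rows can be located directly from the Hudi metadata columns; a minimal sketch follows (the table path is a placeholder).

    from pyspark.sql.functions import col

    # Records written to the legacy 'default' partition show up in _hoodie_partition_path.
    df = spark.read.format('org.apache.hudi').load('s3://my-bucket/hudi/my_table/')
    legacy_default = df.filter(col('_hoodie_partition_path').contains('default'))
    legacy_default.select('_hoodie_record_key', '_hoodie_partition_path').show(truncate=False)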

ad1happy2go commented 1 year ago

@aniketnanna After the above fix, it creates the partition as __HIVE_DEFAULT_PARTITION__, and we confirmed that Athena is not missing any data.

Glue Code here - https://gist.github.com/ad1happy2go/7d982bc6e137b56ce6e6f18bdb62fd03

codope commented 1 year ago

Closing the issue. Please try it out as suggested above.