apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Getting error when writing into MOR HUDI table if schema changed (datatype changed / column dropped) #8040

Open ghost opened 1 year ago

ghost commented 1 year ago

Problem

We were trying schema evolution on our MOR Hudi table. Adding a new column works, but when we delete a column or change its type, we get this error: "py4j.protocol.Py4JJavaError: An error occurred while calling o130.save. org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'deleted_column_name' not found at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)"

We did more research on this issue because it only appears occasionally. However, once it appears, the error persists for all subsequent write operations on the MOR Hudi table and we can no longer write anything to that table.

Our finding is that schema evolution may work fine until compaction takes place. When compaction runs on the MOR table, it picks up some files with the deleted column and some without it (or with different datatypes), and that is what causes this issue. You can reproduce this faster by setting the compaction, min/max commit, and cleaner commits retained configurations to low values on a MOR table, for example with the settings sketched below.
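
For example, a minimal sketch of such aggressive settings (the exact values below are illustrative, not the ones from our production job):

# Hypothetical low thresholds to make compaction and cleaning kick in quickly on a MOR table
repro_compaction_opts = {
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.compact.inline': 'true',
    'hoodie.compact.inline.max.delta.commits': 2,
    'hoodie.keep.min.commits': 3,
    'hoodie.keep.max.commits': 4,
    'hoodie.cleaner.commits.retained': 2,
}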

To Reproduce

This is the code snippet. We are using Glue 4.0 and Hudi version 0.12.1 (we also tried it on EC2).

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.session import SparkSession

args = getResolvedOptions(sys.argv, ['JOB_NAME','schema_evolution_testing'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').config('spark.sql.hive.convertMetastoreParquet','false').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

def main():
    # HUDI configuration
    commonOptions = {
        'hoodie.upsert.shuffle.parallelism': 200,
        'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.compact.inline': 'true',
        'hoodie.compact.inline.max.delta.commits': 5,
        'hoodie.keep.max.commits': 5,
        'hoodie.keep.min.commits': 4,
        'hoodie.datasource.write.precombine.field': 'cdc_timestamp',
        'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
        'hoodie.cleaner.commits.retained': 3,
        'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
    }
    tableConfig = {
        "hoodie.table.name": 'emp',
        "hoodie.datasource.write.recordkey.field": 'id',
        'hoodie.datasource.write.table.name': 'emp'
    }
    CombinedConfig = {**tableConfig, **commonOptions}

    # Reading Parquet files from raw S3 DMS output
    df = spark.read.parquet('s3://bucket/folder/emp/')
    print("df schema is:", df.schema.simpleString())
    df.show()

    # Writing into Hudi
    (
        df.write.format("org.apache.hudi")
        .options(**CombinedConfig)
        .mode("append")
        .save("s3://hudi_bucket/hudi_folder/emp")
    )
    print("Upsertion into HUDI completed")

main()
job.commit()

These are the steps we followed for testing this issue -

Column type changed: run 1 - created and inserted 4000 records into Hudi table emp with schema struct<Op:string,cdc_timestamp:string,id:int,name:string>. This job was successful and created a parquet file in the Hudi table.

run 2 - Altered emp table and added column mgr_id integer. Updated 4000 records. New schema is struct<Op:string,cdc_timestamp:string,id:int,name:string,mgr_id:int>. This job was successful and created a .log file in the Hudi table.

run 3 - Altered emp table, changed the type of the mgr_id column from integer to character varying, updated 3000 records, and inserted 5 records. New schema is struct<Op:string,cdc_timestamp:string,id:int,name:string,mgr_id:string>. This job was successful and created another .log file and another parquet file in the Hudi table.

run 4 - Updated one record in the DB. This job was successful and created another .log file.

run 5 - Updated one record in the DB. This job failed with this error: "An error occurred while calling o120.save. Found int, expecting union".

Error log - File "/tmp/schema_evolution_testing.py", line 51, in main .save(hudi_table_path) File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 968, in save self._jwrite.save(path) File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in call return_value = get_return_value( File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco return f(*a, **kw) File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value raise Py4JJavaError( py4j.protocol.Py4JJavaError: An error occurred while calling o120.save. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage 41.0 (TID 185) (172.34.225.30 executor 4): org.apache.hudi.exception.HoodieException: Exception when reading log file at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352) at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192) at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110) at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.(HoodieMergedLogRecordScanner.java:103) at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324) at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:198) at org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$57154431$1(HoodieCompactor.java:138) at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070) at scala.collection.Iterator$$anon$10.next(Iterator.scala:455) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376) at org.apache.spark.rdd.RDD.iterator(RDD.scala:327) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:138) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.avro.AvroTypeException: Found int, expecting union at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308) at 
org.apache.avro.io.parsing.Parser.advance(Parser.java:86) at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:207) at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:144) at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:382) at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:464) at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)

Column dropped: run 1 - created and inserted 35 records into Hudi table emp with schema struct<Op:string,cdc_timestamp:string,id:int,name:string,mgr_id:string>. This job was successful and created a parquet file in the Hudi table bucket.

run 2 - Altered emp table and dropped column mgr_id. Updated 35 records. New schema is - struct<Op:string,cdc_timestamp:string,id:int,name:string>. This job created 4 new parquet files in hudi table bucket and then failed with this error - An error occurred while calling o120.save. Parquet/Avro schema mismatch: Avro field 'mgr_id' not found. Error log - File "/tmp/schema_evolution_testing.py", line 51, in main .save(hudi_table_path) File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 968, in save self._jwrite.save(path) File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in call return_value = get_return_value( File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco return f(*a, **kw) File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value raise Py4JJavaError( py4j.protocol.Py4JJavaError: An error occurred while calling o120.save. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 4 times, most recent failure: Lost task 0.3 in stage 16.0 (TID 120) (172.36.30.113 executor 5): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0 at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376) at org.apache.spark.rdd.RDD.iterator(RDD.scala:327) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:138) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at 
java.lang.Thread.run(Thread.java:750) Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349) at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:80) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161) at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147) Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155) Caused by: org.apache.hudi.exception.HoodieException: operation has failed at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248) at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226) at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52) at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278) at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135) at java.util.concurrent.FutureTask.run(FutureTask.java:266) Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53) at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'mgr_id' not found at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221) at org.apache.parquet.avro.AvroRecordConverter.(AvroRecordConverter.java:126) at org.apache.parquet.avro.AvroRecordConverter.(AvroRecordConverter.java:91) at org.apache.parquet.avro.AvroRecordMaterializer.(AvroRecordMaterializer.java:33) at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142) at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185) at 
org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156) at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)

This script shows the same behavior on AWS EC2, where our production code runs the same Hudi version 0.12.1.

Environment Description

soumilshah1995 commented 1 year ago

please share your job parameters

soumilshah1995 commented 1 year ago

sample code

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import *
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
import datetime
from awsglue import DynamicFrame

import boto3

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv,
                          ["JOB_NAME", "database_name", "kinesis_table_name", "starting_position_of_kinesis_iterator",
                           "hudi_table_name", "window_size", "s3_path_hudi", "s3_path_spark"])

spark = SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer').config(
    'spark.sql.hive.convertMetastoreParquet', 'false').getOrCreate()

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

database_name = args["database_name"]
kinesis_table_name = args["kinesis_table_name"]
hudi_table_name = args["hudi_table_name"]
s3_path_hudi = args["s3_path_hudi"]
s3_path_spark = args["s3_path_spark"]

print("***********")
print(f"""
database_name  {database_name}
kinesis_table_name = {kinesis_table_name}
hudi_table_name ={hudi_table_name}
s3_path_hudi = {s3_path_hudi}
s3_path_spark = {s3_path_spark}
""")
# can be set to "latest", "trim_horizon" or "earliest"
starting_position_of_kinesis_iterator = args["starting_position_of_kinesis_iterator"]

# The amount of time to spend processing each batch
window_size = args["window_size"]

data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
    database=database_name,
    table_name=kinesis_table_name,
    transformation_ctx="DataSource0",
    additional_options={"inferSchema": "true", "startingPosition": starting_position_of_kinesis_iterator}
)

# config
commonConfig = {
    'path': s3_path_hudi
}

hudiWriteConfig = {
    'className': 'org.apache.hudi',
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.precombine.field': 'date',
    'hoodie.datasource.write.recordkey.field': '_id',
    'hoodie.datasource.write.partitionpath.field': 'year:SIMPLE,month:SIMPLE,day:SIMPLE',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
    'hoodie.deltastreamer.keygen.timebased.timestamp.type': 'MIXED',
    'hoodie.deltastreamer.keygen.timebased.input.dateformat': 'yyyy-mm-dd',
    'hoodie.deltastreamer.keygen.timebased.output.dateformat': 'yyyy/MM/dd'
}

hudiGlueConfig = {
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': hudi_table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': 'year,month,day'
}

combinedConf = {
    **commonConfig,
    **hudiWriteConfig,
    **hudiGlueConfig
}

# Ensure the incoming record has the correct current schema. New columns are fine; if a column exists in the current schema but not in the incoming record, add it manually before inserting.
def evolveSchema(kinesis_df, table, forcecast=False):
    try:
        # get existing table's schema
        glue_catalog_df = spark.sql("SELECT * FROM " + table + " LIMIT 0")
        # sanitize for hudi specific system columns
        columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
                           '_hoodie_partition_path', '_hoodie_file_name']
        glue_catalog_df_sanitized = glue_catalog_df.drop(*columns_to_drop)
        if kinesis_df.schema != glue_catalog_df_sanitized.schema:
            # Add any columns that exist in the catalog schema but are missing from the incoming batch
            merged_df = kinesis_df.unionByName(glue_catalog_df_sanitized, allowMissingColumns=True)
            return merged_df
        return kinesis_df
    except Exception as e:
        print(e)
        return (kinesis_df)

def processBatch(data_frame, batchId):
    if (data_frame.count() > 0):
        kinesis_dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_kinesis_data_frame")
        kinesis_data_frame = kinesis_dynamic_frame.toDF()

        kinesis_data_frame = evolveSchema(kinesis_data_frame, database_name + '.' + hudi_table_name, False)

        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(kinesis_data_frame, glueContext, "evolved_kinesis_data_frame"),
            connection_type="custom.spark",
            connection_options=combinedConf
        )

glueContext.forEachBatch(
    frame=data_frame_DataSource0,
    batch_function=processBatch,
    options={
        "windowSize": window_size,
        "checkpointLocation": s3_path_spark
    }
)

job.commit()

ghost commented 1 year ago

sample code

(The full sample code from the previous comment was quoted here verbatim.)

Soumil, this solution does not address our problem and does not fit our use case: the logic here is for a COW Hudi table and will not work for MOR tables, because compaction in MOR works differently. Here you are merging the old Hudi data into a new dataframe without the deleted column and pushing it into a COW table; that works for COW because new parquet files are created on every iteration. In MOR, however, a .log file is created on every iteration and the old data is only merged into a parquet file during compaction. We still tested the code you shared and, as expected, we get the same error once compaction happens.

ghost commented 1 year ago

please share your job parameters

We are running our production code for Hudi upserts on EC2 and are not using any job parameters there. In AWS Glue, we use the job parameter --datalake-formats with the value 'hudi'.

soumilshah1995 commented 1 year ago

Do you have an example dataset of some kind? Are you using the AWS Glue connector? If yes, did you update the connector?

Also, I remember someone in the group had this issue in the past; he updated to Hudi 0.13 and it was resolved for him. Can you try that? Also, can you share the snippets you used to delete the column?

I suggest creating the Spark session in the following way:

try:
    import sys
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
    from datetime import datetime, date
    import boto3
    from functools import reduce
    from pyspark.sql import Row

    import uuid
    from faker import Faker
except Exception as e:
    print("Modules are missing : {} ".format(e))

spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()

soumilshah1995 commented 1 year ago

I tried deleting a column with Glue 4.0 and table type MOR, and I am getting the following error:

image

Sample Code

try:
    import sys
    import os
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
    from datetime import datetime, date
    import boto3
    from functools import reduce
    from pyspark.sql import Row

    import uuid
    from faker import Faker
except Exception as e:
    print("Modules are missing : {} ".format(e))

spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()

# =================================INSERTING DATA =====================================
global faker
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),

            ) for x in range(100)
        ]

data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
spark_df = spark.createDataFrame(data=data, schema=columns)

# ============================== Settings =======================================
db_name = "hudidb"
table_name = "employees"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://hudi-demos-emr-serverless-project-soumil/tmp1/"
method = 'upsert'
table_type = "MERGE_ON_READ"
# ====================================================================================

hudi_part_write_config = {
    'className': 'org.apache.hudi',

    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,

    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.support_timestamp': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,

}

spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

# ================================================================
#                         Adding NEW COLUMN
# ================================================================

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
                faker.date().__str__()

            ) for x in range(100)
        ]

data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card",
           "new_date_col"]
spark_df = spark.createDataFrame(data=data, schema=columns)
spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

try:
    print("Try1")
    table_name_test = f"{table_name}_ro"
    query = f"alter table {db_name}.{table_name_test} drop column credit_card"
    spark.sql(query)
except Exception as e:
    print("ERR1", e)

try:
    print("Try2")
    table_name_test = f"{table_name}_rt"
    query = f"alter table {db_name}.{table_name_test} drop column credit_card"
    spark.sql(query)
except Exception as e:
    print("ERR2", e)

Job param

image

--additional-python-modules faker==11.3.0
--conf --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.sql.legacy.pathOptionBehavior.enabled=true

--datalake-formats hudi

soumilshah1995 commented 1 year ago

I think we need to ask AWS to update the Hudi version to 0.13. Let's see what others have to say on this.

soumilshah1995 commented 1 year ago

Good morning. Can someone please provide an update regarding this ticket?

soumilshah1995 commented 1 year ago

Any updates?

soumilshah1995 commented 1 year ago

Any updates on this issue?

soumilshah1995 commented 1 year ago

Hello, do we have an update on this issue?

ghost commented 1 year ago

Hello HUDI Team,

Any update on the above issue? We have to go live in 2 weeks.

PhantomHunt commented 1 year ago

I think we need to ask AWS to update the Hudi version to 0.13. Let's see what others have to say on this.

I tested it and found that the issue I am facing still exists in Hudi 0.13.0. We get the same error when compaction runs on the MOR table. Changing a column type and deleting a column in a MOR Hudi table is a very important feature for us.

Note - We are now using a Python script on AWS EC2 for writing into Hudi, not AWS Glue.
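
Roughly, the standalone session on EC2 is created like this (a minimal sketch; the Hudi bundle coordinates below are an assumption, adjust them to your Spark/Scala versions):

from pyspark.sql import SparkSession

# Illustrative standalone PySpark session for writing to Hudi outside Glue
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .getOrCreate()
)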

soumilshah1995 commented 1 year ago

I've just posted this in Slack; ideally, we can reach an agreement sooner on this issue @PhantomHunt @ghost

nfarah86 commented 1 year ago

flagging.

voonhous commented 1 year ago

Can you try adding these configurations to enable Hudi's full schema evolution? From the configurations, I think Avro's schema resolution was used for schema evolution.

"hoodie.schema.on.read.enable"  ->  "true",
"hoodie.datasource.write.reconcile.schema" -> "true"

If it still doesn't work, is it possible to provide a minimal example so that we can try reproducing on our end? Thank you

soumilshah1995 commented 1 year ago

The complete example is provided above; if you require more details, please let me know.

Code

"""
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.sql.legacy.pathOptionBehavior.enabled=true

"""
try:
    import sys
    import os
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
    from datetime import datetime, date
    import boto3
    from functools import reduce
    from pyspark.sql import Row

    import uuid
    from faker import Faker
except Exception as e:
    print("Modules are missing : {} ".format(e))

spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()

# =================================INSERTING DATA =====================================
global faker
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),

            ) for x in range(100)
        ]

data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
spark_df = spark.createDataFrame(data=data, schema=columns)

# ============================== Settings =======================================
db_name = "hudidb"
table_name = "employees"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://hudi-demos-emr-serverless-project-soumil/tmp1/"
method = 'upsert'
table_type = "MERGE_ON_READ"
# ====================================================================================

hudi_part_write_config = {
    'className': 'org.apache.hudi',

    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.schema.on.read.enable":"true",
    "hoodie.datasource.write.reconcile.schema":"true",

    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.support_timestamp': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,

}

spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

# ================================================================
#                         Adding NEW COLUMN
# ================================================================

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
                faker.date().__str__()

            ) for x in range(100)
        ]

data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card",
           "new_date_col"]
spark_df = spark.createDataFrame(data=data, schema=columns)
spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

try:
    print("Try1")
    table_name_test = f"{table_name}_ro"
    query = f"alter table {db_name}.{table_name_test} drop column credit_card"
    spark.sql(query)
except Exception as e:
    print("ERR1", e)

try:
    print("Try2")
    table_name_test = f"{table_name}_rt"
    query = f"alter table {db_name}.{table_name_test} drop column credit_card"
    spark.sql(query)
except Exception as e:
    print("ERR2", e)

image

voonhous commented 1 year ago

I'm not really familiar with AWS's product, but it looks like HoodieSparkSessionExtension isn't being used here.

Is this in your job params?

--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension

soumilshah1995 commented 1 year ago

spark.serializer=org.apache.spark.serializer.KryoSerializer 
--conf spark.sql.hive.convertMetastoreParquet=false
--conf spark.sql.hive.convertMetastoreParquet=false
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
--conf spark.sql.legacy.pathOptionBehavior.enabled=true
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension

image

These are the job params.

soumilshah1995 commented 1 year ago

Let me try with the setting you mentioned.

soumilshah1995 commented 1 year ago

Same issue (see screenshot)

after adding the conf you mentioned

voonhous commented 1 year ago

Apologies, I can't help you with this since it seems to be an AWS Glue issue.

However, @PhantomHunt's issue doesn't seem related to this.

Note - Now we are using a python script on AWS EC2 for writing into HUDI and not AWS Glue.

In this regard, can you please share a minimal example of your code that is triggering this issue?

soumilshah1995 commented 1 year ago

I'm not sure if the problem is with Hudi or AWS, but in either case we need to find a solution so that customers can delete columns when using the MOR table type.

PhantomHunt commented 1 year ago

Can you try adding these configurations to enable Hudi's full schema evolution? From the configurations, i think Avro's schema resolution was used for schema evolution.

"hoodie.schema.on.read.enable"  ->  "true",
"hoodie.datasource.write.reconcile.schema" -> "true"

If it still doesn't work, is it possible to provide a minimal example so that we can try reproducing on our end? Thank you

Hi, I tested this configuration and the error no longer comes up. I will do some in-depth testing next week. I also did some research and found that there are more such configurations available now, like:

'hoodie.datasource.write.schema.allow.auto.evolution.column.drop' : 'true',
'hoodie.avro.schema.external.transformation' : 'true',
'hoodie.avro.schema.validate' : 'true'

So what is the difference between the configurations you suggested and these? Which configuration is best here?

soumilshah1995 commented 1 year ago

"hoodie.schema.on.read.enable" -> "true", "hoodie.datasource.write.reconcile.schema" -> "true"

Were you able to delete the column in Glue?

PhantomHunt commented 1 year ago

"hoodie.schema.on.read.enable" -> "true", "hoodie.datasource.write.reconcile.schema" -> "true"

Were you able to delete the column in Glue?

Not in Glue; we are using the writer script on EC2, which is no longer giving the schema error it gave previously. When I checked the _rt and _ro tables in Athena, I found that the tables still contain those columns but with no data in them. That fits our use case for now. Next week I will check how these configurations behave with compaction and cleaning.

soumilshah1995 commented 1 year ago

Sounds good. I am running the same test on Glue now; let's see if that works.

soumilshah1995 commented 1 year ago

image

Same error:

try:
    import sys
    import os
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
    from datetime import datetime, date
    import boto3
    from functools import reduce
    from pyspark.sql import Row

    import uuid
    from faker import Faker
except Exception as e:
    print("Modules are missing : {} ".format(e))

spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()

# =================================INSERTING DATA =====================================
global faker
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),

            ) for x in range(100)
        ]

data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
spark_df = spark.createDataFrame(data=data, schema=columns)

# ============================== Settings =======================================
db_name = "hudidb"
table_name = "employees"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://hudi-demos-emr-serverless-project-soumil/tmp1/"
method = 'upsert'
table_type = "MERGE_ON_READ"
# ====================================================================================

hudi_part_write_config = {
    'className': 'org.apache.hudi',
    "hoodie.schema.on.read.enable":"true",
    "hoodie.datasource.write.reconcile.schema":"true",
    "hoodie.avro.schema.external.transformation":"true",
    'hoodie.avro.schema.validate':"true",
    "hoodie.datasource.write.schema.allow.auto.evolution.column.drop":"true",

    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,

    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.support_timestamp': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,

}

spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

# ================================================================
#                         Adding NEW COLUMN
# ================================================================

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
                faker.date().__str__()

            ) for x in range(100)
        ]

data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card",
           "new_date_col"]
spark_df = spark.createDataFrame(data=data, schema=columns)
spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

try:
    print("Try1")
    table_name_test = f"{table_name}_ro"
    query = f"alter table {db_name}.{table_name_test} drop column credit_card"
    spark.sql(query)
except Exception as e:
    print("ERR1", e)

try:
    print("Try2")
    table_name_test = f"{table_name}_rt"
    query = f"alter table {db_name}.{table_name_test} drop column credit_card"
    spark.sql(query)
except Exception as e:
    print("ERR2", e)

voonhous commented 1 year ago

@soumilshah1995

Seems like a Glue issue. I think it's better to open a new issue regarding the problem you are facing.

soumilshah1995 commented 1 year ago

I'll have to explain everything from scratch; however, I recommend leaving the GitHub issue open as long as the problem persists.

voonhous commented 1 year ago

It's best practice to have 1 issue tracking 1 problem. Given that OP's issue isn't related to Glue, a new ticket might be helpful for future users encountering Glue + schema evolution issues...

I don't know, just my 2c.

PhantomHunt commented 1 year ago

(Quoting the earlier exchange above about enabling Hudi's full schema evolution and the question about how those configurations differ.)

@voonhous any update on this?

soumilshah1995 commented 1 year ago

Here is the new ticket: https://github.com/apache/hudi/issues/8401 @voonhous

voonhous commented 1 year ago

@PhantomHunt Will check and get back to you on Monday.

easonwood commented 1 year ago

Hi, we had a similar error but in a COW table. Can we check this? https://github.com/apache/hudi/issues/8540