apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Insert overwrite with replacement instant cannot execute archive #10873

Closed xuzifu666 closed 6 months ago

xuzifu666 commented 6 months ago

Describe the problem you faced

Insert overwrite with replacement instants in the timeline never triggers archiving. [screenshot: 1710641988757.png]

To Reproduce

Steps to reproduce the behavior:

  1. Run insert overwrite on a table partition 100 times, so that replacement instants accumulate in the timeline
  2. The timeline is never archived
  3. Queries on the table are affected because they have to scan all the timeline instants

Expected behavior

Archiving should run at regular intervals.
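For context, here is a sketch of the writer options that, as far as I understand, mainly control when archiving runs. The keys are existing Hudi configs. The values and the `check_archival_options` helper are assumptions for illustration only, and were not provided in this issue.

```python
# Archival-related writer options. The keys are Hudi configs; the values are
# illustrative assumptions, so check the defaults for your Hudi version.
archival_options = {
    "hoodie.keep.min.commits": "20",
    "hoodie.keep.max.commits": "30",
    "hoodie.cleaner.commits.retained": "10",
}


def check_archival_options(opts):
    """Hypothetical helper: report orderings that look inconsistent.

    Assumes the expected ordering is retained < keep.min < keep.max.
    """
    retained = int(opts["hoodie.cleaner.commits.retained"])
    keep_min = int(opts["hoodie.keep.min.commits"])
    keep_max = int(opts["hoodie.keep.max.commits"])
    problems = []
    if not keep_min < keep_max:
        problems.append(
            "hoodie.keep.min.commits should be below hoodie.keep.max.commits"
        )
    if not retained < keep_min:
        problems.append(
            "hoodie.cleaner.commits.retained should be below hoodie.keep.min.commits"
        )
    return problems
```

These options could be merged into whatever dictionary is passed to `df.write.format("hudi").options(...)`. Sharing the effective values makes it easier for others to tell whether archiving was expected to trigger at all.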

xuzifu666 commented 6 months ago

@jonvex @ad1happy2go could you look into this issue?

ad1happy2go commented 6 months ago

@xuzifu666 When I tried the code below, it archived properly. Can you try it, or share your table/writer configurations?

from faker import Faker
import pandas as pd
from pyspark.sql.functions import expr

# Assumes an active SparkSession `spark` and a target table path `PATH`.
fake = Faker()
data = [{"transactionId": fake.uuid4(), "EventTime": "2014-01-01 23:00:01","storeNbr" : "1",
         "FullName": fake.name(), "Address": fake.address(),
         "CompanyName": fake.company(), "JobTitle": fake.job(),
         "EmailAddress": fake.email(), "PhoneNumber": fake.phone_number(),
         "RandomText": fake.sentence(), "City": "US",
         "State": "NYC", "Country": "US"} for _ in range(5)]

hudi_options = {
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.recordkey.field": "transactionId,storeNbr,EventTime",
    "hoodie.datasource.write.precombine.field": "Country",
    "hoodie.table.name": "huditransaction",
    "hoodie.datasource.write.operation" : "insert_overwrite",
    "hoodie.datasource.write.partitionpath.field" : "city"
}

pandas_df = pd.DataFrame(data)
df = spark.createDataFrame(pandas_df).withColumn("EventTime", expr("cast(EventTime as timestamp)"))

# Repeat insert_overwrite against the same partition value
for i in range(1,100):
    (df.write.format("hudi").options(**hudi_options).mode("append").save(PATH))
[screenshot attached]

xuzifu666 commented 6 months ago

@ad1happy2go Hi, if you set the partition key 'city' to a different value on each iteration of range(1,100), it will not archive. Please try it again, thanks.

ad1happy2go commented 6 months ago

@xuzifu666 Do you mean updating the above code like this?

from pyspark.sql.functions import lit

for i in range(1,100):
    # Write each iteration to a different partition value
    df = df.withColumn("city", lit(i))
    (df.write.format("hudi").options(**hudi_options).mode("append").save(PATH))

With the above change, yes, no commits were archived. But I don't see why they would be archived at all: the data in every partition is still valid, so all the commits are valid and should stay active.
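One way to verify this kind of observation is to count the completed instants left in the active timeline after the writes. Below is a minimal sketch, assuming a table on a local filesystem and the Hudi 0.x layout, where completed instants are files named `<instant_time>.<action>` directly under `.hoodie`. The `count_active_instants` helper is hypothetical and is not part of Hudi.

```python
from collections import Counter
from pathlib import Path

# Assumption: Hudi 0.x timeline layout, with completed instants stored as
# <instant_time>.<action> directly under <table_path>/.hoodie
COMPLETED_ACTIONS = {"commit", "deltacommit", "replacecommit"}


def count_active_instants(table_path):
    """Count completed instants per action in the active timeline."""
    timeline_dir = Path(table_path) / ".hoodie"
    counts = Counter()
    for entry in timeline_dir.iterdir():
        # Skip sub-directories such as the archived timeline folder
        if not entry.is_file():
            continue
        parts = entry.name.split(".")
        # Completed instants have exactly two parts (time and action);
        # requested/inflight files carry an extra suffix and are ignored,
        # as is hoodie.properties, whose first part is not numeric.
        if len(parts) == 2 and parts[0].isdigit() and parts[1] in COMPLETED_ACTIONS:
            counts[parts[1]] += 1
    return counts
```

Calling `count_active_instants(PATH)` after the loop gives the number of `replacecommit` instants still active. If that number grows with every write and never drops, nothing has been archived. For tables on HDFS or object storage the directory listing would need the corresponding filesystem client instead of `pathlib`.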

xuzifu666 commented 6 months ago

Hi @ad1happy2go, is this case archived as well?

ad1happy2go commented 6 months ago

No, it's not archiving. But why do you think they should be archived? All these commits are still valid and must be read in this case, so they should remain active.

xuzifu666 commented 6 months ago

Yes, it should not be archived.

ad1happy2go commented 6 months ago

Sorry @xuzifu666, I didn't understand. Are you saying it should archive or not?

xuzifu666 commented 6 months ago

No problem, I only wanted to confirm with you. Thanks for your reply. The issue can be closed.