apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Transaction and spark job final state inconsistency in batch processing #9101

Closed KnightChess closed 1 year ago

KnightChess commented 1 year ago

code in: https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java#L215-L272

In this picture, at step one, the instant has already been committed successfully. But at step two, when we trigger mayBeCleanAndArchive, it throws an exception and the job fails, so it is retried at the job level. The instant, however, has already been committed. So the final result is: instant commit succeeds -> job fails and retries, and the successful instant is not rolled back. [image]

I think other places can cause this problem as well. We need to either catch all exceptions after the instant has been committed successfully, or extend the scope of the transaction. I prefer the first option: catching all exceptions.
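
To make the first option concrete, here is a minimal sketch of the idea, not Hudi code. The `commit` and `table_services` callables are hypothetical stand-ins for the commit step and for post-commit steps such as cleaning and archiving. Errors before the commit still propagate, while errors after it are logged and collected so the engine job does not fail and retry.

```python
import logging

logger = logging.getLogger("post_commit_sketch")


def commit_then_run_table_services(commit, table_services):
    """Run `commit`, then each post-commit step, swallowing post-commit errors.

    Both arguments are hypothetical callables used for illustration only;
    they are not Hudi APIs.
    """
    # A failure here propagates: nothing has been committed yet, so a job-level
    # retry is safe.
    commit()

    failures = []
    for service in table_services:
        name = getattr(service, "__name__", repr(service))
        try:
            service()
        except Exception as exc:  # deliberately broad: the commit is already done
            # Log and remember the failure instead of failing the whole job.
            logger.warning("post-commit step %s failed: %s", name, exc)
            failures.append((name, exc))
    return failures
```

Returning the collected failures is one way to keep them visible to the caller; whether that is enough for monitoring depends on the deployment.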

KnightChess commented 1 year ago

It may work well in stream processing, but in batch processing there may be a problem.

KnightChess commented 1 year ago

@nsivabalan Can you help look at this problem? It is similar to issue #6679, but in batch mode.

KnightChess commented 1 year ago

In our case, we enable OCC for every job, and an exception is thrown when CleanerUtils.rollbackFailedWrites is called during cleaning, after the instant has been committed successfully. The job fails and is retried. Our job is a Spark SQL MERGE INTO, which is not idempotent.

danny0405 commented 1 year ago

when we trigger mayBeCleanAndArchive, it throws an exception and the job fails, so it is retried at the job level. The instant, however, has already been committed. So the final result is: instant commit succeeds -> job fails and retries, and the successful instant is not rolled back.

It should be okay that the successful commit is not rolled back. Do you mean the retry of the Spark job results in another commit on the table? Do you have any clue why the job fails when cleaning?

KnightChess commented 1 year ago

@danny0405 Yes, it will recompute. There will be multiple commit instants within one offline task cycle, such as a daily cycle. In theory there should be one commit instant for the day, but there are two.

Do you have any clue why the job fails when cleaning?

First job: we hit #7837, so it should be rolled back in the next batch ETL run (we enable OCC). Second job: after the instant is committed, it rolls back the failed instant because OCC is enabled, but there is a bug in our RSS (remote shuffle service), so the rollback fails, the job fails, and Spark retries. Within this daily cycle it recomputes and creates a new commit instant on top of the instant committed by the failed job.

danny0405 commented 1 year ago

Is there any way for cleaning and archiving to not trigger recomputation of the whole job?

KnightChess commented 1 year ago

Is there any way for cleaning and archiving to not trigger recomputation of the whole job?

timeline: 00000.commit 00001.commit

I think it is possible. The problem is that the table services and other processing logic are not within the scope of the transaction. When the transaction ends, the Hudi commit is successful (00002.commit), but the engine job is still running. If it then fails in other processing logic, the engine retries the job to make it succeed, so Hudi uses the snapshot of the latest instant (00002.commit) for the upsert and commits again (00003.commit).
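
The sequence above can be replayed with a small simulation. This is only an illustrative model under stated assumptions: `Timeline`, `write_job`, and `run_with_retries` are hypothetical names, not Hudi or Spark APIs, and the retry loop stands in for a job-level retry by the engine or scheduler.

```python
class Timeline:
    """Toy model of a table timeline holding completed commit instants."""

    def __init__(self, instants):
        self.instants = list(instants)

    def commit(self):
        # The next instant name simply continues the numbering used above.
        instant = f"{len(self.instants):05d}.commit"
        self.instants.append(instant)
        return instant


def write_job(timeline, table_service):
    """One attempt of the batch job: commit first, table service afterwards."""
    timeline.commit()   # the transaction ends here, the commit is visible
    table_service()     # runs outside the transaction and may still fail


def run_with_retries(job, max_attempts=2):
    """Hypothetical job-level retry: rerun the whole job when it raises."""
    for attempt in range(1, max_attempts + 1):
        try:
            job()
            return attempt
        except Exception:
            if attempt == max_attempts:
                raise


def simulate():
    timeline = Timeline(["00000.commit", "00001.commit"])
    calls = {"n": 0}

    def flaky_table_service():
        # Fails on the first attempt only, e.g. a rollback that hits a bug.
        calls["n"] += 1
        if calls["n"] == 1:
            raise RuntimeError("table service failed after commit")

    attempts = run_with_retries(lambda: write_job(timeline, flaky_table_service))
    return attempts, timeline.instants
```

Calling `simulate()` runs the job twice and leaves both 00002.commit and 00003.commit on the toy timeline, which mirrors the duplicate commit described above.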

danny0405 commented 1 year ago

Inline archiving and cleaning may have this issue. Did you try async cleaning instead? Is there any Spark param to control the failover behavior? It does not seem easy to fix from the Hudi side.

danny0405 commented 1 year ago

Simply catching the exception makes sense to me if it can solve the problem; data quality should take higher priority.

KnightChess commented 1 year ago

Inline archiving and cleaning may have this issue. Did you try async cleaning instead? Is there any Spark param to control the failover behavior? It does not seem easy to fix from the Hudi side.

In offline batch processing, this approach is not user friendly: users need to maintain multiple tasks for the table services, unless we have good operations and maintenance services for monitoring and management.

KnightChess commented 1 year ago

Simply catching the exception makes sense to me if it can solve the problem; data quality should take higher priority.

Yes, but it has a problem: we will not be able to notice the abnormal exceptions.

danny0405 commented 1 year ago

Okay, let's keep this issue open to collect more ideas.

ad1happy2go commented 1 year ago

@KnightChess MERGE INTO should be idempotent (if using upsert as the operation type). Can you please enlighten me under which scenario it can create data inconsistency issues? The only thing I can see is that it may show records as false updates for incremental or point-in-time queries.

Or are you not using upsert (or is no preCombineField provided)?

KnightChess commented 1 year ago

@ad1happy2go MERGE INTO uses the DefaultPayload logic to judge whether a record is updated or not. If the preCombineField value is the same, it can still update the target record.

KnightChess commented 1 year ago

And in some scenarios, users use current_time as the preCombineField value.
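
A simplified model shows why such a job is not idempotent. This is a sketch, not Hudi's payload implementation: the `upsert` and `merge_job` functions and the `id`/`ts` field names are assumptions for illustration, and the "incoming wins on an equal ordering value" rule follows the behavior described in the comment above.

```python
def upsert(table, incoming, key="id", ordering="ts"):
    """Toy upsert: the incoming record wins when its ordering value is >= the stored one."""
    for record in incoming:
        existing = table.get(record[key])
        if existing is None or record[ordering] >= existing[ordering]:
            table[record[key]] = dict(record)
    return table


def merge_job(table, source_rows, now):
    """Toy batch job that stamps every row with the current time as ordering value."""
    stamped = [{**row, "ts": now} for row in source_rows]
    return upsert(table, stamped)


def demo():
    table = {}
    source = [{"id": 1, "value": "a"}]

    merge_job(table, source, now=100)   # first attempt commits the row with ts=100
    first = dict(table[1])

    merge_job(table, source, now=105)   # retried attempt rewrites the same row
    second = dict(table[1])
    return first, second
```

In this model the retried run rewrites the row with a newer `ts`, so the table after the retry differs from the table after the first commit even though the source data did not change.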

ad1happy2go commented 1 year ago

@KnightChess We had a discussion on this. For any process, if a non-idempotent job that contains multiple actions/writes is run twice, it can create data inconsistencies.

We may want to come up with an optimization that helps us avoid running the whole write/upsert again when a table service fails (maybe a flag to control this).

Created Tracking JIRA - https://issues.apache.org/jira/browse/HUDI-6554

Downgrading the priority to Major as this may not be a blocker for 0.14.0.

KnightChess commented 1 year ago

@ad1happy2go @danny0405 Thank you for the answers.