apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] split_reader don't checkpoint before consuming all splits #8087

Open 1032851561 opened 1 year ago

1032851561 commented 1 year ago

split_reader don't checkpoint before consuming all splits

Describe the problem you faced

When using Flink to incrementally query a MOR table with many splits (read.start-commit=earliest), the first checkpoint of the reader succeeds only after all splits are consumed. This takes so long that the checkpoint times out. In my understanding, reading data in mini-batches should allow a checkpoint to be triggered and completed between batches.


To Reproduce

Steps to reproduce the behavior:

  1. mor table with many splits
  2. read.start-commit=earliest
  3. read.tasks=1
  4. checkpoint interval=10s and checkpoint timeout=10min
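
The settings above can be written down as plain key/value pairs. This is only a sketch: the class and method names are hypothetical, `read.streaming.enabled` is an assumption (the report does not state it, but a streaming read is implied by StreamReadOperator), and the connector and path options are left out because the report does not give them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical holder for the reproduction settings; not part of Hudi or Flink. */
class ReproOptions {

    /** Hudi source table options taken from the reproduction steps. */
    static Map<String, String> tableOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("table.type", "MERGE_ON_READ");
        options.put("read.streaming.enabled", "true"); // assumption, not stated in the report
        options.put("read.start-commit", "earliest");
        options.put("read.tasks", "1");
        return options;
    }

    /** Flink checkpoint settings taken from the reproduction steps. */
    static Map<String, String> checkpointOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("execution.checkpointing.interval", "10s");
        options.put("execution.checkpointing.timeout", "10min");
        return options;
    }
}
```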

Environment Description

Stacktrace

Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.

codope commented 1 year ago

@danny0405 Can you please take a look?

danny0405 commented 1 year ago

The first checkpoint of the reader succeeds only after all splits are consumed. This takes so long that the checkpoint times out.

So which checkpoint timed out? The StreamReadOperator caches all the input splits that have not been consumed yet, so if the reader starts reading from the earliest commit, the checkpoint data set can be large. How large is the checkpoint data for the successful checkpoint of the StreamReadOperator?

1032851561 commented 1 year ago

The first checkpoint of the reader succeeds only after all splits are consumed. This takes so long that the checkpoint times out.

So which checkpoint timed out? The StreamReadOperator caches all the input splits that have not been consumed yet, so if the reader starts reading from the earliest commit, the checkpoint data set can be large. How large is the checkpoint data for the successful checkpoint of the StreamReadOperator?

The first checkpoint times out because StreamReadOperator does not trigger it until all cached splits are consumed. There are only about 1200 splits, so the checkpoint data size should be small.

For now I cap the amount of data read per checkpoint interval, and that looks good.

private void enqueueProcessSplits() {
    if (maxConsumeRecordsPerCkp > 0 && consumedRecordsBetweenCkp > maxConsumeRecordsPerCkp) {
        return;  // reached the max records to consume in this checkpoint interval
    }
}

private void consumeAsMiniBatch(MergeOnReadInputSplit split) throws IOException {
    consumedRecordsBetweenCkp += 1L;
    split.consume();
}

public void snapshotState(StateSnapshotContext context) throws Exception {
    consumedRecordsBetweenCkp = 0L;  // reset when a new checkpoint arrives
}
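
The throttling idea in the snippet above can be sketched as a small self-contained class. This is not the real StreamReadOperator: `BudgetedSplitReader` and its methods are hypothetical, mini-batches are modelled as plain record counts, and unlike the snippet it adds the batch's record count to the counter rather than 1 and stops as soon as the budget is reached.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the reader; only models the per-checkpoint budget. */
class BudgetedSplitReader {
    private final long maxConsumeRecordsPerCkp; // <= 0 disables the limit
    private long consumedRecordsBetweenCkp = 0L;
    // Each entry is the number of records in one pending mini-batch.
    private final Queue<Integer> pendingBatches = new ArrayDeque<>();

    BudgetedSplitReader(long maxConsumeRecordsPerCkp) {
        this.maxConsumeRecordsPerCkp = maxConsumeRecordsPerCkp;
    }

    void addBatch(int records) {
        pendingBatches.add(records);
    }

    /** Consumes one mini-batch if the budget allows; returns false when throttled or idle. */
    boolean tryConsumeOneBatch() {
        if (maxConsumeRecordsPerCkp > 0 && consumedRecordsBetweenCkp >= maxConsumeRecordsPerCkp) {
            return false; // budget for this checkpoint interval is used up
        }
        Integer batch = pendingBatches.poll();
        if (batch == null) {
            return false; // nothing left to read
        }
        consumedRecordsBetweenCkp += batch;
        return true;
    }

    /** Resets the budget, as the snippet does when a checkpoint is taken. */
    void snapshotState() {
        consumedRecordsBetweenCkp = 0L;
    }

    int pending() {
        return pendingBatches.size();
    }
}
```

A real operator would presumably also need to reschedule reading after the reset; the snippet above does not show that part, so it is left out here as well.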

danny0405 commented 1 year ago

Can you try to fire a fix here?

1032851561 commented 1 year ago

Can you try to fire a fix here?

I will fire a fix. But what I don't understand is why MailboxExecutor doesn't work as expected. On the other hand, if StreamReadOperator restores from a checkpoint after a long time and the files behind the cached splits have already been removed from the filesystem, will it always fail?

danny0405 commented 1 year ago

Yes, you are right.

danny0405 commented 1 year ago

I will fire a fix. But what I don't understand is why MailboxExecutor doesn't work as expected

We need to dig into the background before we fire a fix.

1032851561 commented 1 year ago

I will fire a fix. But what I don't understand is why MailboxExecutor doesn't work as expected

We need to dig into the background before we fire a fix.

After debugging, I found that MailboxExecutor and StreamReadOperator run in the same thread, so this is not a bug. Each time split_reader receives a split, it immediately calls enqueueProcessSplits to submit a command to MailboxExecutor, so the thread runs as follows:

time 1:    receive split1
time 2:    consumeAsMiniBatch(split1)
time 3:    receive split2
time 4:    consumeAsMiniBatch(split1)
time 5:    consumeAsMiniBatch(split2)
time 6:    receive split3
.......
time N:    first barrier arrives (checkpoint has already timed out)

We should not block the arrival of the barrier. Could we start reading data only once the last split has been received?

org.apache.hudi.source.StreamReadMonitoringFunction#monitorDirAndForwardSplits: 

    for (int i = 0; i < result.getInputSplits().size(); i++) {
      MergeOnReadInputSplit split = result.getInputSplits().get(i);
      split.setLast(i == (result.getInputSplits().size()-1));
      context.collect(split);
    }

org.apache.hudi.source.StreamReadOperator#processElement:

  public void processElement(StreamRecord<MergeOnReadInputSplit> element) {
    splits.add(element.getValue());
    if (element.getValue().isLast()) {
      enqueueProcessSplits();
    }
  }

danny0405 commented 1 year ago

Thanks, here the question is: why can't the checkpoint barrier come in earlier, for example at some point within the range time 1 ~ time 6?

1032851561 commented 1 year ago

Thanks, here the question is: why can't the checkpoint barrier come in earlier, for example at some point within the range time 1 ~ time 6?

In my case, the first incremental query (read.start-commit=earliest) produces about 1200 MergeOnReadInputSplit instances, and the first checkpoint barrier was generated after all the splits had been sent to StreamReadOperator.

danny0405 commented 1 year ago

So it's the StreamReadMonitoringFunction that blocks the checkpoint barrier while distributing the input splits, not the split_reader

1032851561 commented 1 year ago

So it's the StreamReadMonitoringFunction that blocks the checkpoint barrier while distributing the input splits, not the split_reader

No, StreamReadMonitoringFunction collects the splits quickly and finishes its first checkpoint in only a few milliseconds. Because split_reader caches MergeOnReadInputSplit and reads data in the same thread at the same time, it is the split_reader that blocks barrier alignment.


danny0405 commented 1 year ago

But for each input split, we only consume a batch of 2014 records at a time. Shouldn't the checkpoint barrier be able to fetch the checkpoint lock once one batch consumption is done?

1032851561 commented 1 year ago

But for each input split, we only consume a batch of 2014 records at a time. Shouldn't the checkpoint barrier be able to fetch the checkpoint lock once one batch consumption is done?

I think the checkpoint lock is not used here. The mailbox is used instead: each mini-batch consumption is packaged as a mail and submitted to the TaskMailbox. StreamTask#runMailboxLoop -> MailboxProcessor#runMailboxLoop runs as below:

        while (true) {
            processMail(localMailbox, false);  // 1. process all mails currently in the mailbox (mini-batch consumption)
            mailboxDefaultAction.runDefaultAction(defaultActionContext);  // 2. processElement(MergeOnReadInputSplit) or process barriers
        }

Two things submit a “mini batch consumption” mail to the mailbox:

  1. StreamReadOperator#processElement
  2. after consumeAsMiniBatch

The first checkpoint barrier is behind 1200 splits. By the time it arrives, split_reader has already consumed many mini-batches (though maybe not all splits), which takes a long time.
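
The ordering described above can be reproduced with a toy model of a single-threaded mailbox loop. This is a simplification, not Flink's MailboxProcessor: it assumes each loop iteration runs the mails that were queued when the iteration started and then handles exactly one input element, and it assumes every split holds at least one mini-batch. `MailboxLoopModel` and `miniBatchesBeforeBarrier` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model: counts the mini-batches read before a barrier queued behind the splits is handled. */
class MailboxLoopModel {
    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    private final Deque<int[]> cachedSplits = new ArrayDeque<>(); // entry = remaining mini-batches
    private boolean running = false;
    private int consumedBatches = 0;

    /** Caches the split and schedules reading, like processElement in the discussion. */
    private void processElement(int batchesInSplit) {
        cachedSplits.add(new int[] {batchesInSplit});
        enqueueProcessSplits();
    }

    /** Submits at most one outstanding "mini batch consumption" mail. */
    private void enqueueProcessSplits() {
        if (!running && !cachedSplits.isEmpty()) {
            running = true;
            mailbox.add(this::processSplits);
        }
    }

    /** Reads one mini-batch of the head split, then schedules the next one. */
    private void processSplits() {
        int[] head = cachedSplits.peek();
        if (head != null) {
            consumedBatches++;
            if (--head[0] == 0) {
                cachedSplits.poll(); // split fully read
            }
        }
        running = false;
        enqueueProcessSplits();
    }

    static int miniBatchesBeforeBarrier(int splitCount, int batchesPerSplit) {
        MailboxLoopModel model = new MailboxLoopModel();
        int elementsSeen = 0;
        while (true) {
            // 1. run the mails that were queued when this iteration started
            int queued = model.mailbox.size();
            for (int i = 0; i < queued; i++) {
                model.mailbox.poll().run();
            }
            // 2. default action: the next input element, or the barrier once all splits are through
            if (elementsSeen == splitCount) {
                return model.consumedBatches;
            }
            model.processElement(batchesPerSplit);
            elementsSeen++;
        }
    }
}
```

Under these assumptions, `MailboxLoopModel.miniBatchesBeforeBarrier(1200, 10)` returns 1200: the barrier is only handled after one mini-batch read per queued split, which matches the delay described in this comment.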

danny0405 commented 1 year ago

The first checkpoint barrier is behind 1200 splits.

So you are saying that the barrier is queued up behind these input splits. In #processElement, we just put the input split into a local queue and then start an async task through #enqueueProcessSplits, which asks for the checkpoint lock to process the split. So split processing and split caching/barrier handling happen in different threads; why, as you said, would the barrier be blocked?

When a split is being processed, the flag currentSplitState should be SplitState.RUNNING, so #enqueueProcessSplits should return quickly and the checkpoint barrier should find a chance to jump into the mailbox queue.

1032851561 commented 1 year ago

The first checkpoint barrier is behind 1200 splits.

So you are saying that the barrier is queued up behind these input splits. In #processElement, we just put the input split into a local queue and then start an async task through #enqueueProcessSplits, which asks for the checkpoint lock to process the split. So split processing and split caching/barrier handling happen in different threads; why, as you said, would the barrier be blocked?

When a split is being processed, the flag currentSplitState should be SplitState.RUNNING, so #enqueueProcessSplits should return quickly and the checkpoint barrier should find a chance to jump into the mailbox queue.

processElement, snapshotState, and processSplits are handled in the same thread, and each cached split triggers at least one mini-batch consumption.


danny0405 commented 1 year ago

processElement, snapshotState, and processSplits are handled in the same thread

You are right, they are all executed in the task thread. How about we use our own custom thread instead of the mailbox executor? The mailbox executor does not really execute the task; instead it puts a new task mail into the mailbox.

1032851561 commented 1 year ago

processElement, snapshotState, and processSplits are handled in the same thread

You are right, they are all executed in the task thread. How about we use our own custom thread instead of the mailbox executor? The mailbox executor does not really execute the task; instead it puts a new task mail into the mailbox.

Yes, my first idea was to use a separate thread to process the splits, and I am trying that now. Something like:

SplitProcessThread:
        while (running) {
            try {
                processSplits();
            } catch (IOException e) {
                LOG.error("process splits wrong", e);
                // hand the failure over to the operator through the executor
                executor.execute(StreamReadOperator::splitProcessException, "process splits wrong");
                throw new RuntimeException(e);
            }
        }

StreamReadOperator:
   splitProcessException() {
         throw new Exception("split process thread exited.");
   }
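
The separate-thread idea can be sketched in a self-contained form as follows. This is not the patch discussed here: `SplitProcessThreadSketch` and its methods are hypothetical, splits are modelled as plain ints, a negative id simulates a read error, and the failure is handed back through a shared reference that the task thread polls instead of through the mailbox executor.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of reading splits on a dedicated thread. */
class SplitProcessThreadSketch {
    private final BlockingQueue<Integer> splits = new LinkedBlockingQueue<>();
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private final AtomicInteger processed = new AtomicInteger();
    private volatile boolean running = true;
    private final Thread worker = new Thread(this::processLoop, "split-process-thread");

    void start() {
        worker.start();
    }

    /** Called from the task thread; only caches the split and returns immediately. */
    void addSplit(int splitId) {
        splits.add(splitId);
    }

    private void processLoop() {
        try {
            while (running) {
                Integer split = splits.poll(20, TimeUnit.MILLISECONDS);
                if (split == null) {
                    continue; // nothing cached yet
                }
                if (split < 0) {
                    throw new IllegalStateException("cannot read split " + split);
                }
                processed.incrementAndGet();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            failure.set(e); // remember the error for the task thread
        }
    }

    /** Called from the task thread to surface a failure of the worker thread. */
    void checkFailure() {
        Throwable t = failure.get();
        if (t != null) {
            throw new RuntimeException("split process thread exited", t);
        }
    }

    boolean awaitProcessed(int count, long timeoutMs) {
        return await(() -> processed.get() >= count, timeoutMs);
    }

    boolean awaitFailure(long timeoutMs) {
        return await(() -> failure.get() != null, timeoutMs);
    }

    private boolean await(BooleanSupplier condition, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() > deadline) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    void stop() {
        running = false;
        try {
            worker.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The sketch leaves out how the reader position would be captured consistently when a checkpoint is taken while the worker thread is in the middle of a split; a real operator would presumably need some synchronization there.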

danny0405 commented 1 year ago

Should be fine. We need to test it in practice, both for performance and to see whether it resolves the problem of the checkpoint taking so long that it times out.

1032851561 commented 1 year ago

Should be fine. We need to test it in practice, both for performance and to see whether it resolves the problem of the checkpoint taking so long that it times out.

I have tested my case, and it looks good. Can I create a PR and have you review it?

danny0405 commented 1 year ago

Can I create a PR and have you review it?

Sure, but let's test the PR in production for at least one week. We also need to test failover/restart for data completeness.