apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] The partition expire was not correctly triggered during the commit execution. #3434

Open otherhy opened 1 month ago

otherhy commented 1 month ago

Search before asking

Paimon version

0.8

Compute Engine

JavaAPI

Minimal reproduce step

According to the examples in the documentation, this is how we perform a commit. Taking BatchWrite as an example: after writing the data, we need to create a new instance of the BatchTableCommit class each time we execute a commit.

// 3. Collect all CommitMessages to a global node and commit
BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(messages);

Based on the following call chain, we can infer that each time a BatchTableCommit instance is created, a TableCommitImpl instance is ultimately created within it. In its constructor, a PartitionExpire instance is passed as a parameter.

- org.apache.paimon.table.sink.BatchWriteBuilderImpl#newCommit
  - org.apache.paimon.table.AbstractFileStoreTable#newCommit(java.lang.String)
    - org.apache.paimon.table.AbstractFileStoreTable#newCommit(java.lang.String, java.lang.String)

Based on the following code, we can infer that each time a TableCommitImpl instance is created, the org.apache.paimon.AbstractFileStore#newPartitionExpire method is called to create a new PartitionExpire instance.

return new TableCommitImpl(
                store().newCommit(commitUser, branchName),
                createCommitCallbacks(),
                snapshotExpire,

                // creates a new PartitionExpire instance
                options.writeOnly() ? null : store().newPartitionExpire(commitUser),

                options.writeOnly() ? null : store().newTagCreationManager(),
                catalogEnvironment.lockFactory().create(),
                CoreOptions.fromMap(options()).consumerExpireTime(),
                new ConsumerManager(fileIO, path),
                coreOptions().snapshotExpireExecutionMode(),
                name(),
                coreOptions().forceCreatingSnapshot());

Based on the constructor of the PartitionExpire class, we can infer that when the instance is initialized, lastCheck is set to the current time.

public PartitionExpire(
            RowType partitionType,
            Duration expirationTime,
            Duration checkInterval,
            String timePattern,
            String timeFormatter,
            FileStoreScan scan,
            FileStoreCommit commit) {
        this.partitionKeys = partitionType.getFieldNames();
        this.toObjectArrayConverter = new RowDataToObjectArrayConverter(partitionType);
        this.expirationTime = expirationTime;
        this.checkInterval = checkInterval;
        this.timeExtractor = new PartitionTimeExtractor(timePattern, timeFormatter);
        this.scan = scan;
        this.commit = commit;

        // Initialize lastCheck to the current time.
        this.lastCheck = LocalDateTime.now();
    }

After BatchTableCommit is created, based on the example, we immediately start the commit. When the commit is completed, the org.apache.paimon.operation.PartitionExpire#expire(long) method of the PartitionExpire instance is called, as shown in the following code, to check for partition expiration.

public void expire(long commitIdentifier) {
    expire(LocalDateTime.now(), commitIdentifier);
}

@VisibleForTesting
void expire(LocalDateTime now, long commitIdentifier) {
    if (now.isAfter(lastCheck.plus(checkInterval))) {
        doExpire(now.minus(expirationTime), commitIdentifier);
        lastCheck = now;
    }
}

But at this point lastCheck equals now, because the instance was just initialized. With the default value checkInterval=1h, lastCheck.plus(checkInterval) is one hour in the future, so now.isAfter(lastCheck.plus(checkInterval)) evaluates to false and the partition expiration is skipped. And because a BatchTableCommit can only perform a single commit, the next commit uses a brand-new PartitionExpire instance with a freshly reset lastCheck. As a result, our commits can never trigger the partition expiration check.
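The timing argument above can be reproduced with a minimal sketch in plain Java (no Paimon dependency; the class name `ExpireCheckSketch` is illustrative, not Paimon API). It mirrors only the two relevant pieces: `lastCheck` initialized to the current time in the constructor, and the interval check from `expire`:

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Minimal sketch of the check in PartitionExpire#expire: because lastCheck
// is set to "now" at construction time, a commit issued immediately after
// construction can never pass the interval check.
class ExpireCheckSketch {
    private final LocalDateTime lastCheck = LocalDateTime.now(); // set in constructor
    private final Duration checkInterval;

    ExpireCheckSketch(Duration checkInterval) {
        this.checkInterval = checkInterval;
    }

    // Same condition as PartitionExpire#expire(LocalDateTime, long).
    boolean wouldExpire(LocalDateTime now) {
        return now.isAfter(lastCheck.plus(checkInterval));
    }

    public static void main(String[] args) {
        ExpireCheckSketch sketch = new ExpireCheckSketch(Duration.ofHours(1));
        // A commit right after construction: "now" is not after lastCheck + 1h,
        // so this prints false - the expiration is skipped.
        System.out.println(sketch.wouldExpire(LocalDateTime.now()));
    }
}
```

In the real batch flow each commit discards this object and creates a fresh one, so the check is evaluated at most once per instance, always within `checkInterval` of construction.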

Please help me check if my logic is correct or if there is an issue with my usage.

What doesn't meet your expectations?

The partition expiration parameters set on the table did not take effect because they were not correctly triggered during the commit.

Anything else?

No response

Are you willing to submit a PR?

otherhy commented 1 month ago

If my logic is correct, then I believe this issue is caused by the PartitionExpire instance not being reused. It could be resolved by caching the instance in the upper layer, AbstractFileStoreTable, and reusing that instance each time a TableCommitImpl is created.
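A hypothetical sketch of this proposal, again in plain Java with no Paimon dependency (the names `ExpireCheck` and `CachingTableSketch` are illustrative, not Paimon API): the table-level object caches one expire checker and hands the same instance to every new commit, so `lastCheck` survives across commits.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Stand-in for PartitionExpire: interval-throttled expiration check.
class ExpireCheck {
    LocalDateTime lastCheck = LocalDateTime.now();
    final Duration checkInterval = Duration.ofHours(1);

    boolean check(LocalDateTime now) {
        if (now.isAfter(lastCheck.plus(checkInterval))) {
            lastCheck = now; // throttle: at most one expiration per interval
            return true;     // a real implementation would run doExpire(...) here
        }
        return false;
    }
}

// Stand-in for AbstractFileStoreTable: cache instead of re-create.
class CachingTableSketch {
    private ExpireCheck cachedExpire; // lives as long as the table object

    ExpireCheck expireForNewCommit() {
        if (cachedExpire == null) {
            cachedExpire = new ExpireCheck();
        }
        return cachedExpire; // same instance for every commit
    }
}
```

With this shape, the second and later commits see a `lastCheck` that keeps aging, so once `checkInterval` has elapsed since the first commit, the expiration check fires.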

I look forward to your response and feedback.

AkisAya commented 1 month ago

In a streaming environment, StoreCommitter holds a long-lived TableCommitImpl, so partition expiration works as expected. But in a batch environment, if you create a TableCommitImpl for every commit, there will indeed be a new PartitionExpire every time, and PartitionExpire#expire() will never actually run. However, Paimon uses TableCommitImpl to commit changes in both streaming and batch environments, and TableCommitImpl itself has no way to tell which mode it is running in. There is an isStreamingMode flag, but it is only applied in table.newWrite(), not in table.newCommit(); see withExecutionMode:

// BatchWriteBuilderImpl
    @Override
    public BatchTableWrite newWrite() {
        return table.newWrite(commitUser)
                .withIgnorePreviousFiles(staticPartition != null)
                .withExecutionMode(false);
    }

    @Override
    public BatchTableCommit newCommit() {
        InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartition);
        commit.ignoreEmptyCommit(true);
        return commit;
    }

Maybe isStreamingMode should also be set when creating a TableCommitImpl, so that a TableCommitImpl can distinguish streaming from batch environments and behave differently when expiring a partition.
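To make the suggestion concrete, here is an illustrative-only sketch (plain Java, not Paimon code; all names are hypothetical): if the commit object carried a streaming flag, the batch path could bypass the interval throttle and check expiration eagerly on its single commit, while the streaming path kept the existing throttled behavior.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical mode-aware commit: batch commits are one-shot, so the
// interval throttle (whose lastCheck never ages) is skipped for them.
class ModeAwareCommitSketch {
    private final boolean isStreamingMode;
    private LocalDateTime lastCheck = LocalDateTime.now();
    private final Duration checkInterval = Duration.ofHours(1);

    ModeAwareCommitSketch(boolean isStreamingMode) {
        this.isStreamingMode = isStreamingMode;
    }

    boolean shouldExpire(LocalDateTime now) {
        if (!isStreamingMode) {
            return true; // batch: single commit, check expiration eagerly
        }
        if (now.isAfter(lastCheck.plus(checkInterval))) {
            lastCheck = now; // streaming: keep the interval throttle
            return true;
        }
        return false;
    }
}
```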

Overall, Paimon is a stream-first computing engine, and some of its designs do not take batch processing into consideration. Or perhaps the intent is that you run partition expiration separately and call doExpire yourself, just as PartitionExpireTest does by creating a new expire object to perform the expiration. And maybe there should be a partition expiration procedure for the Spark engine as well.

LinMingQiang commented 1 month ago

We just need to add an isEndInput parameter to the committer to distinguish batch mode from streaming mode.