apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] Assumption on the Consistency of Flink CatalogTable is Vulnerable #4372

Closed yunfengzhou-hub closed 1 month ago

yunfengzhou-hub commented 1 month ago

Search before asking

Paimon version

87321e13277bb32e909f4949bb0358eaffcf00e0

Compute Engine

Flink

Minimal reproduce step

Create a materialized table and try to insert some data into it. When the code enters FlinkTableFactory#createDynamicTableSink, inspect context.getCatalogTable().getOrigin().

What doesn't meet your expectations?

The value of context.getCatalogTable().getOrigin() is expected to be a CatalogMaterializedTable, but it is actually a plain CatalogTable.

Anything else?

The cause of the unexpected behavior is as follows. When creating a Flink job that inserts into Paimon (reading from Paimon has the same problem), Flink first invokes PaimonCatalog's getTable method to acquire metadata about the table, and then invokes FlinkTableFactory#createDynamicTableSink to create a Flink sink. Suppose the table Paimon returned from getTable is tableA and the table passed into FlinkTableFactory#createDynamicTableSink is tableB. Paimon has wrongly assumed that tableB == tableA or tableB == tableA.copy(...). However, the Flink Catalog and TableFactory APIs do not guarantee this. During optimization, the Flink SQL planner may create another table instance and pass that to PaimonCatalog. Flink is not doing anything wrong here, as long as the behavior of the public API (CatalogTable) is preserved.
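The mismatch can be illustrated with a self-contained toy model (all class names below are stand-ins for the Flink/Paimon types, not the real APIs): the catalog hands out a specialized table object, but the planner is free to rebuild an equivalent table from the schema and options alone, so a later instanceof check against the specialized type no longer holds.

```java
import java.util.Map;

// Toy stand-in for CatalogTable: carries only the table options.
class ToyCatalogTable {
    final Map<String, String> options;
    ToyCatalogTable(Map<String, String> options) { this.options = options; }
}

// Specialized subclass the catalog returns, analogous to DataCatalogTable.
class ToyDataCatalogTable extends ToyCatalogTable {
    ToyDataCatalogTable(Map<String, String> options) { super(options); }
}

class ToyPlanner {
    // Stands in for the catalog's getTable(): returns the specialized subclass.
    static ToyCatalogTable getTable() {
        return new ToyDataCatalogTable(Map.of("path", "/tmp/warehouse/t"));
    }

    // During optimization the planner may rebuild the table from its options,
    // producing an equivalent table that has lost the concrete subclass.
    static ToyCatalogTable rebuiltByPlanner(ToyCatalogTable original) {
        return new ToyCatalogTable(original.options);
    }
}
```

Here `ToyPlanner.rebuiltByPlanner(ToyPlanner.getTable())` is no longer a ToyDataCatalogTable even though its options are identical, which is exactly the situation the instanceof checks in AbstractFlinkTableFactory do not account for.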

This false assumption has led to code like the following, in which Paimon assumes that a table acquired through context.getCatalogTable().getOrigin() must be a specific subclass of CatalogBaseTable.

// in AbstractFlinkTableFactory#createDynamicTableSource

if (origin instanceof SystemCatalogTable) {
    return new SystemTableSource(
            ((SystemCatalogTable) origin).table(),
            isStreamingMode,
            context.getObjectIdentifier());
} else {
    // ...
}

// in AbstractFlinkTableFactory#buildPaimonTable

if (origin instanceof DataCatalogTable) {
    FileStoreTable fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
    table = fileStoreTable.copyWithoutTimeTravel(newOptions);
} else {
    table =
            FileStoreTableFactory.create(createCatalogContext(context))
                    .copyWithoutTimeTravel(newOptions);
}

Thus it is necessary to rid these implementations of this false assumption in order to avoid potential correctness problems.
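One direction for removing the assumption can be sketched with the same kind of toy model (the names below are illustrative stand-ins, not Paimon APIs, and the actual fix may differ): treat the specialized origin as a fast path only, and make the fallback path produce an equivalent table from the options alone, since the planner is free to hand over either shape.

```java
import java.util.Map;

// Toy stand-in for the origin table passed to the table factory.
class Origin {
    final Map<String, String> options;
    Origin(Map<String, String> options) { this.options = options; }
}

// Specialized origin that happens to carry a pre-built table
// (stands in for DataCatalogTable wrapping a FileStoreTable).
class SpecializedOrigin extends Origin {
    final String prebuiltTable;
    SpecializedOrigin(Map<String, String> options, String prebuiltTable) {
        super(options);
        this.prebuiltTable = prebuiltTable;
    }
}

class TableBuilder {
    // Reuse the pre-built table when available, but never require it:
    // the fallback must yield an equivalent table built from options alone.
    static String buildTable(Origin origin) {
        if (origin instanceof SpecializedOrigin) {
            return ((SpecializedOrigin) origin).prebuiltTable; // fast path
        }
        return "table@" + origin.options.get("path"); // equivalent fallback
    }
}
```

The key property is that both branches must behave identically for the same options; the bug report shows this is currently not the case when, for example, a custom FileIO is involved.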

Are you willing to submit a PR?

yunfengzhou-hub commented 1 month ago

A concrete situation in which this problem can cause a failure is as follows. Suppose there is a custom FileIO that can only be created by its corresponding custom Catalog, rather than being dynamically loaded through FileIOLoader. Such a FileIO works normally in the if branch of the code below, but throws an exception in the else branch.

// in AbstractFlinkTableFactory#buildPaimonTable

if (origin instanceof DataCatalogTable) {
    FileStoreTable fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
    table = fileStoreTable.copyWithoutTimeTravel(newOptions);
} else {
    table =
            FileStoreTableFactory.create(createCatalogContext(context))
                    .copyWithoutTimeTravel(newOptions);
}