apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] When scan.mode is set to TimeTravel semantics, getTable from Catalog will fail #731

Closed. liming30 closed this issue 1 year ago.

liming30 commented 1 year ago

Paimon version

0.4

Compute Engine

flink-1.16

Minimal reproduce step

Execute the following SQL in the Flink SQL client:

CREATE CATALOG ts_catalog WITH (
    'type' = 'paimon',
    'warehouse' = '/test-data/e1d13ddf-7423-4075-91b2-ca5f7a18b9fd.store'
);
USE CATALOG ts_catalog;
CREATE TABLE IF NOT EXISTS ts_table (
    k VARCHAR,
    v INT,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'bucket' = '2',
    'log.consistency' = 'eventual',
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = 'kafka:9092',
    'scan.mode' = 'from-snapshot',
    'scan.snapshot-id' = '1',
    'kafka.topic' = 'ts-topic-b6a8e1c8-003e-4ef9-a5cc-16979a0ce56b'
);

INSERT INTO result1 SELECT * FROM ts_table;

What doesn't meet your expectations?

Creating a table with time-travel semantics should not report an error, but I get the following exception:

Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse statement: SELECT * FROM ts_table
;
    at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174) ~[flink-sql-client-1.16.1.jar:1.16.1]
    at org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45) ~[flink-sql-client-1.16.1.jar:1.16.1]
    at org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71) ~[flink-sql-client-1.16.1.jar:1.16.1]
    at org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) ~[flink-sql-client-1.16.1.jar:1.16.1]
    at org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) ~[flink-sql-client-1.16.1.jar:1.16.1]
    at org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) ~[flink-sql-client-1.16.1.jar:1.16.1]
    at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295) [flink-sql-client-1.16.1.jar:1.16.1]
    at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280) [flink-sql-client-1.16.1.jar:1.16.1]
    at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228) [flink-sql-client-1.16.1.jar:1.16.1]
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) [flink-sql-client-1.16.1.jar:1.16.1]
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client-1.16.1.jar:1.16.1]
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) [flink-sql-client-1.16.1.jar:1.16.1]
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client-1.16.1.jar:1.16.1]
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. Can not create a Path from a null string
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186) ~[?:?]
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) ~[?:?]
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) ~[?:?]
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) ~[?:?]
    at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.16.1.jar:1.16.1]
    ... 12 more
Caused by: java.lang.IllegalArgumentException: Can not create a Path from a null string
    at org.apache.paimon.fs.Path.checkPathArg(Path.java:128) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.fs.Path.<init>(Path.java:142) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.CoreOptions.path(CoreOptions.java:564) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.CoreOptions.path(CoreOptions.java:560) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.AbstractFileStore.snapshotManager(AbstractFileStore.java:73) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.table.AbstractFileStoreTable.snapshotManager(AbstractFileStoreTable.java:178) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.table.AbstractFileStoreTable.tryTimeTravel(AbstractFileStoreTable.java:196) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.table.AbstractFileStoreTable.copy(AbstractFileStoreTable.java:132) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.table.FileStoreTableFactory.create(FileStoreTableFactory.java:86) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.table.FileStoreTableFactory.create(FileStoreTableFactory.java:69) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.catalog.AbstractCatalog.getDataTable(AbstractCatalog.java:62) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.catalog.AbstractCatalog.getTable(AbstractCatalog.java:56) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.flink.FlinkCatalog.getTable(FlinkCatalog.java:163) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.paimon.flink.FlinkCatalog.getTable(FlinkCatalog.java:71) ~[paimon-flink.jar:0.4-SNAPSHOT]
    at org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408) ~[flink-table-api-java-uber-1.16.1.jar:1.16.1]
    at org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:364) ~[flink-table-api-java-uber-1.16.1.jar:1.16.1]
    at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:73) ~[?:?]
    at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) ~[?:?]
    at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) ~[?:?]
    at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143) ~[?:?]
    at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99) ~[?:?]
    at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203) ~[?:?]
    at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112) ~[?:?]
    at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184) ~[?:?]
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) ~[?:?]
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) ~[?:?]
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975) ~[?:?]
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3085) ~[?:?]
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070) ~[?:?]
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335) ~[?:?]
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) ~[?:?]
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) ~[?:?]
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) ~[?:?]
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975) ~[?:?]
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) ~[?:?]
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952) ~[?:?]
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704) ~[?:?]
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182) ~[?:?]
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) ~[?:?]
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) ~[?:?]
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) ~[?:?]
    at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.16.1.jar:1.16.1]
    ... 12 more

Anything else?

The reason for the exception is that the table with timeTravel semantics needs to check the snapshot. https://github.com/apache/incubator-paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java#L132

AbstractFileStoreTable#store generates the options from the table's schema, which does not contain the PATH configuration, so this exception occurs.

I also found that the same exception occurs in many unit tests when the table's scan.mode is configured with a time-travel type, for example FlinkCatalogTest#testCreateTable_Streaming.

Should we always put the PATH configuration in the table schema?
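
The failure mode described above can be illustrated with a minimal, self-contained Java sketch. It does not use Paimon's classes: SchemaPathSketch, resolvePath, and the option key "path" are hypothetical stand-ins, assuming only that the table location is looked up from an option map and that a missing value is rejected with the message seen in the stack trace.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the behaviour in the report; not Paimon's actual code.
class SchemaPathSketch {

    // Assumed option key under which the table location is stored.
    static final String PATH_KEY = "path";

    // Mimics a path lookup that rejects a missing value,
    // similar to what Path.checkPathArg does in the stack trace.
    static String resolvePath(Map<String, String> options) {
        String path = options.get(PATH_KEY);
        if (path == null) {
            throw new IllegalArgumentException("Can not create a Path from a null string");
        }
        return path;
    }

    public static void main(String[] args) {
        // Options as stored in the table schema: scan settings are present, the path is not.
        Map<String, String> schemaOptions = new HashMap<>();
        schemaOptions.put("scan.mode", "from-snapshot");
        schemaOptions.put("scan.snapshot-id", "1");

        try {
            resolvePath(schemaOptions);
        } catch (IllegalArgumentException e) {
            System.out.println("Lookup failed: " + e.getMessage());
        }
    }
}
```

In this sketch the scan options alone are enough to trigger the lookup, which is why any table whose stored schema requests time travel would hit the error before the path is available.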

JingsongLi commented 1 year ago

Thanks @liming30 for reporting. Yes, this is a bug; we should throw TableNotExistException for the catalog.

liming30 commented 1 year ago

@JingsongLi Hi, I don't think the problem is that the table does not exist. We are using the old TableSchema to generate the SnapshotManager, and the old TableSchema does not contain PATH (although we put PATH in the new TableSchema).

What about setting the PATH for the TableSchema when creating the FileStoreTable?
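
A rough sketch of this proposal, under stated assumptions: the names PathInjectionSketch, withPath, and snapshotDirectory are hypothetical and are not taken from Paimon, and the "/snapshot" suffix is only an illustrative layout. The idea shown is to add the table location to the schema-derived options before anything that needs the snapshot location runs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the proposal; names do not come from Paimon.
class PathInjectionSketch {

    // Assumed option key under which the table location is stored.
    static final String PATH_KEY = "path";

    // Returns a copy of the schema options with the table location added.
    static Map<String, String> withPath(Map<String, String> schemaOptions, String tablePath) {
        Map<String, String> merged = new HashMap<>(schemaOptions);
        merged.put(PATH_KEY, tablePath);
        return merged;
    }

    // Stand-in for the snapshot check needed by time travel; it requires the path.
    static String snapshotDirectory(Map<String, String> options) {
        String path = options.get(PATH_KEY);
        if (path == null) {
            throw new IllegalArgumentException("Can not create a Path from a null string");
        }
        return path + "/snapshot"; // illustrative layout only
    }

    public static void main(String[] args) {
        Map<String, String> schemaOptions = new HashMap<>();
        schemaOptions.put("scan.mode", "from-snapshot");
        schemaOptions.put("scan.snapshot-id", "1");

        // Inject the path first, then run the check against the merged options.
        Map<String, String> merged = withPath(schemaOptions, "/warehouse/default.db/ts_table");
        System.out.println(snapshotDirectory(merged));
    }
}
```

Returning a copy rather than mutating the stored options keeps the persisted schema unchanged in this sketch; whether the real fix should persist the path is the open question raised above.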

JingsongLi commented 1 year ago

But the path has already been set here.

liming30 commented 1 year ago

@JingsongLi In tryTimeTravel, we still use the old tableSchema to initialize the SnapshotManager, where the PATH does not exist.

JingsongLi commented 1 year ago

Thanks, PR reviewed