apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] Flink 1.17.1 with 'changelog-producer' = 'lookup': DELETE SQL fails with "Can not use lookup, there is no temp disk directory to use" #1508

Closed by chestnutqiang 6 months ago

chestnutqiang commented 1 year ago

Search before asking

Paimon version

paimon-flink-1.17-0.5-20230706.002547-73.jar

Compute Engine

Flink 1.17

Minimal reproduce step

CREATE TABLE p_t1 (
    a INT,
    b STRING,
    PRIMARY KEY (a) NOT ENFORCED
) WITH (
    'bucket' = '4',
    'write-buffer-size' = '1mb',
    'changelog-producer' = 'lookup'
);

INSERT INTO p_t1 VALUES (1, 'Table'), (2, 'Store');

DELETE FROM p_t1 WHERE a = 1;

What doesn't meet your expectations?

olumns: [], schema: struct<_KEY_a:int,_SEQUENCE_NUMBER:bigint,_VALUE_KIND:tinyint,a:int,b:string>, includeAcidColumns: true}
        at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Can not use lookup, there is no temp disk directory to use.
        at org.apache.paimon.table.TableUtils.deleteWhere(TableUtils.java:67)
        at org.apache.paimon.flink.sink.FlinkTableSink.executeDeletion(FlinkTableSink.java:184)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:898)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:874)
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.callModifyOperations(OperationExecutor.java:509)
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:425)
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
        at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
        at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
        at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
        ... 7 more
Caused by: java.lang.RuntimeException: Can not use lookup, there is no temp disk directory to use.
        at org.apache.paimon.operation.KeyValueFileStoreWrite.createLookupLevels(KeyValueFileStoreWrite.java:248)
        at org.apache.paimon.operation.KeyValueFileStoreWrite.createRewriter(KeyValueFileStoreWrite.java:229)
        at org.apache.paimon.operation.KeyValueFileStoreWrite.createCompactManager(KeyValueFileStoreWrite.java:194)
        at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:164)
        at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:71)
        at org.apache.paimon.operation.AbstractFileStoreWrite.createWriterContainer(AbstractFileStoreWrite.java:327)
        at org.apache.paimon.operation.AbstractFileStoreWrite.lambda$getWriterWrapper$2(AbstractFileStoreWrite.java:307)
        at java.util.HashMap.computeIfAbsent(HashMap.java:1126)
        at org.apache.paimon.operation.AbstractFileStoreWrite.getWriterWrapper(AbstractFileStoreWrite.java:306)
        at org.apache.paimon.operation.AbstractFileStoreWrite.write(AbstractFileStoreWrite.java:107)
        at org.apache.paimon.table.sink.TableWriteImpl.writeAndReturn(TableWriteImpl.java:102)
        at org.apache.paimon.table.sink.TableWriteImpl.write(TableWriteImpl.java:97)
        at org.apache.paimon.table.TableUtils.deleteWhere(TableUtils.java:60)
        ... 16 more

End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516) ~[flink-dist-1.17.1.jar:1.17.1]
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_144]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_144]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_144]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_144]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_144]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_144]

Anything else?


Should the IOManager be required to be non-null? Could we put that check inside the constructor, as in the sketch below?
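
For illustration, a minimal, self-contained sketch of that idea: validate the temp disk directory when the writer is constructed, instead of failing later inside KeyValueFileStoreWrite.createLookupLevels. The class and field names below are simplified stand-ins, not the actual Paimon internals.

    // Hypothetical sketch only; names are simplified and do not match Paimon's real classes.
    public class LookupLevelsSketch {

        private final String tempDiskDirectory;

        public LookupLevelsSketch(String tempDiskDirectory) {
            // Fail fast in the constructor: the lookup changelog-producer needs a
            // temp disk directory to spill lookup files, so reject a missing one here
            // rather than deep inside the write/compaction path.
            if (tempDiskDirectory == null) {
                throw new IllegalArgumentException(
                        "Can not use lookup, there is no temp disk directory to use.");
            }
            this.tempDiskDirectory = tempDiskDirectory;
        }

        public String tempDiskDirectory() {
            return tempDiskDirectory;
        }
    }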

Are you willing to submit a PR?

JingsongLi commented 1 year ago

Thanks @ChestnutQiang for reporting. Maybe it is OK to disable TableUtils.deleteWhere pushdown for lookup and full-compaction changelog-producer.
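
For illustration, a rough sketch of such a guard, assuming the sink can read the table options shown in the reproduce step; the class below is a simplified stand-in, not the real FlinkTableSink or TableUtils code.

    // Hypothetical guard, not actual Paimon code: decide whether a DELETE can be
    // pushed down to TableUtils.deleteWhere based on the changelog-producer option.
    import java.util.Map;

    public class DeletePushdownGuard {

        public static boolean supportsDeletePushdown(Map<String, String> tableOptions) {
            String producer = tableOptions.getOrDefault("changelog-producer", "none");
            // The lookup and full-compaction producers trigger compaction during the
            // pushed-down delete (which needs temp disk directories), so fall back to
            // a regular delete job for them instead of pushing down.
            return !"lookup".equals(producer) && !"full-compaction".equals(producer);
        }
    }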

Pandas886 commented 9 months ago

Thanks @chestnutqiang for reporting. Maybe it is OK to disable TableUtils.deleteWhere pushdown for lookup and full-compaction changelog-producer.

I met the same error. How can I solve it? Can I set some properties to make the program run normally?