apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] Partial-update table with an array type field: when the array contains a null element, inserting data fails with java.lang.ArrayIndexOutOfBoundsException #4046

Status: Closed (dsanww closed this issue 3 days ago)

dsanww commented 3 weeks ago

Search before asking

Paimon version

paimon-flink-1.18-0.9-20240708.002315-31.jar

Compute Engine

flink1.18

Minimal reproduce step

step1: create the table.

create TABLE if not exists default.result_t (
    id int NOT NULL COMMENT 'primary key, globally unique id',
    sex int,
    join_infos array<row<id int, name STRING, age INT>> COMMENT 'category id',
    update_time TIMESTAMP(0) COMMENT 'update time',
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'bucket' = '1',
    'changelog-producer' = 'lookup',
    'merge-engine' = 'partial-update',
    'fields.update_time.sequence-group' = 'join_infos',
    'fields.join_infos.aggregate-function' = 'nested_update',
    'fields.join_infos.nested-key' = 'id'
);

step2: insert a row whose array contains a null element.

insert into default.result_t
select 6, cast(null as int), array[cast(null as row<id int, name STRING, age INT>)], PROCTIME();
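For context (paraphrasing the Paimon documentation; this note is not part of the original report): with 'merge-engine' = 'partial-update', each insert patches only the non-null columns of the row with the same primary key, and 'fields.update_time.sequence-group' = 'join_infos' makes update_time decide which update wins for join_infos. The 'nested_update' aggregate function with 'nested-key' = 'id' treats join_infos as a nested table keyed by id: elements with an existing nested id are replaced, new ids are appended. A sketch of the expected merge behavior under those documented semantics:

-- two successive writes to the same primary key (id = 6)
insert into default.result_t
select 6, 1, array[row(1, 'a', 10)], PROCTIME();

insert into default.result_t
select 6, 1, array[row(1, 'b', 11), row(2, 'c', 20)], PROCTIME();

-- expected merged value of join_infos for id = 6:
--   [row(1, 'b', 11), row(2, 'c', 20)]
-- the element with nested key id = 1 is updated in place, id = 2 is appended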

A subsequent insert with a non-null row element then throws an exception:

insert into default.result_t
select 6, cast(null as int), array[row(1,'2',3)], PROCTIME();

exception:

java.io.IOException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:225)
    at org.apache.paimon.flink.sink.TableWriteOperator.prepareCommit(TableWriteOperator.java:121)
    at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.prepareCommit(RowDataStoreWriteOperator.java:189)
    at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:100)
    at org.apache.paimon.flink.sink.PrepareCommitOperator.endInput(PrepareCommitOperator.java:88)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:750)

What doesn't meet your expectations?

There should be no error; the insert should succeed.

Anything else?

No response

Are you willing to submit a PR?

ArthurSXL8 commented 3 weeks ago

Hello, could you please assign this issue to me?

ArthurSXL8 commented 2 weeks ago

@dsanww It can't be reproduced on my machine with Flink 1.18 and Paimon 0.9 compiled from master. The job completed successfully, and the queried data is correct. Could you try it once more?

dsanww commented 2 weeks ago

> @dsanww It can't be reproduced on my machine with Flink 1.18 and Paimon 0.9 compiled from master. The job completed successfully, and the queried data is correct. Could you try it once more?

@ArthurSXL8 I tried again with a newer build (paimon-flink-1.18-0.9-20240826.002332-69.jar), but the error is still reported. To be more specific: after inserting the row whose array contains a null element, subsequent inserts of non-null data no longer take effect and the data does not change. Checking the Flink web UI, I found the insert reports this error:

error stack:

Caused by: java.io.IOException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:225)
    at org.apache.paimon.flink.sink.TableWriteOperator.prepareCommit(TableWriteOperator.java:121)
    at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.prepareCommit(RowDataStoreWriteOperator.java:189)
    at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:100)
    at org.apache.paimon.flink.sink.PrepareCommitOperator.endInput(PrepareCommitOperator.java:88)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.paimon.compact.CompactFutureManager.obtainCompactResult(CompactFutureManager.java:67)
    at org.apache.paimon.compact.CompactFutureManager.innerGetCompactionResult(CompactFutureManager.java:53)
    at org.apache.paimon.mergetree.compact.MergeTreeCompactManager.getCompactionResult(MergeTreeCompactManager.java:220)
    at org.apache.paimon.mergetree.MergeTreeWriter.trySyncLatestCompaction(MergeTreeWriter.java:323)
    at org.apache.paimon.mergetree.MergeTreeWriter.prepareCommit(MergeTreeWriter.java:275)
    at org.apache.paimon.operation.AbstractFileStoreWrite.prepareCommit(AbstractFileStoreWrite.java:210)
    at org.apache.paimon.operation.MemoryFileStoreWrite.prepareCommit(MemoryFileStoreWrite.java:149)
    at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:253)
    at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:220)
    ... 16 more
Caused by: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.paimon.format.parquet.writer.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:77)
    at org.apache.paimon.format.parquet.writer.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:56)
    at org.apache.paimon.shade.org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:138)
    at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:103)
    at org.apache.paimon.format.parquet.writer.ParquetBulkWriter.addElement(ParquetBulkWriter.java:47)
    at org.apache.paimon.io.SingleFileWriter.writeImpl(SingleFileWriter.java:126)
    at org.apache.paimon.io.StatsCollectingSingleFileWriter.write(StatsCollectingSingleFileWriter.java:70)
    at org.apache.paimon.io.KeyValueDataFileWriter.write(KeyValueDataFileWriter.java:112)
    at org.apache.paimon.io.KeyValueDataFileWriter.write(KeyValueDataFileWriter.java:53)
    at org.apache.paimon.io.RollingFileWriter.write(RollingFileWriter.java:81)
    at org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.rewriteOrProduceChangelog(ChangelogMergeTreeRewriter.java:149)
    at org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.rewrite(ChangelogMergeTreeRewriter.java:106)
    at org.apache.paimon.mergetree.compact.MergeTreeCompactTask.rewriteImpl(MergeTreeCompactTask.java:157)
    at org.apache.paimon.mergetree.compact.MergeTreeCompactTask.rewrite(MergeTreeCompactTask.java:152)
    at org.apache.paimon.mergetree.compact.MergeTreeCompactTask.doCompact(MergeTreeCompactTask.java:105)
    at org.apache.paimon.compact.CompactTask.call(CompactTask.java:49)
    at org.apache.paimon.compact.CompactTask.call(CompactTask.java:34)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
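Reading the trace, every failing frame sits in the asynchronous compaction path (CompactTask, MergeTreeCompactTask, ChangelogMergeTreeRewriter, and finally the Parquet writer), which would explain why the insert jobs report success while the table never changes: the statement that enqueues the data succeeds, and it is the background rewrite of the file containing the null array element that keeps failing. If that reading is right, forcing a compaction should surface the same error immediately. A sketch using Paimon's sys.compact Flink procedure (an editorial suggestion; the exact argument form may differ by version):

-- assumes the Paimon catalog is active and procedures are available
CALL sys.compact(`table` => 'default.result_t');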

sql client session:

Flink SQL> insert into default.result_t
         > select 6, cast(null as int), array[cast(null as row<id int,name STRING,age INT>)], PROCTIME();
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b06f80d8d02e9046458097f477ce1d4b

Flink SQL> select * from default.result_t;
+----+--------+------------+---------------------+
| id |    sex | join_infos |         update_time |
+----+--------+------------+---------------------+
|  6 |        |     [NULL] | 2024-08-26 10:17:41 |
+----+--------+------------+---------------------+
1 row in set

Flink SQL> insert into default.result_t
         > select 6, cast(null as int), array[row(1,'2',3)], PROCTIME();
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 575dc76faacd31944ad6bd5361f8aca5

Flink SQL> select * from default.result_t;
+----+--------+------------+---------------------+
| id |    sex | join_infos |         update_time |
+----+--------+------------+---------------------+
|  6 |        |     [NULL] | 2024-08-26 10:17:41 |
+----+--------+------------+---------------------+
1 row in set

Flink SQL> select * from default.result_t;
+----+--------+------------+---------------------+
| id |    sex | join_infos |         update_time |
+----+--------+------------+---------------------+
|  6 |        |     [NULL] | 2024-08-26 10:17:41 |
+----+--------+------------+---------------------+
1 row in set
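Until null array elements are handled, one possible mitigation (a sketch, not a confirmed fix: it assumes partial-update's documented behavior of ignoring NULL column values) is to map an array that would carry only a null element to a NULL for the whole column, so the writer never sees a null row element:

insert into default.result_t
select 6,
       cast(null as int),
       -- send NULL for the whole array column instead of array[NULL], so
       -- partial-update leaves join_infos unchanged
       case
           when src.info is null
               then cast(null as array<row<id int, name STRING, age INT>>)
           else array[src.info]
       end,
       PROCTIME()
from (values (cast(null as row<id int, name STRING, age INT>))) as src(info);

Note this does not help when a multi-element array mixes null and valid rows; in that case the null elements would need to be filtered out upstream before the insert.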

ArthurSXL8 commented 2 weeks ago

> @ArthurSXL8 I tried again with a newer build (paimon-flink-1.18-0.9-20240826.002332-69.jar), but the error is still reported. [...]

@dsanww reproduced now, digging...

JingsongLi commented 2 weeks ago

@ArthurSXL8 @dsanww Do you have a more detailed exception stack trace? It seems we are missing a crucial part of the error stack.

dsanww commented 2 weeks ago

> @ArthurSXL8 @dsanww Do you have a more detailed exception stack trace? It seems we are missing a crucial part of the error stack.

@JingsongLi This is the taskmanager log file: taskmanager_container_e04_1719389044200_69157_01_000052_log.txt

davedwwang commented 4 days ago

@dsanww Problem solved?

dsanww commented 3 days ago

> @dsanww Problem solved?

Not yet