apache / amoro

Apache Amoro (incubating) is a Lakehouse management system built on open data lake formats.
https://amoro.apache.org/
Apache License 2.0

[Bug]: Failed to commit changes using rename: s3a://xxxx/change/metadata/v366.metadata.json #2472

Closed luohaifang closed 10 months ago

luohaifang commented 11 months ago

What happened?

Hi~ Environment: Flink 1.15.4, store: MinIO, AMS 0.4.1, table format: Mixed Iceberg, metastore: Arctic metastore.

My workflow: read an Oracle table and write it into an Arctic lake table. The Oracle table has about 140,000,000 rows; after filtering by time, about 10,000,000 rows are written to Arctic.

When about 3/4 of the data has been written to Arctic, I get this error:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.netease.arctic.flink.interceptor.KerberosInvocationHandler.lambda$invoke$0(KerberosInvocationHandler.java:58)
    at com.netease.arctic.table.TableMetaStore.doAsUgi(TableMetaStore.java:365)
    at com.netease.arctic.table.TableMetaStore.lambda$doAs$0(TableMetaStore.java:345)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1742)
    at com.netease.arctic.table.TableMetaStore.doAs(TableMetaStore.java:345)
    at com.netease.arctic.io.ArcticHadoopFileIO.doAs(ArcticHadoopFileIO.java:184)
    at com.netease.arctic.flink.interceptor.KerberosInvocationHandler.invoke(KerberosInvocationHandler.java:55)
    at com.sun.proxy.$Proxy58.initializeState(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.netease.arctic.flink.interceptor.KerberosInvocationHandler.lambda$invoke$0(KerberosInvocationHandler.java:58)
    at com.netease.arctic.table.TableMetaStore.doAsUgi(TableMetaStore.java:365)
    at com.netease.arctic.table.TableMetaStore.lambda$doAs$0(TableMetaStore.java:345)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1742)
    at com.netease.arctic.table.TableMetaStore.doAs(TableMetaStore.java:345)
    at com.netease.arctic.io.ArcticHadoopFileIO.doAs(ArcticHadoopFileIO.java:184)
    at com.netease.arctic.flink.interceptor.KerberosInvocationHandler.invoke(KerberosInvocationHandler.java:55)
    at com.sun.proxy.$Proxy58.initializeState(Unknown Source)
    ... 24 more
Caused by: com.netease.arctic.shaded.org.apache.iceberg.exceptions.CommitFailedException: Failed to commit changes using rename: s3a://datalake/lakehouse/cusdec_catalog/src/T_ONEWINX_EDI_HEAD/change/metadata/v366.metadata.json
    at com.netease.arctic.shaded.org.apache.iceberg.hadoop.HadoopTableOperations.renameToFinal(HadoopTableOperations.java:357)
    at com.netease.arctic.shaded.org.apache.iceberg.hadoop.HadoopTableOperations.commit(HadoopTableOperations.java:158)
    at com.netease.arctic.op.ArcticHadoopTableOperations.lambda$commit$1(ArcticHadoopTableOperations.java:48)
    at com.netease.arctic.table.TableMetaStore.doAsUgi(TableMetaStore.java:365)
    at com.netease.arctic.table.TableMetaStore.lambda$doAs$0(TableMetaStore.java:345)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1742)
    at com.netease.arctic.table.TableMetaStore.doAs(TableMetaStore.java:345)
    at com.netease.arctic.io.ArcticHadoopFileIO.doAs(ArcticHadoopFileIO.java:184)
    at com.netease.arctic.op.ArcticHadoopTableOperations.commit(ArcticHadoopTableOperations.java:46)
    at com.netease.arctic.shaded.org.apache.iceberg.BaseTransaction.lambda$commitSimpleTransaction$5(BaseTransaction.java:356)
    at com.netease.arctic.shaded.org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
    at com.netease.arctic.shaded.org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214)
    at com.netease.arctic.shaded.org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198)
    at com.netease.arctic.shaded.org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190)
    at com.netease.arctic.shaded.org.apache.iceberg.BaseTransaction.commitSimpleTransaction(BaseTransaction.java:348)
    at com.netease.arctic.shaded.org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:238)
    at com.netease.arctic.op.ArcticUpdate.commit(ArcticUpdate.java:137)
    at com.netease.arctic.shaded.org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:314)
    at com.netease.arctic.shaded.org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:278)
    at com.netease.arctic.shaded.org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:220)
    at com.netease.arctic.shaded.org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:155)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
    ... 38 more

The same happens with a larger Oracle table of 400,000,000 rows: the problem occurs after about 300,000,000 rows have been written.

The following are my execution statements. Spark SQL, creating the Arctic table:

CREATE TABLE  src.T_ONEWINX_EDI_HEAD (
    ID DECIMAL(16, 0) ,
    ...150 column...
    primary key (ID)
) using arctic
TBLPROPERTIES ('write.upsert.enabled' = 'true')

Flink SQL, writing to Arctic:

CREATE CATALOG cusdec_catalog WITH (
  'type'='arctic',
  'metastore.url'='thrift://xxxx:1260/cusdec_catalog'
);

SET parallelism.default=8;

CREATE TABLE IF NOT EXISTS `CUSDEC.T_ONEWINX_EDI_HEAD`(
    `ID` DECIMAL(16, 0) COMMENT 'ID号',
    ...150 column...
    PRIMARY KEY (`ID`) NOT ENFORCED
) WITH (
    'scan.fetch-size' = '1000000',
    'connector' = 'jdbc',
    'url' = 'jdbc:oracle:thin:@//xxx:1521/xxx',
    'username' = 'xxx',
    'password' = 'xxx',
    'table-name' = 'CUSDEC.T_ONEWINX_EDI_HEAD'
);

INSERT INTO `cusdec_catalog`.`src`.`T_ONEWINX_EDI_HEAD`
    SELECT 
    CAST(`ID` as DECIMAL(16, 0)) AS `ID`,
    ...150 column...
    FROM `CUSDEC.T_ONEWINX_EDI_HEAD` 
 where UPDATE_TIME < timestamp '2020-01-01 00:00:00';

What is causing this problem?

Affects Versions

0.4.1

What engines are you seeing the problem on?

No response

How to reproduce

No response

Relevant log output

No response

Anything else

No response

Are you willing to submit a PR?


luohaifang commented 11 months ago

It seems to be an Iceberg problem.

zhoujinsong commented 11 months ago

@luohaifang Thanks a lot for trying Amoro and bringing this feedback.

Amoro 0.4.1 is a really old version for now. In version 0.4.1, there are still many limitations in the implementation of Amoro. For example, the Mixed Iceberg Format can only be built based on the Iceberg Hadoop Catalog, which is not suitable for use on S3 storage because rename operations cannot be used on S3 to ensure atomic commits. Based on the information you provided, I can't be entirely certain that this is due to using the Iceberg Hadoop Catalog on S3, but I suspect it is highly related.
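
For context, the step that fails in the stack trace above is the Iceberg Hadoop catalog commit, which relies on an atomic filesystem rename of the new metadata file. A simplified sketch of that logic (condensed from what HadoopTableOperations does, not the exact source) looks roughly like this:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.exceptions.CommitFailedException;

// Simplified sketch of the rename-based commit used by the Iceberg Hadoop catalog.
// On HDFS, rename() is atomic, so only one concurrent committer can win.
// On S3/S3A there is no atomic rename (it is emulated as copy + delete),
// so the call can fail or race, surfacing as the CommitFailedException seen above.
void commitNewMetadata(FileSystem fs, Path tempMetadataFile, Path finalMetadataFile) {
    try {
        if (fs.exists(finalMetadataFile)) {
            throw new CommitFailedException("Version already exists: %s", finalMetadataFile);
        }
        if (!fs.rename(tempMetadataFile, finalMetadataFile)) {
            throw new CommitFailedException(
                "Failed to commit changes using rename: %s", finalMetadataFile);
        }
    } catch (IOException e) {
        throw new CommitFailedException(e, "Failed to commit changes using rename: %s", finalMetadataFile);
    }
}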

I suggest you upgrade Amoro to the latest version: 0.6.0 or master, and test this scenario again. In version 0.6.0, Amoro now supports building Mixed Iceberg Format on all Iceberg Catalogs.

luohaifang commented 11 months ago

Why do I get "Not support Metastore [ams], Storage Type [S3], Table Format [MIXED_ICEBERG]"? Does "Internal Catalog" mean "Amoro Metastore"? There seems to be no difference from version 0.4.1.

If the catalog type is "Internal Catalog" and the storage type is Hadoop, I suspect this problem will still occur. My core-site.xml is:

<configuration>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>http://xxx:9000</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>admin</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>admin</value>
    </property>
</configuration>
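
If it helps to rule out the storage layer, rename behavior against the same MinIO endpoint can be checked with a small standalone program along these lines (a rough sketch: the bucket and object paths are placeholders, the S3A properties simply mirror the core-site.xml above, and hadoop-aws plus the AWS SDK must be on the classpath):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3aRenameCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same S3A settings as the core-site.xml above (endpoint and keys are placeholders).
        conf.set("fs.s3a.connection.ssl.enabled", "false");
        conf.set("fs.s3a.endpoint", "http://xxx:9000");
        conf.set("fs.s3a.access.key", "admin");
        conf.set("fs.s3a.secret.key", "admin");

        FileSystem fs = FileSystem.get(URI.create("s3a://datalake/"), conf);
        Path src = new Path("s3a://datalake/tmp/rename-src.txt");
        Path dst = new Path("s3a://datalake/tmp/rename-dst.txt");

        fs.create(src).close();                // writes an empty object
        boolean renamed = fs.rename(src, dst); // S3A emulates rename as copy + delete, not atomically
        System.out.println("rename returned: " + renamed);
    }
}
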
zhoujinsong commented 11 months ago

If you plan to use the Internal Catalog to employ Mixed Format on S3, I'm afraid you can only use the code from the master branch and manually build it, as this new feature has already been supported by #2157 on the master branch but not included in version 0.6.x.

BTW, you may need to use the s3:// protocol directly (rather than s3a://) if you choose S3 storage.

luohaifang commented 10 months ago

After I upgraded to version 0.6.0, this problem no longer occurs. The catalog type used is "Internal Catalog" and the table type is "iceberg".