apache / amoro

Apache Amoro (incubating) is a Lakehouse management system built on open data lake formats.
https://amoro.apache.org/
Apache License 2.0

[Bug]: java.lang.IllegalArgumentException: Row arity: 2, but serializer arity: 4 #3003

Open hezhi3f opened 1 week ago

hezhi3f commented 1 week ago

What happened?

CREATE CATALOG amoro_catalog WITH (
  'type' = 'arctic',
  'metastore.url' = 'thrift://ip:1260/amoro_catalog'
);

CREATE TABLE ODS_TB_A (
  PK_ID VARCHAR,
  XMMC VARCHAR,
  DWDM VARCHAR,
  XSSX INTEGER,
  FLAG VARCHAR,
  DWQYMC VARCHAR,
  PRIMARY KEY (PK_ID) NOT ENFORCED
);

CREATE TABLE ODS_TB_B (
  PK_ID VARCHAR,
  DDRBBH VARCHAR,
  PRIMARY KEY (PK_ID) NOT ENFORCED
);

CREATE TABLE ODS_TB_C (
  PK_ID VARCHAR,
  DWDM VARCHAR,
  XMDM INTEGER,
  VAL DOUBLE,
  PRIMARY KEY (PK_ID) NOT ENFORCED
);

INSERT INTO ODS_TB_A (PK_ID, XMMC, DWDM, XSSX, FLAG, DWQYMC) VALUES
('A10001','上海','1001',10,'0','华东'),
('A10002','北京','1002',20,'0','华北'),
('A10003','广州','1003',30,'0','华南');
INSERT INTO ODS_TB_A (PK_ID, XMMC, DWDM, XSSX, FLAG) VALUES
('A10004','华北','AAAA',40,'1'),
('A10005','华东','BBBB',50,'1'),
('A10006','华南','CCCC',60,'1');

INSERT INTO ODS_TB_B (PK_ID, DDRBBH) VALUES
('B10001', '1001-20240110'),
('B10002', '1002-20240110'),
('B10003', '1002-20240110'),
('B10004', '1003-20240612'),
('B10005', '1003-20240612'),
('B10006', '1003-20240612');

INSERT INTO ODS_TB_C (PK_ID,DWDM,XMDM,VAL) VALUES
('C10001','1001',1024,0.4),
('C10001','1001',1025,0.5),
('C10001','1002',1024,0.6),
('C10001','1003',1025,0.7),
('C10001','1003',1024,0.8);

SELECT
  UUID(),
  T2.DWMC AS COL1,
  DWDM AS COL2,
  T3.JHL AS COL3,
  RQ AS COL4
FROM (
  SELECT
    T1.XMMC AS DWMC,
    SUBSTR(T.DDRBBH,1,4) AS DWDM,
    SUBSTR(T.DDRBBH,6,8) AS RQ,
    T1.XSSX
  FROM ODS_TB_B T
  LEFT JOIN ODS_TB_A T1
    ON SUBSTR(T.DDRBBH,1,4) = T1.DWDM
  WHERE T1.FLAG = '1'
  GROUP BY T1.XMMC,SUBSTR(T.DDRBBH,1,4),SUBSTR(T.DDRBBH,6,8),T1.XSSX
) T2
LEFT JOIN (
  SELECT
    T1.DWQYMC AS DWMC,
    SUM(T.VAL) AS JHL
  FROM ODS_TB_C T
  LEFT JOIN ODS_TB_A T1
    ON T.DWDM = T1.DWDM
  WHERE T.XMDM = 1024
  GROUP BY T1.DWQYMC
) T3 ON TRIM(T2.DWMC) = TRIM(T3.DWMC);

I hit this issue with a Mixed-Iceberg table. The SQL above runs fine when each subquery is executed on its own, but fails once the two are joined. Both subqueries read ODS_TB_A; if the table is duplicated and one subquery reads ODS_TB_A_COPY instead, the problem does not occur. It happens whether or not LogStore is used. Has anyone run into a similar problem, and is there a workaround?
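The workaround described above (pointing one of the two subqueries at a duplicate of ODS_TB_A) could be sketched as follows. ODS_TB_A_COPY is a placeholder name, and this sketch assumes the duplicate needs both the schema and the data:

```sql
-- Hypothetical workaround sketch: give each subquery its own table object
-- so the two scans of ODS_TB_A no longer share one pushed-down projection.
CREATE TABLE ODS_TB_A_COPY LIKE ODS_TB_A;          -- copies the schema only
INSERT INTO ODS_TB_A_COPY SELECT * FROM ODS_TB_A;  -- copies the data

-- Then, in the second subquery, read from the copy instead:
--   ... FROM ODS_TB_C T LEFT JOIN ODS_TB_A_COPY T1 ON T.DWDM = T1.DWDM ...
```

This only sidesteps the error; the underlying problem appears to be that the two scans of ODS_TB_A end up with conflicting projected schemas, as suggested by the `Cannot find field 'FLAG'` validation error below.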

There is no error when running the same SQL with regular Flink or with Amoro Trino.

2024-07-04 15:21:12
java.lang.IllegalArgumentException: Row arity: 2, but serializer arity: 4
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
    at org.apache.amoro.flink.read.hybrid.reader.MixedFormatRecordEmitter.emitRecord(MixedFormatRecordEmitter.java:56)
    at org.apache.amoro.flink.read.hybrid.reader.MixedFormatRecordEmitter.emitRecord(MixedFormatRecordEmitter.java:36)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
    at org.apache.amoro.flink.read.hybrid.reader.MixedFormatSourceReader.pollNext(MixedFormatSourceReader.java:125)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:748)
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
    at org.apache.amoro.flink.read.hybrid.reader.MixedFormatSourceReader.pollNext(MixedFormatSourceReader.java:125)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.amoro.shade.org.apache.iceberg.exceptions.ValidationException: Cannot find field 'FLAG' in struct: struct<3: DWDM: optional string, 6: DWQYMC: optional string, 1: PK_ID: required string>
    at org.apache.amoro.shade.org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
    at org.apache.amoro.shade.org.apache.iceberg.expressions.NamedReference.bind(NamedReference.java:45)
    at org.apache.amoro.shade.org.apache.iceberg.expressions.NamedReference.bind(NamedReference.java:26)
    at org.apache.amoro.shade.org.apache.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:111)
    at org.apache.amoro.shade.org.apache.iceberg.expressions.Binder$BindVisitor.predicate(Binder.java:159)
    at org.apache.amoro.shade.org.apache.iceberg.expressions.Binder$BindVisitor.predicate(Binder.java:118)
    at org.apache.amoro.shade.org.apache.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:347)
    at org.apache.amoro.shade.org.apache.iceberg.expressions.Binder.bind(Binder.java:60)
    at org.apache.amoro.shade.org.apache.iceberg.parquet.AdaptHiveParquetMetricsRowGroupFilter.<init>(AdaptHiveParquetMetricsRowGroupFilter.java:67)
    at org.apache.amoro.shade.org.apache.iceberg.parquet.AdaptHiveReadConf.<init>(AdaptHiveReadConf.java:109)
    at org.apache.amoro.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.init(AdaptHiveParquetReader.java:77)
    at org.apache.amoro.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.iterator(AdaptHiveParquetReader.java:98)
    at org.apache.amoro.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.iterator(AdaptHiveParquetReader.java:42)
    at org.apache.amoro.shade.org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:34)
    at org.apache.amoro.shade.org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
    at org.apache.amoro.io.CloseableIterableWrapper.iterator(CloseableIterableWrapper.java:39)
    at org.apache.amoro.io.CloseableIterableWrapper.iterator(CloseableIterableWrapper.java:27)
    at org.apache.amoro.shade.org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:34)
    at org.apache.amoro.shade.org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
    at org.apache.amoro.table.TableMetaStore.call(TableMetaStore.java:234)
    at org.apache.amoro.table.TableMetaStore.lambda$doAs$0(TableMetaStore.java:209)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
    at org.apache.amoro.table.TableMetaStore.doAs(TableMetaStore.java:209)
    at org.apache.amoro.io.AuthenticatedHadoopFileIO.doAs(AuthenticatedHadoopFileIO.java:202)
    at org.apache.amoro.flink.read.source.FlinkUnkyedDataReader.open(FlinkUnkyedDataReader.java:126)
    at org.apache.amoro.flink.read.source.DataIterator.openTaskIterator(DataIterator.java:154)
    at org.apache.amoro.flink.read.source.DataIterator.updateCurrentIterator(DataIterator.java:144)
    at org.apache.amoro.flink.read.source.DataIterator.seek(DataIterator.java:103)
    at org.apache.amoro.flink.read.hybrid.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:54)
    at org.apache.amoro.flink.read.hybrid.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:30)
    at org.apache.amoro.flink.read.hybrid.reader.HybridSplitReader.fetch(HybridSplitReader.java:68)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
    ... 6 more

Affects Versions

Amoro 0.6.1, Flink 1.17.2

What table formats are you seeing the problem on?

Mixed-Iceberg

What engines are you seeing the problem on?

Flink

How to reproduce

No response

Relevant log output

No response

Anything else

No response

Are you willing to submit a PR?
