apache / amoro

Apache Amoro (incubating) is a Lakehouse management system built on open data lake formats.
https://amoro.apache.org/
Apache License 2.0

[Bug]: [flink1.12] casting a map from source to sink fails, even when the map uses a common structure #565

Closed Sugariscool closed 1 year ago

Sugariscool commented 1 year ago

What happened?

I want to transfer Kafka data to Arctic. The structure of the data is similar to `{"a":"123456","b":"123","t":"1667805600","d":{"d0":"1.0","d1":"-6","d2":"0",...,"dn":"197"}}`, which contains some strings and one map. I use Kafka as the source and want the data stored in Arctic. The sink table mirrors the source, as follows:

```sql
CREATE TABLE IF NOT EXISTS Arctic.db.test (
    p STRING,
    e STRING,
    t STRING,
    d MAP<STRING, STRING>,
    PRIMARY KEY (t) NOT ENFORCED
) WITH (
    'key' = 'value'
);
```

Then I execute:

```sql
INSERT INTO Arctic.db.test SELECT p, e, t, d FROM kafka_source;
```

When I select data from the table, e.g. `SELECT * FROM Arctic.db.test;`, I get this error:

```
com.netease.arctic.flink.read.AdaptHiveFlinkParquetReaders$ReusableMapData cannot be cast to org.apache.flink.table.data.binary.BinaryMapData
```

Affects Versions

arctic-0.3.2

What engines are you seeing the problem on?

No response

How to reproduce

data structure: `{"p":"123456","e":"123","t":"1667805600","d":{"d0":"1.0","d1":"-6","d2":"0",...,"dn":"197"}}`

```sql
-- create kafka source
CREATE TABLE kafka_source (
    p STRING,
    e STRING,
    t STRING,
    d MAP<STRING, STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'lakehouse-consumer-sql',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

-- open the table dynamic conf HINT
SET table.dynamic-table-options.enabled=true;

-- have a show
SELECT p, e, t, d FROM kafka_source;

-- create catalog
CREATE CATALOG Arctic WITH (
    'type' = 'arctic',
    'metastore.url' = 'thrift://localhost:1260/arctic_catalog'
);

-- create sink
CREATE DATABASE IF NOT EXISTS Arctic.db;
CREATE TABLE IF NOT EXISTS Arctic.db.test (
    p STRING,
    e STRING,
    t STRING,
    d MAP<STRING, STRING>,
    PRIMARY KEY (t) NOT ENFORCED
) WITH (
    'key' = 'value'
);

-- transfer data from source to sink
INSERT INTO Arctic.db.test SELECT p, e, t, d FROM kafka_source;

-- have a look
SELECT * FROM Arctic.db.test;
```

Relevant log output

java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199) ~[flink-table_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154) ~[flink-table_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116) ~[flink-table_2.12-1.12.7.jar:1.12.7]
    at com.netease.arctic.flink.read.hybrid.reader.ArcticSourceReader.pollNext(ArcticSourceReader.java:121) ~[arctic-flink-runtime-1.12-0.3.1.jar:?]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) [flink-dist_2.12-1.12.7.jar:1.12.7]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342-342]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146) ~[flink-table_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) ~[flink-table_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342-342]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342-342]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342-342]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342-342]
    ... 1 more
Caused by: java.lang.ClassCastException: com.netease.arctic.flink.read.AdaptHiveFlinkParquetReaders$ReusableMapData cannot be cast to org.apache.flink.table.data.binary.BinaryMapData
    at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:110) ~[flink-table-blink_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:45) ~[flink-table-blink_2.12-1.12.7.jar:1.12.7]
    at com.netease.arctic.shaded.org.apache.iceberg.flink.data.RowDataUtil.clone(RowDataUtil.java:95) ~[arctic-flink-runtime-1.12-0.3.1.jar:?]
    at com.netease.arctic.flink.read.hybrid.reader.RowDataRecordFactory.clone(RowDataRecordFactory.java:62) ~[arctic-flink-runtime-1.12-0.3.1.jar:?]
    at com.netease.arctic.flink.read.hybrid.reader.RowDataRecordFactory.clone(RowDataRecordFactory.java:31) ~[arctic-flink-runtime-1.12-0.3.1.jar:?]
    at com.netease.arctic.flink.read.hybrid.reader.ArrayPoolDataIteratorBatcher$ArrayPoolBatchIterator.next(ArrayPoolDataIteratorBatcher.java:103) ~[arctic-flink-runtime-1.12-0.3.1.jar:?]
    at com.netease.arctic.flink.read.hybrid.reader.ArrayPoolDataIteratorBatcher$ArrayPoolBatchIterator.next(ArrayPoolDataIteratorBatcher.java:71) ~[arctic-flink-runtime-1.12-0.3.1.jar:?]
    at com.netease.arctic.flink.read.hybrid.reader.HybridSplitReader.fetch(HybridSplitReader.java:74) ~[arctic-flink-runtime-1.12-0.3.1.jar:?]
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56) ~[flink-table_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138) ~[flink-table_2.12-1.12.7.jar:1.12.7]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) ~[flink-table_2.12-1.12.7.jar:1.12.7]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342-342]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342-342]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342-342]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342-342]
    ... 1 more

Anything else

No response

Sugariscool commented 1 year ago

I'd also like to know: how can I cast a `MAP<string,string>` to a `MAP<int,double>` in SQL, or using Java/Scala?

zstraw commented 1 year ago

There is a bug in the Iceberg Flink integration; refer to https://issues.apache.org/jira/browse/FLINK-21247. The community's solution is to fix it in Flink, so it's recommended to bump Flink to 1.14.5 or 1.15.

zstraw commented 1 year ago

> how can I cast a `MAP<string,string>` to a `MAP<int,double>` in SQL, or using Java/Scala?

For this specific case, you can cast the data with a user-defined function. You can also send an email to the Flink user mailing list (user@flink.apache.org) for more help.
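As a minimal sketch of that approach (plain Java; the class name `MapCastFunction`, the integer-key/double-value parsing, and the null handling are illustrative assumptions, not Amoro or Flink API), the conversion logic for such a UDF could look like:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: the conversion body a Flink scalar UDF could use
// to turn a MAP<STRING, STRING> into a MAP<INT, DOUBLE>. Assumes every key
// parses as an integer and every value as a double.
public class MapCastFunction {
    public static Map<Integer, Double> toIntDoubleMap(Map<String, String> input) {
        if (input == null) {
            return null; // propagate SQL NULL
        }
        Map<Integer, Double> out = new LinkedHashMap<>(input.size());
        for (Map.Entry<String, String> e : input.entrySet()) {
            out.put(Integer.parseInt(e.getKey()), Double.parseDouble(e.getValue()));
        }
        return out;
    }
}
```

To call it from SQL, you would wrap this logic in a class extending Flink's `ScalarFunction` (declaring a `MAP<INT, DOUBLE>` result type), register it with the table environment, and apply it in the `SELECT`. Note that the keys in the sample data above (`d0`, `d1`, ...) would first need the `d` prefix stripped before `Integer.parseInt` succeeds.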