apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.47k stars 2.43k forks source link

[SUPPORT] Flink streaming read MOR table, thrown Unexpected cdc file split infer case: LOG_FILE Exception #10539

Closed nicholasxu closed 10 months ago

nicholasxu commented 10 months ago

Describe the problem you faced

When using MOR table which stream changelog.enabled, cdc.enabled and read.streaming.enabled by flink sql, got an Unexpected cdc file split infer case: LOG_FILE Exception

To Reproduce

Steps to reproduce the behavior:

1.Set up a Flink sql client and set config as follows

set execution.checkpointing.interval='10s';
set state.checkpoints.dir='hdfs://HDFS-TEST/flink/checkpoints/nick_test_cdc';
set execution.checkpointing.externalized-checkpoint-retention='RETAIN_ON_CANCELLATION';
set execution.checkpointing.timeout=600000;
set state.checkpoints.num-retained=3;
set state.savepoints.dir = 'hdfs://HDFS-TES/flink/savepoints/nick_test_cdc';

2.Create a MySQL CDC table in default catalog:

CREATE TABLE `DBS` (
    `DB_ID` bigint,
    `DESC` varchar(4000),
    `DB_LOCATION_URI` varchar(4000),
    `NAME` varchar(128),
    `OWNER_NAME` varchar(128),
    `OWNER_TYPE` varchar(10),
    `CTLG_NAME` varchar(256),
    PRIMARY KEY (`DB_ID`)  NOT ENFORCED
)  WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '***',
    'port' = '3306',
    'username' = 'root',
    'password' = '***',
    'database-name' = 'hivedb',
    'table-name' = 'DBS'
);

3.Create a HUDI catalog and use it:

CREATE CATALOG hudi_hive_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = 'cosn://bdp-xxx-yyy/user/hive/warehouse', 
    'hive.conf.dir' = '/path_to_hive',
    'mode'='hms',
    'table.external' = 'true', 
    'default-database' = 'hudi_default' 
  );
use CATALOG hudi_hive_catalog;

4.Create a HUDI table and insert data

CREATE TABLE `DBS_TEST_CDC` (
    `DB_ID` bigint,
    `DESC` varchar(4000),
    `DB_LOCATION_URI` varchar(4000),
    `NAME` varchar(128),
    `OWNER_NAME` varchar(128),
    `OWNER_TYPE` varchar(10),
    `CTLG_NAME` varchar(256),
    PRIMARY KEY (`DB_ID`)  NOT ENFORCED
)  WITH (
 'connector' = 'hudi',
 'path' = 'cosn://bdp-xxx-yyy/user/hive/warehouse/hudi_default.db/DBS_TEST_CDC',
 'table.type' = 'MERGE_ON_READ',
 'changelog.enabled' = 'true',
 'cdc.enabled' = 'true',
 'read.streaming.enabled'= 'true',
 'read.streaming.check-interval'= '3',
 'read.start-commit' = 'earliest',
 'compaction.async.enabled' = 'true',
 'compaction.delta_commits' = '3',
 'hive_sync.enable' = 'true', 
 'hive_sync.mode' = 'hms', 
 'hive_sync.metastore.uris' = 'thrift://xxx:9083'
);

insert into hudi_hive_catalog.hudi_default.DBS_TEST_CDC
select * from default_catalog.default_database.DBS

5.Query HUDI table

set sql-client.execution.result-mode = tableau;
select * from hudi_hive_catalog.hudi_default.DBS_TEST_CDC_TARGET;

image

6.Got flink exception

java.lang.AssertionError: Unexpected cdc file split infer case: LOG_FILE
    at org.apache.hudi.table.format.cdc.CdcInputFormat.getRecordIterator(CdcInputFormat.java:190) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.table.format.cdc.CdcInputFormat.getRecordIteratorV2(CdcInputFormat.java:150) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.table.format.cdc.CdcInputFormat.lambda$initIterator$0(CdcInputFormat.java:104) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.table.format.cdc.CdcInputFormat$CdcFileSplitsIterator.hasNext(CdcInputFormat.java:224) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.reachedEnd(MergeOnReadInputFormat.java:269) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.source.StreamReadOperator.consumeAsMiniBatch(StreamReadOperator.java:194) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:174) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.2.jar:1.17.2]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
  1. The hudi directory list as follows: image

Expected behavior I wonder if there is an underlying bug?

Environment Description

danny0405 commented 10 months ago

either changelog.enabled or cdc.enabled, only one option is support at a time.

nicholasxu commented 10 months ago

cdc.enabled @danny0405 Thx!, so what's the difference between them? I just follow official example: image

nicholasxu commented 10 months ago

either changelog.enabled or cdc.enabled, only one option is support at a time.

I tried, only changelog.enabled is ok, but only cdc.enabled is not ok.

The latter Includes 2 scenes:

  1. when no baseFile is generated, throws java.lang.IllegalArgumentException: Can not create a Path from an empty string (https://github.com/apache/hudi/issues/10458)
  2. when baseFiles has already generated, still throws AssertionError: Unexpected cdc file split infer case: LOG_FILE
danny0405 commented 10 months ago

The changelog mode is used for upstream data source that is already a changelog stream(has operation insert or delete per-row), so that Hudi can keep these operations as it is and keep good write throughput, the cdc mode is more general for all data source, Hudi itself maintains the changes so it is more costly and only COW table is supported for Flink now.

nicholasxu commented 10 months ago

The changelog mode is used for upstream data source that is already a changelog stream(has operation insert or delete per-row), so that Hudi can keep these operations as it is and keep good write throughput, the cdc mode is more general for all data source, Hudi itself maintains the changes so it is more costly and only COW table is supported for Flink now.

Thx,danny

ad1happy2go commented 10 months ago

@nicholasxu Closing out this issue. Please reopen or create a new one in case of any further queries/issues. Thanks.

nicholasxu commented 10 months ago

@nicholasxu Closing out this issue. Please reopen or create a new one in case of any further queries/issues. Thanks.

ok!