apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

mongocdc 2.4.0 fails when starting from a specified timestamp: Caused by: java.lang.IllegalArgumentException: Unknown keyType of timestamp: 1 #2738

Closed — chengyangfeng closed this issue 7 months ago

chengyangfeng commented 10 months ago

Flink version

flink 1.15.2

Flink CDC version

mongocdc 2.4.0

Database and its version

AWS DocumentDB 4.0 — not official MongoDB, but AWS's document database, which is advertised as 100% MongoDB-compatible

Minimal reproduce step

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    Configuration configuration = tableEnv.getConfig().getConfiguration();
    configuration.setString("pipeline.name", "overseas-mate-mongo-cdc-repair-job");
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(5));
    env.setParallelism(2);
    CheckpointConfig checkpointConf = env.getCheckpointConfig();
    checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkpointConf.setMinPauseBetweenCheckpoints(50000L);
    checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(5));
    checkpointConf.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    MongoDBSource<String> mateai = MongoDBSource.<String>builder()
            .hosts(appConf.getMateMongoHost())
            .username(appConf.getMongoCdcUser())
            .password(appConf.getMongoCdcPassword())
            .startupOptions(StartupOptions.timestamp(1700728666367L))
            .databaseList("mateai") // set captured database, supports regex
            .deserializer(new MongoRecordDeserializer())
            .build();

    SingleOutputStreamOperator<MongoToPaimonEvent> dataStream = env
            .fromSource(mateai, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
            .map(new MongoCdcToPaimonMap());

    dataStream.print("mongo-cdc-data");

What did you expect to see?

Consumption starts from the specified timestamp.

What did you see instead?

The job fails with the following error:

    Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
    Caused by: java.lang.IllegalArgumentException: Unknown keyType of timestamp: 1

Anything else?

Hi, when I use mongocdc 2.4.0 to start consuming from a specified timestamp, the job throws Caused by: java.lang.IllegalArgumentException: Unknown keyType of timestamp: 1. I searched the issues and found a few similar ones, but my case is different. I traced the exception to the if (kType != K_TIMESTAMP) check in com/ververica/cdc/connectors/mongodb/source/utils/ResumeTokenUtils.java. The code requires kType == K_TIMESTAMP == 130, while my kType is 1. After I changed K_TIMESTAMP to 1, the code runs normally and captures change data from the specified timestamp. So my questions are: 1. Why does the code require kType == K_TIMESTAMP == 130? I don't understand the purpose of this check. 2. Is this a bug, and will it be fixed?

The full TaskManager log is attached. Looking forward to your reply, thanks. taskmanager_container_1684236611667_463759_01_000002_log.txt @Jiabao-Sun
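For context, the check the reporter describes can be sketched as a minimal stand-alone program. This is only a rough illustration of how a KeyString-encoded resume token timestamp is read (first byte is the type tag, which must be 130 = 0x82 for a BSON Timestamp, followed by 4 bytes of epoch seconds and 4 bytes of increment); the class name, hex helper, and sample token are illustrative assumptions, not code from the connector:

```java
import java.nio.ByteBuffer;

// Sketch of a ResumeTokenUtils-style decode of a resume token's _data KeyString.
// A kType other than 130 (e.g. the reporter's 1) hits the IllegalArgumentException.
public class ResumeTokenSketch {

    private static final int K_TIMESTAMP = 130; // KeyString type tag 0x82 for BSON Timestamp

    static long[] decodeTimestamp(String dataHex) {
        ByteBuffer buf = ByteBuffer.wrap(hexToBytes(dataHex)); // big-endian by default
        int kType = buf.get() & 0xff; // first byte: key type tag
        if (kType != K_TIMESTAMP) {
            // This is the branch the reporter hits with kType == 1
            throw new IllegalArgumentException("Unknown keyType of timestamp: " + kType);
        }
        long seconds = buf.getInt() & 0xffffffffL;   // next 4 bytes: epoch seconds
        long increment = buf.getInt() & 0xffffffffL; // next 4 bytes: ordinal within the second
        return new long[] {seconds, increment};
    }

    static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical token prefix: 0x82 tag, seconds = 0x6553F2D4, increment = 1
        long[] ts = decodeTimestamp("826553F2D400000001");
        System.out.println(ts[0] + "," + ts[1]); // 1700000468,1
    }
}
```

A token whose first byte is not 0x82 (as DocumentDB apparently produces) fails this check before any timestamp bytes are read.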

Are you willing to submit a PR?

Jiabao-Sun commented 10 months ago

We need to extract the time from the resume token. For the decoding logic, refer to https://github.com/mongodb-js/mongodb-resumetoken-decoder/blob/main/src/keystringdecoder.ts

Perhaps AWS DocumentDB uses a custom resume token format. Can you provide a few resume token values?

By the way, using the legacy source may bypass this problem.

chengyangfeng commented 9 months ago

@Jiabao-Sun Hi, in com.ververica.cdc.connectors.mongodb.source.utils.ResumeTokenUtils I printed some resume tokens via LOG.info(resumeToken.toString()); they are included in the attached TaskManager log, please take a look. Also, you mentioned that the legacy source could bypass this problem — what exactly does that refer to? I looked through the git commit history but did not find the relevant legacy code. Looking forward to your reply, thanks. taskmanager_container_1700130586722_11933_01_000002_log.txt

Jiabao-Sun commented 9 months ago

Thanks @chengyangfeng. We can decode the resume token at https://npm.runkit.com/mongodb-resumetoken-decoder?t=1701165524539. It doesn't look like a legal resume token.

[screenshot: decoded resume token output]

chengyangfeng commented 9 months ago

@Jiabao-Sun OK, then it is probably a compatibility issue with AWS DocumentDB; I'll go confirm that. One more thing I'd like to understand: I changed the kType == K_TIMESTAMP == 130 check, and the job can now start from the specified timestamp, and the captured data looks correct. Does this mean I can use this approach for incremental data synchronization? Thanks for your answer.