[X] I searched in the issues and found nothing similar.
Flink version
1.17
Flink CDC version
2.4
Database and its version
Oracle 19c
Minimal reproduce step
Capture incremental data in online mode.
Archive logs that are generated concurrently are not loaded into LogMiner,
which causes data loss.
What did you expect to see?
No data is lost.
What did you see instead?
Getting logs to be mined for offset scn 11025749369192
2024-01-04 01:41:09,415 TRACE io.debezium.jdbc.JdbcConnection [] - running 'SELECT MIN(F.MEMBER) AS FILE_NAME, L.FIRST_CHANGE# FIRST_CHANGE, L.NEXT_CHANGE# NEXT_CHANGE, L.ARCHIVED, L.STATUS, 'ONLINE' AS TYPE, L.SEQUENCE# AS SEQ, 'NO' AS DICT_START, 'NO' AS DICT_END, L.THREAD# AS THREAD FROM V$LOGFILE F, V$LOG L LEFT JOIN V$ARCHIVED_LOG A ON A.FIRST_CHANGE# = L.FIRST_CHANGE# AND A.NEXT_CHANGE# = L.NEXT_CHANGE# WHERE (A.STATUS <> 'A' OR A.FIRST_CHANGE# IS NULL) AND F.GROUP# = L.GROUP# GROUP BY F.GROUP#, L.FIRST_CHANGE#, L.NEXT_CHANGE#, L.STATUS, L.ARCHIVED, L.SEQUENCE#, L.THREAD# UNION SELECT A.NAME AS FILE_NAME, A.FIRST_CHANGE# FIRST_CHANGE, A.NEXT_CHANGE# NEXT_CHANGE, 'YES', NULL, 'ARCHIVED', A.SEQUENCE# AS SEQ, A.DICTIONARY_BEGIN, A.DICTIONARY_END, A.THREAD# AS THREAD FROM V$ARCHIVED_LOG A WHERE A.NAME IS NOT NULL AND A.ARCHIVED = 'YES' AND A.STATUS = 'A' AND A.NEXT_CHANGE# > 11025749369192 AND A.DEST_ID IN (SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS WHERE STATUS='VALID' AND TYPE='LOCAL' AND ROWNUM=1) ORDER BY 7'
2024-01-04 01:41:09,692 TRACE io.debezium.connector.oracle.logminer.LogMinerHelper [] - Online redo log +ARCHDG/xxx/onlinelog/group_2.457.1110367797 with SCN range 11025749415608 to 9295429630892703743 (CURRENT) sequence 47362 to be added.
2024-01-04 01:41:09,693 TRACE io.debezium.connector.oracle.logminer.LogMinerHelper [] - Online redo log +ARCHDG/xxx/onlinelog/group_4.459.1110366137 with SCN range 11025749222427 to 9295429630892703743 (CURRENT) sequence 48030 to be added.
2024-01-04 01:41:09,693 TRACE io.debezium.connector.oracle.logminer.LogMinerHelper [] - Adding log file +ARCHDG/xxx/onlinelog/group_2.457.1110367797 to mining session
2024-01-04 01:41:09,695 TRACE io.debezium.connector.oracle.logminer.LogMinerHelper [] - Adding log file +ARCHDG/xxx/onlinelog/group_4.459.1110366137 to mining session
2024-01-04 01:41:09,697 DEBUG io.debezium.connector.oracle.logminer.LogMinerHelper [] - Last mined SCN: 11025749369192, Log file list to mine: [+ARCHDG/xxx/onlinelog/group_2.457.1110367797, +ARCHDG/xxx/onlinelog/group_4.459.1110366137]
2024-01-04 01:41:09,698 TRACE io.debezium.jdbc.JdbcConnection [] - running 'SELECT F.MEMBER, R.STATUS FROM V$LOGFILE F, V$LOG R WHERE F.GROUP# = R.GROUP# ORDER BY 2'
2024-01-04 01:41:09,713 INFO org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter [] - Sending bulk of 24 actions to Elasticsearch.
2024-01-04 01:41:09,732 TRACE io.debezium.jdbc.JdbcConnection [] - running 'SELECT F.MEMBER FROM V$LOG LOG, V$LOGFILE F WHERE LOG.GROUP#=F.GROUP# AND LOG.STATUS='CURRENT''
2024-01-04 01:41:09,765 TRACE io.debezium.jdbc.JdbcConnection [] - Executing statement BEGIN sys.dbms_logmnr.start_logmnr(startScn => '11025749369193', endScn => '11025749395336', OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.NO_ROWID_IN_STMT);END;
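One possible reading of the log above, not a confirmed diagnosis: the mining session starts at SCN 11025749369193, but the online log group_2 only begins at SCN 11025749415608, so any redo written by that thread before that SCN would have to come from an archived log that is not in the list. The sketch below checks this kind of gap. It assumes the two CURRENT online logs belong to two different redo threads (for example on RAC); the thread ids 1 and 2 are hypothetical because the log lines do not print them, and `LogFile` and `threads_with_gap` are illustrative names, not Debezium or Flink CDC APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LogFile:
    name: str
    thread: int      # hypothetical: the thread id is not printed in the log above
    first_scn: int
    next_scn: int


def threads_with_gap(logs, start_scn):
    """Return the thread ids whose earliest selected log starts after start_scn."""
    earliest = {}
    for log in logs:
        # Track the lowest FIRST_CHANGE# among the logs selected for each thread.
        earliest[log.thread] = min(earliest.get(log.thread, log.first_scn), log.first_scn)
    return sorted(t for t, first in earliest.items() if first > start_scn)


# SCN values copied from the TRACE lines above; thread ids are assumed.
logs = [
    LogFile("group_2.457.1110367797", 1, 11025749415608, 9295429630892703743),
    LogFile("group_4.459.1110366137", 2, 11025749222427, 9295429630892703743),
]

# startScn passed to dbms_logmnr.start_logmnr in the last log line.
print(threads_with_gap(logs, 11025749369193))
```

Under these assumptions the check reports the thread that owns group_2, which would match the symptom described: the archived log covering SCNs 11025749369193 to 11025749415608 for that thread was not added to the mining session.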
Anything else?
How is this concurrency controlled?
Are you willing to submit a PR?