apache / doris-flink-connector

Flink Connector for Apache Doris
https://doris.apache.org/
Apache License 2.0

[Bug] Data Visible in Flink UI but Not Reaching Downstream Database #442

Closed HeyChiang closed 1 month ago

HeyChiang commented 1 month ago


Version

flink-1.19.1
flink-doris-connector-1.19-1.6.2.jar
flink-cdc-common-3.0.1.jar
flink-sql-connector-mysql-cdc-3.0.1.jar
flink-sql-connector-oracle-cdc-3.0.1.jar

What's Wrong?

[Screenshot: Flink UI job graph showing records flowing between tasks]

I'm observing a discrepancy between the data shown in the Flink UI and what's actually being written to the downstream database. The Flink UI shows a significant amount of data being processed and sent (as indicated by the colored blocks representing different tasks), but when I check the downstream database, no records are present. This suggests that while data is being read from the source and processed within Flink, it's not successfully being written to the target database.

What You Expected?

I expected that the data shown as processed in the Flink UI would correspond to actual records written to the downstream database.

How to Reproduce?

No response

Anything Else?

No response


HeyChiang commented 1 month ago

[Screenshot: thread dump in the Flink UI]

```
"Process -> (BMS_BATCH_DEF: Writer -> BMS_BATCH_DEF: Committer, BMS_LOT_DEF: Writer -> BMS_LOT_DEF: Committer, BMS_STORER_POS: Writer -> BMS_STORER_POS: Committer, BMS_ST_IO_DOC: Writer -> BMS_ST_IO_DOC: Committer, BMS_ST_IO_DTL: Writer -> BMS_ST_IO_DTL: Committer, BMS_SU_SET_DOC: Writer -> BMS_SU_SET_DOC: Committer, BMS_SU_SET_DTL: Writer -> BMS_SU_SET_DTL: Committer, GPCS_INSIDER: Writer -> GPCS_INSIDER: Committer, GPCS_PLACEPOINT: Writer -> GPCS_PLACEPOINT: Committer, GPCS_PLACEPOINT_CLASS: Writer -> GPCS_PLACEPOINT_CLASS: Committer, GPCS_PLACEPOINT_CLASS_DTL: Writer -> GPCS_PLACEPOINT_CLASS_DTL: Committer, GRESA_SA_DOC: Writer -> GRESA_SA_DOC: Committer, GRESA_SA_DTL: Writer -> GRESA_SA_DTL: Committer, NGPCS_ALL_PRICE: Writer -> NGPCS_ALL_PRICE: Committer, NP_EFILES_OP_FILE: Writer -> NP_EFILES_OP_FILE: Committer, PUB_BRAND: Writer -> PUB_BRAND: Committer, PUB_EMPLOYEE: Writer -> PUB_EMPLOYEE: Committer, PUB_FACTORY: Writer -> PUB_FACTORY: Committer, PUB_GOODS: Writer -> PUB_GOODS: Committer, PUB_GOODS_CLASS: Writer -> PUB_GOODS_CLASS: Committer, PUB_GOODS_CLASS_DTL: Writer -> PUB_GOODS_CLASS_DTL: Committer) (2/3)#0" Id=12810 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@24273330
    at java.base@11.0.22/jdk.internal.misc.Unsafe.park(Native Method)
```
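A parked writer thread is consistent with how a checkpoint-gated (two-phase commit) sink behaves: records are buffered or pre-committed as they are written, and they only become visible in the target database after a checkpoint completes. A toy Python sketch of that pattern (class and method names are illustrative, not the connector's actual API):

```python
class TwoPhaseCommitSink:
    """Toy model of a checkpoint-gated (two-phase commit) sink."""

    def __init__(self):
        self.pending = []    # written but not committed: invisible downstream
        self.committed = []  # visible downstream

    def write(self, record):
        # The Flink UI counts this record as "sent" ...
        self.pending.append(record)

    def on_checkpoint_complete(self):
        # ... but it only reaches the downstream database
        # once a checkpoint succeeds and the commit runs.
        self.committed.extend(self.pending)
        self.pending.clear()


sink = TwoPhaseCommitSink()
sink.write("row-1")
sink.write("row-2")
print(sink.committed)  # [] - no checkpoint yet, nothing downstream
sink.on_checkpoint_complete()
print(sink.committed)  # ['row-1', 'row-2']
```

If checkpoints never succeed, `on_checkpoint_complete` never fires, which matches the symptom here: the UI shows throughput while the database stays empty.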

JasonLeeCoding commented 1 month ago

Is checkpointing enabled for the Flink job?

HeyChiang commented 1 month ago

> Is checkpointing enabled for the Flink job?

Yes. Here is the Flink command I'm using:

```shell
bin/flink run \
     -Dexecution.checkpointing.interval=2min \
     -Dexecution.checkpointing.timeout=60min \
     -Dexecution.checkpointing.tolerable-failed-checkpoints=3 \
     -Dparallelism.default=3 \
     -c org.apache.doris.flink.tools.cdc.CdcTools \
     lib/flink-doris-connector-1.19-1.6.2.jar \
     oracle-sync-database \
     --database djwkerp \
     --oracle-conf hostname=192.168.191.210 \
     --oracle-conf port=1521 \
     --oracle-conf username=cdc_sync_user \
     --oracle-conf password="xxxxxxxxxx" \
     --oracle-conf database-name=data \
     --oracle-conf schema-name=DJWKERP \
     --including-tables "NGPCS_ALL_PRICE|GPCS_PLACEPOINT|PUB_EMPLOYEE|BMS_BATCH_DEF|BMS_LOT_DEF|BMS_STORER_POS|GRESA_SA_DOC|GRESA_SA_DTL|GPCS_INSIDER|BMS_ST_IO_DOC|BMS_ST_IO_DTL|GPCS_PLACEPOINT_CLASS|GPCS_PLACEPOINT_CLASS_DTL|PUB_GOODS_CLASS|PUB_GOODS_CLASS_DTL|BMS_SU_SET_DOC|BMS_SU_SET_DTL|PUB_GOODS|PUB_FACTORY|PUB_BRAND|NP_EFILES_OP_FILE" \
     --sink-conf fenodes=192.168.191.93:8030 \
     --sink-conf username=cdc_sync_user \
     --sink-conf password="xxxxxxxxxx" \
     --sink-conf jdbc-url=jdbc:mysql://192.168.191.93:9030?allowPublicKeyRetrieval=true \
     --sink-conf sink.label-prefix=label \
     --table-conf replication_num=3
```

JasonLeeCoding commented 1 month ago

Have the checkpoints been completing successfully?

HeyChiang commented 1 month ago

[Screenshot: checkpoint history in the Flink UI, checkpoints shown with status FAILED]

JNSimba commented 1 month ago

During full synchronization, the Oracle connector will not start writing until it has finished pulling the existing snapshot data, so checkpoints can time out during the snapshot scan. You can try increasing these parameters: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/oracle-cdc/#cant-perform-checkpoint-during-scanning-snapshot-of-tables
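For reference, the linked FAQ recommends relaxing checkpoint tolerance while the snapshot scan runs. Applied to the `bin/flink run` command above, the adjusted flags would look something like this (the values are the FAQ's examples, not tuned for this job; `...` stands for the unchanged arguments):

```shell
bin/flink run \
     -Dexecution.checkpointing.interval=10min \
     -Dexecution.checkpointing.tolerable-failed-checkpoints=100 \
     -Drestart-strategy=fixed-delay \
     -Drestart-strategy.fixed-delay.attempts=2147483647 \
     ...
```

The idea is that failed checkpoints during the long initial scan no longer kill the job; once the snapshot phase finishes, checkpoints can complete and the sink starts committing.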

HeyChiang commented 1 month ago

thanks, bro @JNSimba