apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

Data lost when a GTID set contains multiple transaction ID intervals on an MGR cluster #3452

Closed zbingwen closed 4 months ago

zbingwen commented 4 months ago

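Version: Flink CDC 3.0

The merged GTID set aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984950:220530100-221264502 is incorrect. The restored GTID set ends its first interval at 219984892, but the merged set extends that interval to the server's 219984950, so transactions 219984893 through 219984950 are treated as already consumed and are lost.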

log:

2024-07-04 02:36:48,720 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Starting offset is initialized to {transaction_id=null, ts_sec=0, file=mysql-bin.000501, pos=616624667, kind=SPECIFIC, gtids=aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984892:220530100-221264502, row=0, event=0, server_id=102}
2024-07-04 02:36:48,725 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Merging server GTID set aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984950:220530100-221264502 with restored GTID set aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984892:220530100-221264502
2024-07-04 02:36:48,727 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Merged GTID set is aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984950:220530100-221264502
2024-07-04 02:36:48,727 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - MySQL current GTID set aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984950:220530100-221264502 does contain the GTID set aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984950:220530100-221264502 required by the connector.
2024-07-04 02:36:48,738 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Server has already purged aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219531535:220530100-220605830 GTIDs
2024-07-04 02:36:48,739 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - GTID set  known by the server but not processed yet, for replication are available only GTID set 
2024-07-04 02:36:48,776 INFO  io.debezium.util.Threads                                     [] - Requested thread factory for connector MySqlConnector, id = mysql_binlog_source named = binlog-client
2024-07-04 02:36:48,786 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - GTID set purged on server: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219531535:220530100-220605830
2024-07-04 02:36:48,787 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Attempting to generate a filtered GTID set
2024-07-04 02:36:48,787 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - GTID set from previous recorded offset: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984892:220530100-221264502
2024-07-04 02:36:48,787 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - GTID set available on server: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984951:220530100-221264502
2024-07-04 02:36:48,787 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Using first available positions for new GTID channels
2024-07-04 02:36:48,787 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Relevant GTID set available on server: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984951:220530100-221264502
2024-07-04 02:36:48,789 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Final merged GTID set to use when connecting to MySQL: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984951:220530100-221264502
2024-07-04 02:36:48,789 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Registering binlog reader with GTID set: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-219984951:220530100-221264502
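
To make the gap in the log above concrete, here is a minimal sketch that lists which transaction IDs are present in the merged set but absent from the restored set. It works on plain inclusive {start, end} pairs rather than the connector's GtidSet classes; the class GtidGapCheck and its method skipped are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink CDC. */
class GtidGapCheck {

    /**
     * Returns the sub-ranges of merged that are not covered by restored.
     * Both inputs are sorted, non-overlapping, inclusive {start, end} pairs.
     */
    static List<long[]> skipped(long[][] merged, long[][] restored) {
        List<long[]> gaps = new ArrayList<>();
        for (long[] m : merged) {
            long cursor = m[0]; // first ID of m not yet known to be covered
            for (long[] r : restored) {
                if (r[1] < cursor || r[0] > m[1]) {
                    continue; // r does not overlap the uncovered remainder of m
                }
                if (r[0] > cursor) {
                    gaps.add(new long[] {cursor, r[0] - 1});
                }
                cursor = r[1] + 1;
                if (cursor > m[1]) {
                    break;
                }
            }
            if (cursor <= m[1]) {
                gaps.add(new long[] {cursor, m[1]});
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Interval values copied from the "Merging server GTID set" log line.
        long[][] restored = {{1, 219984892L}, {220530100L, 221264502L}};
        long[][] merged = {{1, 219984950L}, {220530100L, 221264502L}};
        for (long[] gap : skipped(merged, restored)) {
            long count = gap[1] - gap[0] + 1;
            System.out.println(gap[0] + "-" + gap[1] + " (" + count + " transactions)");
        }
    }
}
```

With the values from the log this prints 219984893-219984950 (58 transactions), which is the range the connector would never request from the server.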
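
zbingwen commented 4 months ago

The restoredIntervalEnd logic in the current implementation is wrong. It computes a single end value for the whole restored UUID set and compares every server interval against it. Judging by the log above, that value is the end of the last restored interval (221264502), so the server's first interval 1-219984950 is accepted in full instead of being cut back to 219984892.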

    public static GtidSet fixRestoredGtidSetOld(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
        Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
        serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
            if (serverUuidSet != null) {
                long restoredIntervalEnd = getIntervalEnd(uuidSet);
                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals =
                        new ArrayList<>();
                for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) {
                    if (serverInterval.getEnd() <= restoredIntervalEnd) {
                        newIntervals.add(
                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
                                        serverInterval.getStart(), serverInterval.getEnd()));
                    } else if (serverInterval.getStart() <= restoredIntervalEnd
                            && serverInterval.getEnd() > restoredIntervalEnd) {
                        newIntervals.add(
                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
                                        serverInterval.getStart(), restoredIntervalEnd));
                    }
                }
                newSet.put(
                        uuidSet.getUUID(),
                        new GtidSet.UUIDSet(
                                new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet(
                                        uuidSet.getUUID(), newIntervals)));
            } else {
                newSet.put(uuidSet.getUUID(), uuidSet);
            }
        }
        return new GtidSet(newSet);
    }
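
The behaviour can be reproduced without a MySQL server. The following is a minimal sketch that replays the loop above on plain inclusive {start, end} pairs. The body of getIntervalEnd is not shown in this thread, so the sketch assumes it returns the largest end across all restored intervals; OldMergeDemo, maxEnd, mergeOld and format are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the old merge logic, not the connector's code. */
class OldMergeDemo {

    // Assumption: getIntervalEnd yields the largest end over all restored intervals.
    static long maxEnd(long[][] intervals) {
        long max = Long.MIN_VALUE;
        for (long[] interval : intervals) {
            max = Math.max(max, interval[1]);
        }
        return max;
    }

    /** Same two branches as fixRestoredGtidSetOld, for a single UUID. */
    static List<long[]> mergeOld(long[][] server, long[][] restored) {
        long restoredIntervalEnd = maxEnd(restored);
        List<long[]> merged = new ArrayList<>();
        for (long[] s : server) {
            if (s[1] <= restoredIntervalEnd) {
                merged.add(new long[] {s[0], s[1]}); // server interval kept whole
            } else if (s[0] <= restoredIntervalEnd) {
                merged.add(new long[] {s[0], restoredIntervalEnd}); // cut at the single end value
            }
        }
        return merged;
    }

    static String format(List<long[]> intervals) {
        StringBuilder sb = new StringBuilder();
        for (long[] interval : intervals) {
            if (sb.length() > 0) {
                sb.append(':');
            }
            sb.append(interval[0]).append('-').append(interval[1]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        long[][] server = {{1, 219984950L}, {220530100L, 221264502L}};
        long[][] restored = {{1, 219984892L}, {220530100L, 221264502L}};
        System.out.println(format(mergeOld(server, restored)));
    }
}
```

Under that assumption the sketch prints 1-219984950:220530100-221264502, the same merged set reported in the log, because both server intervals end at or before 221264502.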

zbingwen commented 4 months ago

After changing the method as shown below, my test produces the correct result.

    public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
        Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();

        serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));

        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {

            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());

            if (serverUuidSet != null) {
                List<GtidSet.Interval> restoredIntervals = uuidSet.getIntervals();
                int restoredIntervalsSize = restoredIntervals.size();
                List<GtidSet.Interval> serverIntervals = serverUuidSet.getIntervals();

                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals =
                        new ArrayList<>();

                for (int i = 0; i < serverIntervals.size(); i++) {
                    GtidSet.Interval serverInterval = serverIntervals.get(i);
                    if (i < restoredIntervalsSize) {
                        GtidSet.Interval restoredInterval = restoredIntervals.get(i);
                        if (serverInterval.getEnd() <= restoredInterval.getEnd()) {
                            newIntervals.add(
                                    new com.github.shyiko.mysql.binlog.GtidSet.Interval(
                                            serverInterval.getStart(), serverInterval.getEnd()));
                        } else if (serverInterval.getStart() <= restoredInterval.getEnd()
                                && serverInterval.getEnd() > restoredInterval.getEnd()) {
                            newIntervals.add(
                                    new com.github.shyiko.mysql.binlog.GtidSet.Interval(
                                            serverInterval.getStart(), restoredInterval.getEnd()));
                        }
                    } else {
                        newIntervals.add(
                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
                                        serverInterval.getStart(), serverInterval.getEnd()));
                    }
                }
                newSet.put(
                        uuidSet.getUUID(),
                        new GtidSet.UUIDSet(
                                new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet(
                                        uuidSet.getUUID(), newIntervals)));
            } else {
                newSet.put(uuidSet.getUUID(), uuidSet);
            }
        }
        return new GtidSet(newSet);
    }
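
As a quick check of the proposed change, here is a minimal sketch of the same index-paired comparison on plain inclusive {start, end} pairs, run against the values from the log. IndexedMergeDemo, mergeByIndex and format are hypothetical names for this illustration, and the sketch does not use the connector's GtidSet classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the proposed merge logic, not the connector's code. */
class IndexedMergeDemo {

    /** Compares the i-th server interval with the i-th restored interval, for a single UUID. */
    static List<long[]> mergeByIndex(long[][] server, long[][] restored) {
        List<long[]> merged = new ArrayList<>();
        for (int i = 0; i < server.length; i++) {
            long[] s = server[i];
            if (i < restored.length) {
                long restoredEnd = restored[i][1];
                if (s[1] <= restoredEnd) {
                    merged.add(new long[] {s[0], s[1]}); // server interval kept whole
                } else if (s[0] <= restoredEnd) {
                    merged.add(new long[] {s[0], restoredEnd}); // cut at the matching restored end
                }
            } else {
                // No restored interval at this index: keep the server interval as is.
                merged.add(new long[] {s[0], s[1]});
            }
        }
        return merged;
    }

    static String format(List<long[]> intervals) {
        StringBuilder sb = new StringBuilder();
        for (long[] interval : intervals) {
            if (sb.length() > 0) {
                sb.append(':');
            }
            sb.append(interval[0]).append('-').append(interval[1]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        long[][] server = {{1, 219984950L}, {220530100L, 221264502L}};
        long[][] restored = {{1, 219984892L}, {220530100L, 221264502L}};
        System.out.println(format(mergeByIndex(server, restored)));
    }
}
```

For the log's values this prints 1-219984892:220530100-221264502, so the first interval is cut back to the restored end. Note that pairing by index assumes the server and restored sets have the same interval layout; if the number or order of intervals differs, the i-th intervals may not correspond to each other, so that case would need separate handling.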

leonardBang commented 4 months ago

As required by Apache Flink, please report bugs or new features on Apache Jira under the project Flink using component tag Flink CDC. You must have a JIRA account in order to log cases and issues. If you don’t have an ASF JIRA account, you can request one at the ASF Self-serve portal. Account creation requires review by a PMC member of the project you apply to and normally takes one to two working days to be approved.

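Hello, bugs and new features need to be reported on Apache Jira or the Flink mailing list (dev@flink.apache.org) rather than by creating a new issue here. New issues on GitHub will be ignored and closed automatically.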