shyiko / mysql-binlog-connector-java

MySQL Binary Log connector
2.21k stars 806 forks source link

Possible to lose events when using keep alive #53

Open wushujames opened 9 years ago

wushujames commented 9 years ago

I think it is possible to lose binlog events when using the keepalive thread that auto-reconnects upon a disconnect. I remember running into this a while back using v0.1.0. I don't have the time to try to reproduce it right now, though, so I thought I would post it and see if you think it is a problem.

BinaryLogClient.java keeps track of the binlogFilename and binlogPosition of the current event, and updates it after each event is processed:

                if (isConnected()) {
                    notifyEventListeners(event);
                    updateClientBinlogFilenameAndPosition(event);
                    updateGtidSet(event);
                }

A InnoDB transaction in row-based binlogs has many events: a QUERY event of "BEGIN" to start it off, a TABLE_MAP, WRITE/DELETE/UPDATE events, etc.

If you get disconnected on one of the events in the middle, the keep alive thread will reconnect you at those binlog coordinates. However, in my testing, I noticed that MariaDB 5.5 would not allow you to connect in the middle of a transaction. It would let you connect but would start at the NEXT transaction that appears after your requested coordinates.

Thus, you would skip over events upon reconnecting. You would lose that entire transaction.

Does this sound correct? Again, sorry that I don't have time to attempt a repro right now.

shyiko commented 9 years ago

Wow. It's definitely worth investigating. Thanks.

cycyyy commented 9 years ago

Has this bug been fixed? : )

shyiko commented 9 years ago

@cycyyy fix hasn't been incorporated into mysql-binlog-connector-java just yet (note that it's MariaDB-specific), BUT one can easily put things right on application level (by rolling binlog position back to the beginning of event group (in this case - transaction) on disconnect), like so:

class BinaryLogClientPosition {
    String binlogFilename; long binlogPosition;
}
final BinaryLogClient binaryLogClient = new BinaryLogClient("username", "password");
final BinaryLogClientPosition groupPosition = new BinaryLogClientPosition();
binaryLogClient.registerEventListener(new BinaryLogClient.EventListener() {

    @Override
    public void onEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();
        if (!EventType.isRowMutation(eventType) && eventType != EventType.TABLE_MAP) {
            groupPosition.binlogFilename = binaryLogClient.getBinlogFilename();
            groupPosition.binlogPosition = binaryLogClient.getBinlogPosition();
        }
    }
});
final BinaryLogClientPosition skipToPosition = new BinaryLogClientPosition();
binaryLogClient.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener() {

    @Override
    public void onDisconnect(BinaryLogClient client) {
        skipToPosition.binlogFilename = client.getBinlogFilename();
        skipToPosition.binlogPosition = client.getBinlogPosition();
        client.setBinlogFilename(groupPosition.binlogFilename);
        client.setBinlogPosition(groupPosition.binlogPosition);
    }
});
binaryLogClient.registerEventListener(new BinaryLogClient.EventListener() {
    @Override
    public void onEvent(Event event) {
        if (skipToPosition.binlogFilename != null) {
            if (skipToPosition.binlogPosition != binaryLogClient.getBinlogPosition()) {
                return;
            } else {
                skipToPosition.binlogFilename = null;
            }
        }
        // handle event here
    }
});
binaryLogClient.connect();

NOTE: not tested.

cycyyy commented 9 years ago

Thanks a lot : )

lengmianshi commented 4 years ago

@cycyyy fix hasn't been incorporated into mysql-binlog-connector-java just yet (note that it's MariaDB-specific), BUT one can easily put things right on application level (by rolling binlog position back to the beginning of event group (in this case - transaction) on disconnect), like so:

class BinaryLogClientPosition {
    String binlogFilename; long binlogPosition;
}
final BinaryLogClient binaryLogClient = new BinaryLogClient("username", "password");
final BinaryLogClientPosition groupPosition = new BinaryLogClientPosition();
binaryLogClient.registerEventListener(new BinaryLogClient.EventListener() {

    @Override
    public void onEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();
        if (!EventType.isRowMutation(eventType) && eventType != EventType.TABLE_MAP) {
            groupPosition.binlogFilename = binaryLogClient.getBinlogFilename();
            groupPosition.binlogPosition = binaryLogClient.getBinlogPosition();
        }
    }
});
final BinaryLogClientPosition skipToPosition = new BinaryLogClientPosition();
binaryLogClient.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener() {

    @Override
    public void onDisconnect(BinaryLogClient client) {
        skipToPosition.binlogFilename = client.getBinlogFilename();
        skipToPosition.binlogPosition = client.getBinlogPosition();
        client.setBinlogFilename(groupPosition.binlogFilename);
        client.setBinlogPosition(groupPosition.binlogPosition);
    }
});
binaryLogClient.registerEventListener(new BinaryLogClient.EventListener() {
    @Override
    public void onEvent(Event event) {
        if (skipToPosition.binlogFilename != null) {
            if (skipToPosition.binlogPosition != binaryLogClient.getBinlogPosition()) {
                return;
            } else {
                skipToPosition.binlogFilename = null;
            }
        }
        // handle event here
    }
});
binaryLogClient.connect();

NOTE: not tested.

onDisconnect方法在停止进程时没有被调用