julien-duponchelle / python-mysql-replication

Pure Python implementation of the MySQL replication protocol, built on top of PyMySQL

BinLogStreamReader is not resuming from last position (log_file, log_pos not working). #618

Open zygias opened 5 months ago

zygias commented 5 months ago

Hello, I am facing an issue where BinLogStreamReader does not resume from the last position. I hardcoded position 1146, but the process also replays the event at position 875. It should resume the log stream only from positions greater than 1146.

    stream = BinLogStreamReader(
        connection_settings=config,
        server_id=1,
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
        is_mariadb=True,
        blocking=True,
        resume_stream=True,
        # log_file=log_file,
        # log_pos=log_pos,
        log_file="mysqld-bin.000003",
        log_pos=1146,
    )

MariaDB version: 10.3.39. Package version: mysql-replication==0.45.1 (works with MariaDB 10.3.39).

my.cnf:

[mysqld]
log-bin
binlog-format=ROW
server-id=1
binlog_row_image=FULL

Process logs:

INFO:root:Resuming from mysqld-bin.000003:1146
INFO:root:Starting BinLogStreamReader with log_file=mysqld-bin.000003 and log_pos=1146
DEBUG:root:BinLogStreamReader initialized with log_file=mysqld-bin.000003 and log_pos=1146
INFO:root: -> Insert into mydatabase.example_table: {'id': 1, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 34, 17), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:875
INFO:root: -> Insert into mydatabase.example_table: {'id': 2, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 56, 39), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:1146

Python code:

# mysql-replication=="0.45.1"
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
from pymysqlreplication.event import RotateEvent
import logging

# Database connection parameters
config = {
    'host': 'localhost',
    'port': 3307,
    'user': 'root',
    'password': 'rootpassword',
    'database': 'mydatabase'
}

# Setup logging
logging.basicConfig(level=logging.DEBUG)

def save_binlog_position(log_file, log_pos):
    with open("binlog_position.txt", "w") as f:
        f.write(f"{log_file},{log_pos}")
        logging.info(f"Saved logs: {log_file}:{log_pos}")

def load_binlog_position():
    try:
        with open("binlog_position.txt", "r") as f:
            log_file, log_pos = f.read().strip().split(',')
            logging.info(f"Resuming from {log_file}:{log_pos}")
            return log_file, int(log_pos)
    except FileNotFoundError:
        logging.info("Starting from beginning")
        return None, None
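As a side note (an assumption about a possible failure mode, not something reported above): the checkpoint file is rewritten in place, so a crash between `open` and `write` could leave a truncated file. A minimal sketch of an atomic variant of the helpers above, using a temporary file and `os.replace`:

```python
import os
import tempfile

def save_binlog_position_atomic(log_file, log_pos, path="binlog_position.txt"):
    # Write to a temporary file in the same directory, then atomically
    # replace the checkpoint so readers never see a half-written file.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w") as f:
            f.write(f"{log_file},{log_pos}")
        os.replace(tmp_path, path)  # atomic rename on POSIX and Windows
    except BaseException:
        os.unlink(tmp_path)
        raise

def load_binlog_position_atomic(path="binlog_position.txt"):
    try:
        with open(path) as f:
            log_file, log_pos = f.read().strip().split(",")
            return log_file, int(log_pos)
    except FileNotFoundError:
        return None, None
```

These are drop-in replacements for the helpers above; the checkpoint file format (`file,pos`) is unchanged.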

def get_values_from_logs(stream):
    for event in stream:
        if isinstance(event, WriteRowsEvent):
            for row in event.rows:
                row['values'].update({"_deleted": False})
                logging.info(f" -> Insert into {event.schema}.{event.table}: {row['values']}")
        elif isinstance(event, UpdateRowsEvent):
            for row in event.rows:
                row['after_values'].update({"_deleted": False})
                logging.info(
                    f" -> Update {event.schema}.{event.table}: {row['before_values']} -> {row['after_values']}")
        elif isinstance(event, DeleteRowsEvent):
            for row in event.rows:
                row['values'].update({"_deleted": True})
                logging.info(f" -> Delete from {event.schema}.{event.table}: {row['values']}")
        log_file, log_pos = stream.log_file, stream.log_pos
        save_binlog_position(log_file, log_pos)
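As a hypothetical client-side workaround while the resume behaviour is investigated (this helper is not part of pymysqlreplication), events at or before the saved checkpoint could be dropped inside the loop above. Binlog file names like `mysqld-bin.000003` carry a zero-padded numeric suffix, so `(file, position)` pairs compare correctly as tuples; this assumes, as in the logs above, that `stream.log_pos` is the end position of the last delivered event:

```python
def is_before_checkpoint(event_file, event_pos, saved_file, saved_pos):
    """Return True if the event is at or before the saved checkpoint
    and should be skipped on resume."""
    if saved_file is None:
        return False  # no checkpoint yet: process everything
    # Zero-padded suffixes make lexicographic file comparison match
    # numeric order, so a plain tuple comparison is sufficient.
    return (event_file, event_pos) <= (saved_file, saved_pos)
```

In `get_values_from_logs` one could then `continue` whenever `is_before_checkpoint(stream.log_file, stream.log_pos, log_file, log_pos)` is true, so the replayed event at 875 would be silently dropped.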

# Function to parse binary log events
def parse_binlog_events():
    log_file, log_pos = load_binlog_position()

    logging.info(f"Starting BinLogStreamReader with log_file={log_file} and log_pos={log_pos}")

    stream = BinLogStreamReader(
        connection_settings=config,
        server_id=1,
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
        is_mariadb=True,
        blocking=True,
        resume_stream=True,
        # log_file=log_file,
        # log_pos=log_pos,
        log_file="mysqld-bin.000003",
        log_pos=1146,
    )

    logging.debug(f"BinLogStreamReader initialized with log_file={stream.log_file} and log_pos={stream.log_pos}")
    get_values_from_logs(stream)
    stream.close()

if __name__ == "__main__":
    parse_binlog_events()

Docker-compose:

version: '3'

services:
  mariadb:
    image: mariadb:10.3.39
    ports:
      - "3307:3306"
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_DATABASE: mydatabase
      MYSQL_USER: myuser
      MYSQL_PASSWORD: mypassword
    volumes:
      - ./mariadb/my.cnf:/etc/mysql/conf.d/my.cnf

Could anyone help? Thank you in advance!

dongwook-chan commented 5 months ago

@zygias Thanks for reporting. Could you please provide binary logs or queries so that I can reproduce the issue?