long2ice / meilisync

Realtime sync data from MySQL/PostgreSQL/MongoDB to Meilisearch
https://github.com/long2ice/meilisync
Apache License 2.0
260 stars, 40 forks

Lost connection during query / `AttributeError: 'NoneType' object has no attribute 'write'` #83

Status: Open. jmsardoy opened this issue 8 months ago.

jmsardoy commented 8 months ago

Hi, I'm having trouble getting Meilisync to run reliably. I'm using MySQL as the source, and the connection keeps getting lost. When Meilisync then tries to reconnect, it fails with a second error.

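Here's what I've observed: the binlog stream first fails with `OperationalError: (2013, 'Lost connection to MySQL server during query')`, and the reconnect then fails with `AttributeError: 'NoneType' object has no attribute 'write'`. Both tracebacks are included below.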

I managed to work around this by moving the `self.ctl_conn` initialization into the `_create_stream` method of the MySQL source class (I'd be happy to open a PR with this change if that helps):

    # In meilisync/source/mysql.py (MySQL source class).
    # Change: the control connection is now created here, so every rebuilt
    # stream gets a fresh, open ctl_conn instead of the one closed on retry.
    async def _create_stream(self):
        # asyncmy.connect() should already return an open connection, so the
        # explicit connect() call below may be redundant.
        self.ctl_conn = await asyncmy.connect(**self.kwargs)
        await self.ctl_conn.connect()
        self.stream = BinLogStream(
            self.conn,
            self.ctl_conn,
            server_id=self.server_id,
            master_log_file=self.progress["master_log_file"],
            master_log_position=int(self.progress["master_log_position"]),
            resume_stream=True,
            blocking=True,
            only_schemas=[self.database],
            only_tables=[f"{self.database}.{table}" for table in self.tables],
            only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
        )

    async def __aiter__(self):
        self.conn = await asyncmy.connect(**self.kwargs)
        if not self.progress:
            self.progress = await self.get_current_progress()
        yield ProgressEvent(
            progress=self.progress,
        )
        await self._create_stream()
        while True:
            try:
                async for event in self.stream:
                    ...  # event handling unchanged (elided)
            except OperationalError as e:
                logger.exception(f"Binlog stream error: {e}, sleep 10s and retry...")
                await asyncio.sleep(10)
                try:
                    # Tear down the broken stream and its control connection...
                    await self.stream.close()
                    self.ctl_conn.close()
                    # ...then rebuild both; _create_stream opens a new ctl_conn.
                    await self._create_stream()
                except Exception as e:
                    logger.exception(f"Recreate binlog stream error: {e}")

This change fixes the reconnection, but I'd still like to find a way to keep the connection alive so that the `OperationalError` stops recurring in the first place.
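One way to try to keep the connection alive is a periodic ping from a background task. The traceback shows that the connection that drops is the control connection (`_ctl_connection`). `BinLogStream` only uses it in `_get_table_information` when a `TableMapEvent` arrives for a table it hasn't cached yet, so it can sit idle for long stretches. A possible (unconfirmed) cause is the server's `wait_timeout` or a proxy's idle timeout closing it. The sketch below assumes an async `ping` callable. asyncmy's `Connection` appears to provide a `ping()` coroutine, but check that against your installed version. `keepalive` is a hypothetical helper. Caveat: a ping must not run while the stream is using the same connection for a query, so in practice it would need some coordination with the stream, which this sketch leaves out.

```python
import asyncio
from typing import Awaitable, Callable


async def keepalive(
    ping: Callable[[], Awaitable[object]],
    interval: float,
    stop: asyncio.Event,
) -> int:
    """Call ping() every `interval` seconds until `stop` is set.

    Returns the number of successful pings. Ping errors are swallowed so the
    keepalive task never crashes the sync; the stream's own retry logic is
    still responsible for recovering a connection that has really died.
    """
    sent = 0
    while not stop.is_set():
        try:
            # Return early if stop is set; otherwise time out after `interval`.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            try:
                await ping()
                sent += 1
            except Exception:
                pass  # connection is probably gone; let the reconnect path handle it
    return sent
```

You would start it with `asyncio.create_task(keepalive(...))` alongside the sync loop and set the event on shutdown.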

Here's the log for the `OperationalError`:

2024-02-06 20:15:59.072 | ERROR    | meilisync.source.mysql:__aiter__:130 - Binlog stream error: (2013, 'Lost connection to MySQL server during query'), sleep 10s and retry...
Traceback (most recent call last):

  File "/usr/local/bin/meilisync", line 6, in <module>
    sys.exit(app())
    │   │    └ <typer.main.Typer object at 0x7f991300ba70>
    │   └ <built-in function exit>
    └ <module 'sys' (built-in)>
  File "/usr/local/lib/python3.12/site-packages/typer/main.py", line 311, in __call__
    return get_command(self)(*args, **kwargs)
           │           │      │       └ {}
           │           │      └ ()
           │           └ <typer.main.Typer object at 0x7f991300ba70>
           └ <function get_command at 0x7f9911e26340>
  File "/usr/local/lib/python3.12/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
           │    │     │       └ {}
           │    │     └ ()
           │    └ <function TyperGroup.main at 0x7f9911e24c20>
           └ <TyperGroup callback>
  File "/usr/local/lib/python3.12/site-packages/typer/core.py", line 778, in main
    return _main(
           └ <function _main at 0x7f9911e1fb00>
  File "/usr/local/lib/python3.12/site-packages/typer/core.py", line 216, in _main
    rv = self.invoke(ctx)
         │    │      └ <click.core.Context object at 0x7f990edf7c20>
         │    └ <function MultiCommand.invoke at 0x7f99122447c0>
         └ <TyperGroup callback>
  File "/usr/local/lib/python3.12/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
           │               │       │       │      └ <click.core.Context object at 0x7f990fe15040>
           │               │       │       └ <function Command.invoke at 0x7f9912244180>
           │               │       └ <TyperCommand start>
           │               └ <click.core.Context object at 0x7f990fe15040>
           └ <function MultiCommand.invoke.<locals>._process_result at 0x7f990e544040>
  File "/usr/local/lib/python3.12/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           │   │      │    │           │   └ {}
           │   │      │    │           └ <click.core.Context object at 0x7f990fe15040>
           │   │      │    └ <function start at 0x7f990e53a020>
           │   │      └ <TyperCommand start>
           │   └ <function Context.invoke at 0x7f9912222ac0>
           └ <click.core.Context object at 0x7f990fe15040>
  File "/usr/local/lib/python3.12/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
                       │       └ {}
                       └ ()
  File "/usr/local/lib/python3.12/site-packages/typer/main.py", line 683, in wrapper
    return callback(**use_params)  # type: ignore
           │          └ {'context': <click.core.Context object at 0x7f990fe15040>}
           └ <function start at 0x7f990e539e40>

  File "/meilisync/meilisync/main.py", line 155, in start
    asyncio.run(run())
    │       │   └ <function start.<locals>.run at 0x7f990e51d3a0>
    │       └ <function run at 0x7f9912dc5e40>
    └ <module 'asyncio' from '/usr/local/lib/python3.12/asyncio/__init__.py'>

  File "/usr/local/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           │      │   └ <coroutine object start.<locals>.run at 0x7f990e512ea0>
           │      └ <function Runner.run at 0x7f991237bce0>
           └ <asyncio.runners.Runner object at 0x7f990e51a300>
  File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           │    │     │                  └ <Task pending name='Task-4' coro=<start.<locals>.run() running at /meilisync/meilisync/main.py:153> wait_for=<_GatheringFutur...
           │    │     └ <function BaseEventLoop.run_until_complete at 0x7f9912379940>
           │    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
           └ <asyncio.runners.Runner object at 0x7f990e51a300>
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
    self.run_forever()
    │    └ <function BaseEventLoop.run_forever at 0x7f99123798a0>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
    self._run_once()
    │    └ <function BaseEventLoop._run_once at 0x7f991237b6a0>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
    handle._run()
    │      └ <function Handle._run at 0x7f99124d9b20>
    └ <Handle Task.task_wakeup(<Future finished result=None>)>
  File "/usr/local/lib/python3.12/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
    │    │            │    │           │    └ <member '_args' of 'Handle' objects>
    │    │            │    │           └ <Handle Task.task_wakeup(<Future finished result=None>)>
    │    │            │    └ <member '_callback' of 'Handle' objects>
    │    │            └ <Handle Task.task_wakeup(<Future finished result=None>)>
    │    └ <member '_context' of 'Handle' objects>
    └ <Handle Task.task_wakeup(<Future finished result=None>)>

  File "/meilisync/meilisync/main.py", line 102, in _
    async for event in source:
              │        └ <meilisync.source.mysql.MySQL object at 0x7f990e9523c0>
              └ Event(progress={'master_log_file': 'mysql-bin-changelog.492584', 'master_log_position': 3540744}, type=<EventType.update: 'up...

> File "/meilisync/meilisync/source/mysql.py", line 108, in __aiter__
    async for event in self.stream:
              │        │    └ <asyncmy.replication.binlogstream.BinLogStream object at 0x7f990e52e5a0>
              │        └ <meilisync.source.mysql.MySQL object at 0x7f990e9523c0>
              └ <asyncmy.replication.row_events.UpdateRowsEvent object at 0x7f990e56c110>

  File "/usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py", line 374, in __anext__
    ret = await self._read()
                │    └ <function BinLogStream._read at 0x7f990fe5eac0>
                └ <asyncmy.replication.binlogstream.BinLogStream object at 0x7f990e52e5a0>
  File "/usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py", line 310, in _read
    await binlog_event.init()
          │            └ <function BinLogPacket.init at 0x7f990fe0bb00>
          └ <asyncmy.replication.packets.BinLogPacket object at 0x7f991021eff0>
  File "/usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py", line 140, in init
    self.event and await self.event.init()
    │    │               │    │     └ <function TableMapEvent.init at 0x7f990fe5d260>
    │    │               │    └ <asyncmy.replication.row_events.TableMapEvent object at 0x7f990e56d6d0>
    │    │               └ <asyncmy.replication.packets.BinLogPacket object at 0x7f991021eff0>
    │    └ <asyncmy.replication.row_events.TableMapEvent object at 0x7f990e56d6d0>
    └ <asyncmy.replication.packets.BinLogPacket object at 0x7f991021eff0>
  File "/usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py", line 580, in init
    await self._connection._get_table_information(self.schema, self.table_name)
          │    │           │                      │    │       │    └ 'test_courses'
          │    │           │                      │    │       └ <asyncmy.replication.row_events.TableMapEvent object at 0x7f990e56d6d0>
          │    │           │                      │    └ 'test_schema'
          │    │           │                      └ <asyncmy.replication.row_events.TableMapEvent object at 0x7f990e56d6d0>
          │    │           └ <bound method BinLogStream._get_table_information of <asyncmy.replication.binlogstream.BinLogStream object at 0x7f990e52e5a0>>
          │    └ <asyncmy.connection.Connection object at 0x7f990e52f8c0>
          └ <asyncmy.replication.row_events.TableMapEvent object at 0x7f990e56d6d0>
  File "/usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py", line 350, in _get_table_information
    await cursor.execute(
          │      └ <cyfunction Cursor.execute at 0x7f990fe07440>
          └ <asyncmy.cursors.DictCursor object at 0x7f990e97d760>
  File "asyncmy/cursors.pyx", line 179, in execute
    result = await self._query(query)
  File "asyncmy/cursors.pyx", line 364, in _query
    await conn.query(q)
  File "asyncmy/connection.pyx", line 494, in query
    await self._read_query_result(unbuffered=unbuffered)
  File "asyncmy/connection.pyx", line 682, in _read_query_result
    await result.read()
  File "asyncmy/connection.pyx", line 1069, in read
    first_packet = await self.connection.read_packet()
  File "asyncmy/connection.pyx", line 623, in read_packet
    raise errors.OperationalError(
          │      └ <class 'asyncmy.errors.OperationalError'>
          └ <module 'asyncmy.errors' from '/usr/local/lib/python3.12/site-packages/asyncmy/errors.cpython-312-x86_64-linux-gnu.so'>

asyncmy.errors.OperationalError: (2013, 'Lost connection to MySQL server during query')

And here's the `AttributeError` traceback:


╭────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /meilisync/meilisync/main.py:155 in start                                                        │
│                                                                                                  │
│   152 │   │   lock = asyncio.Lock()                                                              │
│   153 │   │   await asyncio.gather(_(), interval())                                              │
│   154 │                                                                                          │
│ ❱ 155 │   asyncio.run(run())                                                                     │
│   156                                                                                            │
│   157                                                                                            │
│   158 @app.command(help="Refresh all data by swap index")                                        │
│                                                                                                  │
│ /usr/local/lib/python3.12/asyncio/runners.py:194 in run                                          │
│                                                                                                  │
│   191 │   │   │   "asyncio.run() cannot be called from a running event loop")                    │
│   192 │                                                                                          │
│   193 │   with Runner(debug=debug, loop_factory=loop_factory) as runner:                         │
│ ❱ 194 │   │   return runner.run(main)                                                            │
│   195                                                                                            │
│   196                                                                                            │
│   197 def _cancel_all_tasks(loop):                                                               │
│                                                                                                  │
│                                                                                                  │
│ /usr/local/lib/python3.12/asyncio/runners.py:118 in run                                          │
│                                                                                                  │
│   115 │   │                                                                                      │
│   116 │   │   self._interrupt_count = 0                                                          │
│   117 │   │   try:                                                                               │
│ ❱ 118 │   │   │   return self._loop.run_until_complete(task)                                     │
│   119 │   │   except exceptions.CancelledError:                                                  │
│   120 │   │   │   if self._interrupt_count > 0:                                                  │
│   121 │   │   │   │   uncancel = getattr(task, "uncancel", None)                                 │
│                                                                                                  │
│                                                                                                  │
│ /usr/local/lib/python3.12/asyncio/base_events.py:684 in run_until_complete                       │
│                                                                                                  │
│    681 │   │   if not future.done():                                                             │
│    682 │   │   │   raise RuntimeError('Event loop stopped before Future completed.')             │
│    683 │   │                                                                                     │
│ ❱  684 │   │   return future.result()                                                            │
│    685 │                                                                                         │
│    686 │   def stop(self):                                                                       │
│    687 │   │   """Stop running the event loop.                                                   │
│                                                                                                  │
│                                                                                                  │
│ /meilisync/meilisync/main.py:153 in run                                                          │
│                                                                                                  │
│   150 │   async def run():                                                                       │
│   151 │   │   nonlocal lock                                                                      │
│   152 │   │   lock = asyncio.Lock()                                                              │
│ ❱ 153 │   │   await asyncio.gather(_(), interval())                                              │
│   154 │                                                                                          │
│   155 │   asyncio.run(run())                                                                     │
│   156                                                                                            │
│                                                                                                  │
│                                                                                                  │
│ /meilisync/meilisync/main.py:102 in _                                                            │
│                                                                                                  │
│    99 │   │   │   │   │   │   f'No data found for table "{settings.source.database}.{sync.tabl   │
│   100 │   │   │   │   │   )                                                                      │
│   101 │   │   logger.info(f'Start increment sync data from "{settings.source.type}" to MeiliSe   │
│ ❱ 102 │   │   async for event in source:                                                         │
│   103 │   │   │   if settings.debug:                                                             │
│   104 │   │   │   │   logger.debug(event)                                                        │
│   105 │   │   │   current_progress = event.progress                                              │
│                                                                                                  │
│ /meilisync/meilisync/source/mysql.py:108 in __aiter__                                            │
│                                                                                                  │
│   105 │   │   await self._create_stream()                                                        │
│   106 │   │   while True:                                                                        │
│   107 │   │   │   try:                                                                           │
│ ❱ 108 │   │   │   │   async for event in self.stream:                                            │
│   109 │   │   │   │   │   self.ctl_conn = await asyncmy.connect(**self.kwargs)                   │
│   110 │   │   │   │   │   if isinstance(event, WriteRowsEvent):                                  │
│   111 │   │   │   │   │   │   event_type = EventType.create                                      │
│                                                                                                  │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py:374 in __anext__     │
│                                                                                                  │
│   371 │   │   │   await self._connect()                                                          │
│   372 │   │   ret = await self._read()                                                           │
│   373 │   │   while ret is None:                                                                 │
│ ❱ 374 │   │   │   ret = await self._read()                                                       │
│   375 │   │   │   continue                                                                       │
│   376 │   │   return ret                                                                         │
│   377                                                                                            │
│                                                                                                  │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py:310 in _read         │
│                                                                                                  │
│   307 │   │   │   self._ignored_schemas,                                                         │
│   308 │   │   │   self._freeze_schema,                                                           │
│   309 │   │   )                                                                                  │
│ ❱ 310 │   │   await binlog_event.init()                                                          │
│   311 │   │                                                                                      │
│   312 │   │   if binlog_event.event_type == ROTATE_EVENT:                                        │
│   313 │   │   │   self._master_log_position = binlog_event.event.position                        │
│                                                                                                  │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:140 in init               │
│                                                                                                  │
│   137 │   │   │   self.event = None                                                              │
│   138 │                                                                                          │
│   139 │   async def init(self):                                                                  │
│ ❱ 140 │   │   self.event and await self.event.init()                                             │
│   141 │                                                                                          │
│   142 │   def read(self, size):                                                                  │
│   143 │   │   size = int(size)                                                                   │
│                                                                                                  │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py:580 in init            │
│                                                                                                  │
│   577 │   │   │   self.column_schemas = self._table_map[self.table_id].column_schemas            │
│   578 │   │   else:                                                                              │
│   579 │   │   │   self.column_schemas = await (                                                  │
│ ❱ 580 │   │   │   │   await self._connection._get_table_information(self.schema, self.table_na   │
│   581 │   │   │   )                                                                              │
│   582 │   │   ordinal_pos_loc = 0                                                                │
│   583                                                                                            │
│                                                                                                  │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/binlogstream.py:350 in               │
│ _get_table_information                                                                           │
│                                                                                                  │
│   347 │                                                                                          │
│   348 │   async def _get_table_information(self, schema, table):                                 │
│   349 │   │   async with self._ctl_connection.cursor(DictCursor) as cursor:                      │
│ ❱ 350 │   │   │   await cursor.execute(                                                          │
│   351 │   │   │   │   """                                                                        │
│   352 │   │   │   │   │   SELECT                                                                 │
│   353 │   │   │   │   │   │   COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,                   │
│                                                                                                  │
│                                                                                                  │
│ in execute:179                                                                                   │
│                                                                                                  │
│ in _query:364                                                                                    │
│                                                                                                  │
│ in query:493                                                                                     │
│                                                                                                  │
│ in _execute_command:729                                                                          │
│                                                                                                  │
│ in asyncmy.connection.Connection._write_bytes:668                                                │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
AttributeError: 'NoneType' object has no attribute 'write'
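The final frame, `Connection._write_bytes`, fails because the connection's writer is `None`. That appears consistent with the retry branch closing `self.ctl_conn` and then building the new `BinLogStream` around the same, now closed, object. It would also explain why creating a fresh `ctl_conn` in `_create_stream` makes the error go away. The toy model below illustrates that failure mode. `ToyWriter`, `ToyConnection`, and `ToyStream` are hypothetical stand-ins, not asyncmy's real classes, and the idea that closing drops the writer is inferred from the error message.

```python
import asyncio
from typing import List, Optional


class ToyWriter:
    """Plays the role of a socket writer; records what was 'sent'."""

    def __init__(self) -> None:
        self.sent: List[bytes] = []

    def write(self, data: bytes) -> None:
        self.sent.append(data)


class ToyConnection:
    """Hypothetical stand-in for a driver connection (not asyncmy's class)."""

    def __init__(self) -> None:
        self._writer: Optional[ToyWriter] = ToyWriter()

    def close(self) -> None:
        # Assumption inferred from the traceback: closing drops the writer.
        self._writer = None

    async def query(self, sql: str) -> None:
        # On a closed connection this raises
        # AttributeError: 'NoneType' object has no attribute 'write'.
        self._writer.write(sql.encode())


class ToyStream:
    """Hypothetical stand-in for BinLogStream: only holds the control connection."""

    def __init__(self, ctl_conn: ToyConnection) -> None:
        self.ctl_conn = ctl_conn

    async def lookup_table(self) -> None:
        # Mirrors _get_table_information querying the control connection.
        await self.ctl_conn.query("SELECT COLUMN_NAME FROM information_schema.columns")


async def retry_once(fresh_ctl_conn: bool) -> str:
    ctl_conn = ToyConnection()
    ctl_conn.close()  # mirrors `self.ctl_conn.close()` in the retry branch
    if fresh_ctl_conn:
        ctl_conn = ToyConnection()  # mirrors the workaround in _create_stream
    stream = ToyStream(ctl_conn)  # the rebuilt stream gets whichever ctl_conn we hold
    try:
        await stream.lookup_table()
        return "ok"
    except AttributeError as exc:
        return str(exc)


print(asyncio.run(retry_once(False)))  # 'NoneType' object has no attribute 'write'
print(asyncio.run(retry_once(True)))   # ok
```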