long2ice / meilisync

Realtime sync data from MySQL/PostgreSQL/MongoDB to Meilisearch
https://github.com/long2ice/meilisync
Apache License 2.0
250 stars 37 forks source link

error: required argument is not an integer #4

Open naijoug opened 1 year ago

naijoug commented 1 year ago

run meilisync start, init sync mysql data success. after start increment sync data from "mysql" to MeiliSearch..., crash!

image

long2ice commented 1 year ago

What's your mysql version? Did you enable binlog as row format?

naijoug commented 1 year ago

What's your mysql version? Did you enable binlog as row format?

long2ice commented 1 year ago

confusing...

naijoug commented 1 year ago

Thanks for your reply. Does mysql need to check other config?

long2ice commented 1 year ago

No, looks like a proto error, but I never see it in my local

naijoug commented 1 year ago

Thiis is the binlog detail config.

image

long2ice commented 1 year ago

This is mine:

image
naijoug commented 1 year ago

Ok, thanks, I will check this.

oleynikd commented 6 months ago

Having the same issue

2024-03-01 18:05:36.262 | INFO     | meilisync.main:_:101 - Start increment sync data from "SourceType.mysql" to MeiliSearch...
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /meilisync/meilisync/main.py:140 in start                                    │
│                                                                              │
│   137 │   │   lock = asyncio.Lock()                                          │
│   138 │   │   await asyncio.gather(_(), interval())                          │
│   139 │                                                                      │
│ ❱ 140 │   asyncio.run(run())                                                 │
│   141                                                                        │
│   142                                                                        │
│   143 @app.command(help="Refresh all data by swap index")                    │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │                _ = <function start.<locals>._ at 0x7f4cd1568a40>         │ │
│ │       collection = <meilisync.event.EventCollection object at            │ │
│ │                    0x7f4cd2bdf9e0>                                       │ │
│ │          context = <click.core.Context object at 0x7f4cd2d2d7c0>         │ │
│ │ current_progress = {                                                     │ │
│ │                    │   'master_log_file': 'binlog.022055',               │ │
│ │                    │   'master_log_position': 839805975                  │ │
│ │                    }                                                     │ │
│ │         interval = <function start.<locals>.interval at 0x7f4cd1568e00>  │ │
│ │             lock = <asyncio.locks.Lock object at 0x7f4cd1576090          │ │
│ │                    [unlocked]>                                           │ │
│ │            meili = <meilisync.meili.Meili object at 0x7f4cd2bdf920>      │ │
│ │   meili_settings = MeiliSearch(                                          │ │
│ │                    │   api_url='http://127.0.0.1:7708',                  │ │
│ │                    │   api_key='UYo8!74f3I@UYG87wf8uiy23o*uygffweg',     │ │
│ │                    │   insert_size=1000,                                 │ │
│ │                    │   insert_interval=10                                │ │
│ │                    )                                                     │ │
│ │         progress = <meilisync.progress.file.File object at               │ │
│ │                    0x7f4cd5b16900>                                       │ │
│ │              run = <function start.<locals>.run at 0x7f4cd156a340>       │ │
│ │         settings = Settings(                                             │ │
│ │                    │   plugins=[],                                       │ │
│ │                    │   progress=Progress(                                │ │
│ │                    │   │   type=<ProgressType.file: 'file'>              │ │
│ │                    │   ),                                                │ │
│ │                    │   debug=False,                                      │ │
│ │                    │   source=Source(                                    │ │
│ │                    │   │   type=<SourceType.mysql: 'mysql'>,             │ │
│ │                    │   │   database='inextapi',                          │ │
│ │                    │   │   host='127.0.0.1',                             │ │
│ │                    │   │   port=3306,                                    │ │
│ │                    │   │   user='root',                                  │ │
│ │                    │   │   password='NfvljiJOp9erghpu9HJLBIFIPURwgbpu3'  │ │
│ │                    │   ),                                                │ │
│ │                    │   meilisearch=MeiliSearch(                          │ │
│ │                    │   │   api_url='http://127.0.0.1:7708',              │ │
│ │                    │   │   api_key='UYo8!74f3I@UYG87wf8uiy23o*uygffweg', │ │
│ │                    │   │   insert_size=1000,                             │ │
│ │                    │   │   insert_interval=10                            │ │
│ │                    │   ),                                                │ │
│ │                    │   sync=[                                            │ │
│ │                    │   │   Sync(                                         │ │
│ │                    │   │   │   plugins=[],                               │ │
│ │                    │   │   │   table='macs',                             │ │
│ │                    │   │   │   pk='id',                                  │ │
│ │                    │   │   │   full=True,                                │ │
│ │                    │   │   │   index=None,                               │ │
│ │                    │   │   │   fields={                                  │ │
│ │                    │   │   │   │   'id': None,                           │ │
│ │                    │   │   │   │   'mac': None,                          │ │
│ │                    │   │   │   │   'soc_serial': None,                   │ │
│ │                    │   │   │   │   'sn': None,                           │ │
│ │                    │   │   │   │   'last_ip': None,                      │ │
│ │                    │   │   │   │   'isp_id': None                        │ │
│ │                    │   │   │   }                                         │ │
│ │                    │   │   )                                             │ │
│ │                    │   ],                                                │ │
│ │                    │   sentry=None                                       │ │
│ │                    )                                                     │ │
│ │           source = <meilisync.source.mysql.MySQL object at               │ │
│ │                    0x7f4cd2157da0>                                       │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/asyncio/runners.py:194 in run                      │
│                                                                              │
│   191 │   │   │   "asyncio.run() cannot be called from a running event loop" │
│   192 │                                                                      │
│   193 │   with Runner(debug=debug, loop_factory=loop_factory) as runner:     │
│ ❱ 194 │   │   return runner.run(main)                                        │
│   195                                                                        │
│   196                                                                        │
│   197 def _cancel_all_tasks(loop):                                           │
│                                                                              │
│ ╭──────────────────────────────── locals ────────────────────────────────╮   │
│ │        debug = None                                                    │   │
│ │ loop_factory = None                                                    │   │
│ │         main = <coroutine object start.<locals>.run at 0x7f4cd152f5a0> │   │
│ │       runner = <asyncio.runners.Runner object at 0x7f4cd2bdf950>       │   │
│ ╰────────────────────────────────────────────────────────────────────────╯   │
│                                                                              │
│ /usr/local/lib/python3.12/asyncio/runners.py:118 in run                      │
│                                                                              │
│   115 │   │                                                                  │
│   116 │   │   self._interrupt_count = 0                                      │
│   117 │   │   try:                                                           │
│ ❱ 118 │   │   │   return self._loop.run_until_complete(task)                 │
│   119 │   │   except exceptions.CancelledError:                              │
│   120 │   │   │   if self._interrupt_count > 0:                              │
│   121 │   │   │   │   uncancel = getattr(task, "uncancel", None)             │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │        context = <_contextvars.Context object at 0x7f4cd1595f00>         │ │
│ │           coro = <coroutine object start.<locals>.run at 0x7f4cd152f5a0> │ │
│ │           self = <asyncio.runners.Runner object at 0x7f4cd2bdf950>       │ │
│ │ sigint_handler = functools.partial(<bound method Runner._on_sigint of    │ │
│ │                  <asyncio.runners.Runner object at 0x7f4cd2bdf950>>,     │ │
│ │                  main_task=<Task finished name='Task-4'                  │ │
│ │                  coro=<start.<locals>.run() done, defined at             │ │
│ │                  /meilisync/meilisync/main.py:135>                       │ │
│ │                  exception=error('required argument is not an            │ │
│ │                  integer')>)                                             │ │
│ │           task = <Task finished name='Task-4' coro=<start.<locals>.run() │ │
│ │                  done, defined at /meilisync/meilisync/main.py:135>      │ │
│ │                  exception=error('required argument is not an integer')> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/asyncio/base_events.py:685 in run_until_complete   │
│                                                                              │
│    682 │   │   if not future.done():                                         │
│    683 │   │   │   raise RuntimeError('Event loop stopped before Future comp │
│    684 │   │                                                                 │
│ ❱  685 │   │   return future.result()                                        │
│    686 │                                                                     │
│    687 │   def stop(self):                                                   │
│    688 │   │   """Stop running the event loop.                               │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │   future = <Task finished name='Task-4' coro=<start.<locals>.run() done, │ │
│ │            defined at /meilisync/meilisync/main.py:135>                  │ │
│ │            exception=error('required argument is not an integer')>       │ │
│ │ new_task = False                                                         │ │
│ │     self = <_UnixSelectorEventLoop running=False closed=True             │ │
│ │            debug=False>                                                  │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /meilisync/meilisync/main.py:138 in run                                      │
│                                                                              │
│   135 │   async def run():                                                   │
│   136 │   │   nonlocal lock                                                  │
│   137 │   │   lock = asyncio.Lock()                                          │
│ ❱ 138 │   │   await asyncio.gather(_(), interval())                          │
│   139 │                                                                      │
│   140 │   asyncio.run(run())                                                 │
│   141                                                                        │
│                                                                              │
│ ╭────────────────────────────── locals ───────────────────────────────╮      │
│ │        _ = <function start.<locals>._ at 0x7f4cd1568a40>            │      │
│ │ interval = <function start.<locals>.interval at 0x7f4cd1568e00>     │      │
│ │     lock = <asyncio.locks.Lock object at 0x7f4cd1576090 [unlocked]> │      │
│ ╰─────────────────────────────────────────────────────────────────────╯      │
│                                                                              │
│ /meilisync/meilisync/main.py:102 in _                                        │
│                                                                              │
│    99 │   │   │   │   │   │   f'No data found for table "{settings.source.da │
│   100 │   │   │   │   │   )                                                  │
│   101 │   │   logger.info(f'Start increment sync data from "{settings.source │
│ ❱ 102 │   │   async for event in source:                                     │
│   103 │   │   │   if settings.debug:                                         │
│   104 │   │   │   │   logger.debug(event)                                    │
│   105 │   │   │   current_progress = event.progress                          │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │       collection = <meilisync.event.EventCollection object at            │ │
│ │                    0x7f4cd2bdf9e0>                                       │ │
│ │ current_progress = {                                                     │ │
│ │                    │   'master_log_file': 'binlog.022055',               │ │
│ │                    │   'master_log_position': 839805975                  │ │
│ │                    }                                                     │ │
│ │            event = ProgressEvent(                                        │ │
│ │                    │   progress={                                        │ │
│ │                    │   │   'master_log_file': 'binlog.022055',           │ │
│ │                    │   │   'master_log_position': 839805975              │ │
│ │                    │   }                                                 │ │
│ │                    )                                                     │ │
│ │             lock = <asyncio.locks.Lock object at 0x7f4cd1576090          │ │
│ │                    [unlocked]>                                           │ │
│ │            meili = <meilisync.meili.Meili object at 0x7f4cd2bdf920>      │ │
│ │   meili_settings = MeiliSearch(                                          │ │
│ │                    │   api_url='http://127.0.0.1:7708',                  │ │
│ │                    │   api_key='UYo8!74f3I@UYG87wf8uiy23o*uygffweg',     │ │
│ │                    │   insert_size=1000,                                 │ │
│ │                    │   insert_interval=10                                │ │
│ │                    )                                                     │ │
│ │         progress = <meilisync.progress.file.File object at               │ │
│ │                    0x7f4cd5b16900>                                       │ │
│ │         settings = Settings(                                             │ │
│ │                    │   plugins=[],                                       │ │
│ │                    │   progress=Progress(                                │ │
│ │                    │   │   type=<ProgressType.file: 'file'>              │ │
│ │                    │   ),                                                │ │
│ │                    │   debug=False,                                      │ │
│ │                    │   source=Source(                                    │ │
│ │                    │   │   type=<SourceType.mysql: 'mysql'>,             │ │
│ │                    │   │   database='inextapi',                          │ │
│ │                    │   │   host='127.0.0.1',                             │ │
│ │                    │   │   port=3306,                                    │ │
│ │                    │   │   user='root',                                  │ │
│ │                    │   │   password='NfvljiJOp9erghpu9HJLBIFIPURwgbpu3'  │ │
│ │                    │   ),                                                │ │
│ │                    │   meilisearch=MeiliSearch(                          │ │
│ │                    │   │   api_url='http://127.0.0.1:7708',              │ │
│ │                    │   │   api_key='UYo8!74f3I@UYG87wf8uiy23o*uygffweg', │ │
│ │                    │   │   insert_size=1000,                             │ │
│ │                    │   │   insert_interval=10                            │ │
│ │                    │   ),                                                │ │
│ │                    │   sync=[                                            │ │
│ │                    │   │   Sync(                                         │ │
│ │                    │   │   │   plugins=[],                               │ │
│ │                    │   │   │   table='macs',                             │ │
│ │                    │   │   │   pk='id',                                  │ │
│ │                    │   │   │   full=True,                                │ │
│ │                    │   │   │   index=None,                               │ │
│ │                    │   │   │   fields={                                  │ │
│ │                    │   │   │   │   'id': None,                           │ │
│ │                    │   │   │   │   'mac': None,                          │ │
│ │                    │   │   │   │   'soc_serial': None,                   │ │
│ │                    │   │   │   │   'sn': None,                           │ │
│ │                    │   │   │   │   'last_ip': None,                      │ │
│ │                    │   │   │   │   'isp_id': None                        │ │
│ │                    │   │   │   }                                         │ │
│ │                    │   │   )                                             │ │
│ │                    │   ],                                                │ │
│ │                    │   sentry=None                                       │ │
│ │                    )                                                     │ │
│ │           source = <meilisync.source.mysql.MySQL object at               │ │
│ │                    0x7f4cd2157da0>                                       │ │
│ │             sync = Sync(                                                 │ │
│ │                    │   plugins=[],                                       │ │
│ │                    │   table='macs',                                     │ │
│ │                    │   pk='id',                                          │ │
│ │                    │   full=True,                                        │ │
│ │                    │   index=None,                                       │ │
│ │                    │   fields={                                          │ │
│ │                    │   │   'id': None,                                   │ │
│ │                    │   │   'mac': None,                                  │ │
│ │                    │   │   'soc_serial': None,                           │ │
│ │                    │   │   'sn': None,                                   │ │
│ │                    │   │   'last_ip': None,                              │ │
│ │                    │   │   'isp_id': None                                │ │
│ │                    │   }                                                 │ │
│ │                    )                                                     │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /meilisync/meilisync/source/mysql.py:113 in __aiter__                        │
│                                                                              │
│   110 │   │   │   │   │   │   data = event.rows[0]["values"]                 │
│   111 │   │   │   │   │   elif isinstance(event, UpdateRowsEvent):           │
│   112 │   │   │   │   │   │   event_type = EventType.update                  │
│ ❱ 113 │   │   │   │   │   │   data = event.rows[0]["after_values"]           │
│   114 │   │   │   │   │   elif isinstance(event, DeleteRowsEvent):           │
│   115 │   │   │   │   │   │   event_type = EventType.delete                  │
│   116 │   │   │   │   │   │   data = event.rows[0]["values"]                 │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │      event = <asyncmy.replication.row_events.UpdateRowsEvent object at   │ │
│ │              0x7f4cd115e0c0>                                             │ │
│ │ event_type = <EventType.update: 'update'>                                │ │
│ │       self = <meilisync.source.mysql.MySQL object at 0x7f4cd2157da0>     │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py:45 │
│ 3 in rows                                                                    │
│                                                                              │
│   450 │   @property                                                          │
│   451 │   def rows(self):                                                    │
│   452 │   │   if self._rows is None:                                         │
│ ❱ 453 │   │   │   self._fetch_rows()                                         │
│   454 │   │   return self._rows                                              │
│   455                                                                        │
│   456                                                                        │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ self = <asyncmy.replication.row_events.UpdateRowsEvent object at         │ │
│ │        0x7f4cd115e0c0>                                                   │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py:44 │
│ 8 in _fetch_rows                                                             │
│                                                                              │
│   445 │   │   │   return                                                     │
│   446 │   │                                                                  │
│   447 │   │   while self.packet.read_bytes < self.event_size:                │
│ ❱ 448 │   │   │   self._rows.append(self._fetch_one_row())                   │
│   449 │                                                                      │
│   450 │   @property                                                          │
│   451 │   def rows(self):                                                    │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ self = <asyncmy.replication.row_events.UpdateRowsEvent object at         │ │
│ │        0x7f4cd115e0c0>                                                   │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py:51 │
│ 4 in _fetch_one_row                                                          │
│                                                                              │
│   511 │                                                                      │
│   512 │   def _fetch_one_row(self):                                          │
│   513 │   │   row = {                                                        │
│ ❱ 514 │   │   │   "before_values": self._read_column_data(self.columns_prese │
│   515 │   │   │   "after_values": self._read_column_data(self.columns_presen │
│   516 │   │   }                                                              │
│   517                                                                        │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ self = <asyncmy.replication.row_events.UpdateRowsEvent object at         │ │
│ │        0x7f4cd115e0c0>                                                   │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py:20 │
│ 9 in _read_column_data                                                       │
│                                                                              │
│   206 │   │   │   elif column.type == GEOMETRY:                              │
│   207 │   │   │   │   values[name] = self.packet.read_length_coded_pascal_st │
│   208 │   │   │   elif column.type == JSON:                                  │
│ ❱ 209 │   │   │   │   values[name] = self.packet.read_binary_json(column.len │
│   210 │   │   │   else:                                                      │
│   211 │   │   │   │   raise NotImplementedError("Unknown MySQL column type:  │
│   212                                                                        │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │       cols_bitmap = b'\xff\xff\xff'                                      │ │
│ │            column = <asyncmy.replication.column.Column object at         │ │
│ │                     0x7f4cd115dfa0>                                      │ │
│ │                 i = 20                                                   │ │
│ │              name = 'status'                                             │ │
│ │        nb_columns = 24                                                   │ │
│ │       null_bitmap = b'\x04\x00 '                                         │ │
│ │ null_bitmap_index = 20                                                   │ │
│ │              self = <asyncmy.replication.row_events.UpdateRowsEvent      │ │
│ │                     object at 0x7f4cd115e0c0>                            │ │
│ │          unsigned = False                                                │ │
│ │            values = {                                                    │ │
│ │                     │   'id': 402723,                                    │ │
│ │                     │   'mac': 'FC:A4:7A:AC:46:64',                      │ │
│ │                     │   'old_mac': None,                                 │ │
│ │                     │   'sn': 'ITV521801479',                            │ │
│ │                     │   'soc_serial': 'ac00141568840761b0e',             │ │
│ │                     │   'push_client_id':                                │ │
│ │                     'F5708619141EEBAC0B0AE0C5D70433BF',                  │ │
│ │                     │   'device_id': 300,                                │ │
│ │                     │   'isp_id': 59,                                    │ │
│ │                     │   'isp_net_locked': 0,                             │ │
│ │                     │   'blocked': 0,                                    │ │
│ │                     │   ... +10                                          │ │
│ │                     }                                                    │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:344   │
│ in read_binary_json                                                          │
│                                                                              │
│   341 │   │   self.unread(payload)                                           │
│   342 │   │   t = self.read_uint8()                                          │
│   343 │   │                                                                  │
│ ❱ 344 │   │   return self.read_binary_json_type(t, length)                   │
│   345 │                                                                      │
│   346 │   def read_binary_json_type(self, t, length):                        │
│   347 │   │   large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY) │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │  length = 13008                                                          │ │
│ │ payload = b'\x00\x02\x00\xcf2\x12\x00\x06\x00\x18\x00\x06\x00\x00\x1e\x… │ │
│ │           \x00\x03\x00#\x00\x03\x00&\x00\x03\x00)\x00\x03\x00\x0c,\x00\… │ │
│ │    self = <asyncmy.replication.packets.BinLogPacket object at            │ │
│ │           0x7f4cd115e060>                                                │ │
│ │    size = 4                                                              │ │
│ │       t = 0                                                              │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:349   │
│ in read_binary_json_type                                                     │
│                                                                              │
│   346 │   def read_binary_json_type(self, t, length):                        │
│   347 │   │   large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY) │
│   348 │   │   if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT):    │
│ ❱ 349 │   │   │   return self.read_binary_json_object(length - 1, large)     │
│   350 │   │   elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY):    │
│   351 │   │   │   return self.read_binary_json_array(length - 1, large)      │
│   352 │   │   elif t in (JSONB_TYPE_STRING,):                                │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │  large = False                                                           │ │
│ │ length = 13008                                                           │ │
│ │   self = <asyncmy.replication.packets.BinLogPacket object at             │ │
│ │          0x7f4cd115e060>                                                 │ │
│ │      t = 0                                                               │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:439   │
│ in read_binary_json_object                                                   │
│                                                                              │
│   436 │   │   │   │   data = value_type_inlined_lengths[i][2]                │
│   437 │   │   │   else:                                                      │
│   438 │   │   │   │   t = value_type_inlined_lengths[i][0]                   │
│ ❱ 439 │   │   │   │   data = self.read_binary_json_type(t, length)           │
│   440 │   │   │   out[keys[i]] = data                                        │
│   441 │   │                                                                  │
│   442 │   │   return out                                                     │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │                   elements = 2                                           │ │
│ │                          i = 0                                           │ │
│ │         key_offset_lengths = [(18, 6), (24, 6)]                          │ │
│ │                       keys = [b'health', b'status']                      │ │
│ │                      large = False                                       │ │
│ │                     length = 13007                                       │ │
│ │                        out = {}                                          │ │
│ │                       self = <asyncmy.replication.packets.BinLogPacket   │ │
│ │                              object at 0x7f4cd115e060>                   │ │
│ │                       size = 13007                                       │ │
│ │                          t = 0                                           │ │
│ │ value_type_inlined_lengths = [(0, 30, None), (0, 120, None)]             │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:349   │
│ in read_binary_json_type                                                     │
│                                                                              │
│   346 │   def read_binary_json_type(self, t, length):                        │
│   347 │   │   large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY) │
│   348 │   │   if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT):    │
│ ❱ 349 │   │   │   return self.read_binary_json_object(length - 1, large)     │
│   350 │   │   elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY):    │
│   351 │   │   │   return self.read_binary_json_array(length - 1, large)      │
│   352 │   │   elif t in (JSONB_TYPE_STRING,):                                │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │  large = False                                                           │ │
│ │ length = 13007                                                           │ │
│ │   self = <asyncmy.replication.packets.BinLogPacket object at             │ │
│ │          0x7f4cd115e060>                                                 │ │
│ │      t = 0                                                               │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:439   │
│ in read_binary_json_object                                                   │
│                                                                              │
│   436 │   │   │   │   data = value_type_inlined_lengths[i][2]                │
│   437 │   │   │   else:                                                      │
│   438 │   │   │   │   t = value_type_inlined_lengths[i][0]                   │
│ ❱ 439 │   │   │   │   data = self.read_binary_json_type(t, length)           │
│   440 │   │   │   out[keys[i]] = data                                        │
│   441 │   │                                                                  │
│   442 │   │   return out                                                     │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │                   elements = 1                                           │ │
│ │                          i = 0                                           │ │
│ │         key_offset_lengths = [(11, 11)]                                  │ │
│ │                       keys = [b'temperature']                            │ │
│ │                      large = False                                       │ │
│ │                     length = 13006                                       │ │
│ │                        out = {}                                          │ │
│ │                       self = <asyncmy.replication.packets.BinLogPacket   │ │
│ │                              object at 0x7f4cd115e060>                   │ │
│ │                       size = 90                                          │ │
│ │                          t = 0                                           │ │
│ │ value_type_inlined_lengths = [(0, 22, None)]                             │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:349   │
│ in read_binary_json_type                                                     │
│                                                                              │
│   346 │   def read_binary_json_type(self, t, length):                        │
│   347 │   │   large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY) │
│   348 │   │   if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT):    │
│ ❱ 349 │   │   │   return self.read_binary_json_object(length - 1, large)     │
│   350 │   │   elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY):    │
│   351 │   │   │   return self.read_binary_json_array(length - 1, large)      │
│   352 │   │   elif t in (JSONB_TYPE_STRING,):                                │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │  large = False                                                           │ │
│ │ length = 13006                                                           │ │
│ │   self = <asyncmy.replication.packets.BinLogPacket object at             │ │
│ │          0x7f4cd115e060>                                                 │ │
│ │      t = 0                                                               │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:439   │
│ in read_binary_json_object                                                   │
│                                                                              │
│   436 │   │   │   │   data = value_type_inlined_lengths[i][2]                │
│   437 │   │   │   else:                                                      │
│   438 │   │   │   │   t = value_type_inlined_lengths[i][0]                   │
│ ❱ 439 │   │   │   │   data = self.read_binary_json_type(t, length)           │
│   440 │   │   │   out[keys[i]] = data                                        │
│   441 │   │                                                                  │
│   442 │   │   return out                                                     │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │                   elements = 4                                           │ │
│ │                          i = 0                                           │ │
│ │         key_offset_lengths = [(32, 3), (35, 3), (38, 3), (41, 3)]        │ │
│ │                       keys = [b'cpu', b'ddr', b'gpu', b'vee']            │ │
│ │                      large = False                                       │ │
│ │                     length = 13005                                       │ │
│ │                        out = {}                                          │ │
│ │                       self = <asyncmy.replication.packets.BinLogPacket   │ │
│ │                              object at 0x7f4cd115e060>                   │ │
│ │                       size = 68                                          │ │
│ │                          t = 12                                          │ │
│ │ value_type_inlined_lengths = [                                           │ │
│ │                              │   (12, 44, None),                         │ │
│ │                              │   (12, 50, None),                         │ │
│ │                              │   (12, 56, None),                         │ │
│ │                              │   (12, 62, None)                          │ │
│ │                              ]                                           │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:353   │
│ in read_binary_json_type                                                     │
│                                                                              │
│   350 │   │   elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY):    │
│   351 │   │   │   return self.read_binary_json_array(length - 1, large)      │
│   352 │   │   elif t in (JSONB_TYPE_STRING,):                                │
│ ❱ 353 │   │   │   return self.read_variable_length_string()                  │
│   354 │   │   elif t in (JSONB_TYPE_LITERAL,):                               │
│   355 │   │   │   value = self.read_uint8()                                  │
│   356 │   │   │   if value == JSONB_LITERAL_NULL:                            │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │  large = False                                                           │ │
│ │ length = 13005                                                           │ │
│ │   self = <asyncmy.replication.packets.BinLogPacket object at             │ │
│ │          0x7f4cd115e060>                                                 │ │
│ │      t = 12                                                              │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│                                                                              │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:254   │
│ in read_variable_length_string                                               │
│                                                                              │
│   251 │   │   length = 0                                                     │
│   252 │   │   bits_read = 0                                                  │
│   253 │   │   while byte & 0x80 != 0:                                        │
│ ❱ 254 │   │   │   byte = struct.pack("!B", self.read(1))                     │
│   255 │   │   │   length = length | ((byte & 0x7F) << bits_read)             │
│   256 │   │   │   bits_read = bits_read + 7                                  │
│   257 │   │   return self.read(length)                                       │
│                                                                              │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ bits_read = 0                                                            │ │
│ │      byte = 128                                                          │ │
│ │    length = 0                                                            │ │
│ │      self = <asyncmy.replication.packets.BinLogPacket object at          │ │
│ │             0x7f4cd115e060>                                              │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────────────────────────────────────────────────────────╯
`SHOW VARIABLES LIKE "%bin%";`
 ──────────────────────────────────────────────────────────────────────────────
| bind_address                                   | *                           |
| binlog_cache_size                              | 32768                       |
| binlog_checksum                                | CRC32                       |
| binlog_direct_non_transactional_updates        | OFF                         |
| binlog_encryption                              | OFF                         |
| binlog_error_action                            | ABORT_SERVER                |
| binlog_expire_logs_auto_purge                  | ON                          |
| binlog_expire_logs_seconds                     | 86400                       |
| binlog_format                                  | ROW                         |
| binlog_group_commit_sync_delay                 | 0                           |
| binlog_group_commit_sync_no_delay_count        | 0                           |
| binlog_gtid_simple_recovery                    | ON                          |
| binlog_max_flush_queue_time                    | 0                           |
| binlog_order_commits                           | ON                          |
| binlog_rotate_encryption_master_key_at_startup | OFF                         |
| binlog_row_event_max_size                      | 8192                        |
| binlog_row_image                               | FULL                        |
| binlog_row_metadata                            | MINIMAL                     |
| binlog_row_value_options                       |                             |
| binlog_rows_query_log_events                   | OFF                         |
| binlog_stmt_cache_size                         | 32768                       |
| binlog_transaction_compression                 | OFF                         |
| binlog_transaction_compression_level_zstd      | 3                           |
| binlog_transaction_dependency_history_size     | 25000                       |
| binlog_transaction_dependency_tracking         | WRITESET                    |
| innodb_api_enable_binlog                       | OFF                         |
| log_bin                                        | ON                          |
| log_bin_basename                               | /var/lib/mysql/binlog       |
| log_bin_index                                  | /var/lib/mysql/binlog.index |
| log_bin_trust_function_creators                | OFF                         |
| log_statements_unsafe_for_binlog               | ON                          |
| max_binlog_cache_size                          | 18446744073709547520        |
| max_binlog_size                                | 1073741824                  |
| max_binlog_stmt_cache_size                     | 18446744073709547520        |
| mysqlx_bind_address                            | *                           |
| sql_log_bin                                    | ON                          |
| sync_binlog                                    | 1                           |
 ──────────────────────────────────────────────────────────────────────────────
chenzhony commented 3 months ago

the reason should be because some column of the database has JSON format data. below is what my data generate, this should let you easy to identify the problem

import pandas as pd from faker import Faker import random import json from sqlalchemy import create_engine, Column, Integer, String, DateTime, JSON, Text from sqlalchemy.orm import sessionmaker, declarative_base import time

db_user = 'root' db_password = 'password' db_host = 'localhost' db_name = 'database'

engine = create_engine(f'mysql+mysqlconnector://{db_user}:{db_password}@{db_host}/{db_name}')

Base = declarative_base()

class FakeData(Base): tablename = 'conversation'

id = Column(Integer, primary_key=True, autoincrement=True)
participant = Column(JSON)
date = Column(DateTime)
type = Column(String(50))
application = Column(String(50))
content = Column(Text)
date_timestamp = Column(Integer)
attachment = Column(String(255))

Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

fake = Faker() fake_zh = Faker('zh_CN')

def generate_fake_data(num_rows): fake_data = [] phone_roles = ['Caller', 'Callee'] emailroles = ['Sender', 'Receiver'] emails = [fake.email() for in range(1000)]
phone_numbers = [fake.phonenumber() for in range(300)]

for num in range(num_rows):
    application = random.choice(['reutersFST', 'reuters', 'bloomberg', 'exchange', 'deskphone'])
    if application in ['reutersFST', 'reuters']:
        record_type = 'chat'
    elif application == 'bloomberg':
        record_type = random.choice(['chat', 'email'])
    elif application == 'exchange':
        record_type = 'email'
    elif application == 'deskphone':
        record_type = 'phone'

    if record_type == 'chat':
        participant = [{'role': 'participant', 'email': random.choice(emails)} for _ in range(random.randint(1, 6))]
        content = f'{fake.date_time()}\n {fake_zh.paragraph()}\n# {fake.sentence()} {fake.paragraph()}'
    elif record_type == 'phone':
        participant = [{'role': random.choice(phone_roles), 'email': random.choice(phone_numbers)}]
        content = f'{fake.paragraph()}\n{fake_zh.paragraph()}{fake.paragraph()}\n{fake_zh.paragraph()}{fake.paragraph()}\n{fake_zh.paragraph()}' 
    elif record_type == 'email':
        sender = {'role': 'Sender', 'email': random.choice(emails)}
        receivers = [{'role': 'Receiver', 'email': random.choice(emails)} for _ in range(random.randint(1, 5))]
        participant = [sender] + receivers
        content = f'Receive Time: {fake.date_time()}\nSender Time: {fake.date_time()}\nReceive Address: {fake.address()}\nSend Address: {fake.address()}\nSubject: {fake.sentence()}{fake_zh.sentence()}\nBody: {fake.paragraph()}{fake_zh.paragraph()}\nAttachment: {fake.file_path()}'
    date = fake.date_time()
    date_timestamp = int(time.mktime(date.timetuple()))

    fake_row = {
        'participant': json.dumps(participant),
        'date': date,
        'type': record_type,
        'application': application,
        'content': content,
        'date_timestamp': date_timestamp,
        'attachment': f'/home/spdblon/{fake.word()}/{fake.date()}/file{random.randint(1, 100)}',
    }
    fake_data.append(fake_row)

return fake_data

def insert_fake_data(num_rows, batch_size=10000): Session = sessionmaker(bind=engine) session = Session()

for i in range(0, num_rows, batch_size):
    batch_data = generate_fake_data(batch_size)
    session.bulk_insert_mappings(FakeData, batch_data)
    session.commit()
    print(f'Inserted {i + batch_size} records')

insert_fake_data(10000)