Open martin-ro opened 6 months ago
Can you show the error stack?
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /Users/martin/Projects/meilisync/meilisync/main.py:140 in start │
│ │
│ 137 │ │ lock = asyncio.Lock() │
│ 138 │ │ await asyncio.gather(_(), interval()) │
│ 139 │ │
│ ❱ 140 │ asyncio.run(run()) │
│ 141 │
│ 142 │
│ 143 @app.command(help="Refresh all data by swap index") │
│ │
│ ╭───────────────────────────────────────── locals ──────────────────────────────────────────╮ │
│ │ _ = <function start.<locals>._ at 0x1096b8a40> │ │
│ │ collection = <meilisync.event.EventCollection object at 0x1096db490> │ │
│ │ context = <click.core.Context object at 0x1088f76d0> │ │
│ │ current_progress = {'master_log_file': 'binlog.000026', 'master_log_position': 946322893} │ │
│ │ interval = <function start.<locals>.interval at 0x1096b8900> │ │
│ │ lock = <asyncio.locks.Lock object at 0x1096d9d10 [unlocked]> │ │
│ │ meili = <meilisync.meili.Meili object at 0x1096da790> │ │
│ │ meili_settings = MeiliSearch( │ │
│ │ │ api_url='http://127.0.0.1:7700', │ │
│ │ │ api_key='masterKey', │ │
│ │ │ insert_size=1000, │ │
│ │ │ insert_interval=10 │ │
│ │ ) │ │
│ │ progress = <meilisync.progress.file.File object at 0x1096b53d0> │ │
│ │ run = <function start.<locals>.run at 0x1096b8cc0> │ │
│ │ settings = Settings( │ │
│ │ │ plugins=['my_plugin.MyPlugin'], │ │
│ │ │ progress=Progress(type=<ProgressType.file: 'file'>), │ │
│ │ │ debug=False, │ │
│ │ │ source=Source( │ │
│ │ │ │ type=<SourceType.mysql: 'mysql'>, │ │
│ │ │ │ database='app_cams_local', │ │
│ │ │ │ host='localhost', │ │
│ │ │ │ port=3306, │ │
│ │ │ │ user='root', │ │
│ │ │ │ password='password' │ │
│ │ │ ), │ │
│ │ │ meilisearch=MeiliSearch( │ │
│ │ │ │ api_url='http://127.0.0.1:7700', │ │
│ │ │ │ api_key='masterKey', │ │
│ │ │ │ insert_size=1000, │ │
│ │ │ │ insert_interval=10 │ │
│ │ │ ), │ │
│ │ │ sync=[ │ │
│ │ │ │ Sync( │ │
│ │ │ │ │ plugins=[], │ │
│ │ │ │ │ table='cams', │ │
│ │ │ │ │ pk='id', │ │
│ │ │ │ │ full=False, │ │
│ │ │ │ │ index='local_cams', │ │
│ │ │ │ │ fields={'online': None, 'live_image_url': None} │ │
│ │ │ │ ) │ │
│ │ │ ], │ │
│ │ │ sentry=None │ │
│ │ ) │ │
│ │ source = <meilisync.source.mysql.MySQL object at 0x106b8af90> │ │
│ ╰───────────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py:190 in run │
│ │
│ 187 │ │ │ "asyncio.run() cannot be called from a running event loop") │
│ 188 │ │
│ 189 │ with Runner(debug=debug) as runner: │
│ ❱ 190 │ │ return runner.run(main) │
│ 191 │
│ 192 │
│ 193 def _cancel_all_tasks(loop): │
│ │
│ ╭─────────────────────────── locals ────────────────────────────╮ │
│ │ debug = None │ │
│ │ main = <coroutine object start.<locals>.run at 0x109600e50> │ │
│ │ runner = <asyncio.runners.Runner object at 0x1096d97d0> │ │
│ ╰───────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py:118 in run │
│ │
│ 115 │ │ │
│ 116 │ │ self._interrupt_count = 0 │
│ 117 │ │ try: │
│ ❱ 118 │ │ │ return self._loop.run_until_complete(task) │
│ 119 │ │ except exceptions.CancelledError: │
│ 120 │ │ │ if self._interrupt_count > 0: │
│ 121 │ │ │ │ uncancel = getattr(task, "uncancel", None) │
│ │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │ context = <_contextvars.Context object at 0x1096d95c0> │ │
│ │ coro = <coroutine object start.<locals>.run at 0x109600e50> │ │
│ │ self = <asyncio.runners.Runner object at 0x1096d97d0> │ │
│ │ sigint_handler = functools.partial(<bound method Runner._on_sigint of │ │
│ │ <asyncio.runners.Runner object at 0x1096d97d0>>, main_task=<Task finished │ │
│ │ name='Task-4' coro=<start.<locals>.run() done, defined at │ │
│ │ /Users/martin/Projects/meilisync/meilisync/main.py:135> │ │
│ │ exception=error('required argument is not an integer')>) │ │
│ │ task = <Task finished name='Task-4' coro=<start.<locals>.run() done, defined at │ │
│ │ /Users/martin/Projects/meilisync/meilisync/main.py:135> │ │
│ │ exception=error('required argument is not an integer')> │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py:653 in │
│ run_until_complete │
│ │
│ 650 │ │ if not future.done(): │
│ 651 │ │ │ raise RuntimeError('Event loop stopped before Future completed.') │
│ 652 │ │ │
│ ❱ 653 │ │ return future.result() │
│ 654 │ │
│ 655 │ def stop(self): │
│ 656 │ │ """Stop running the event loop. │
│ │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │ future = <Task finished name='Task-4' coro=<start.<locals>.run() done, defined at │ │
│ │ /Users/martin/Projects/meilisync/meilisync/main.py:135> exception=error('required │ │
│ │ argument is not an integer')> │ │
│ │ new_task = False │ │
│ │ self = <_UnixSelectorEventLoop running=False closed=True debug=False> │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Users/martin/Projects/meilisync/meilisync/main.py:138 in run │
│ │
│ 135 │ async def run(): │
│ 136 │ │ nonlocal lock │
│ 137 │ │ lock = asyncio.Lock() │
│ ❱ 138 │ │ await asyncio.gather(_(), interval()) │
│ 139 │ │
│ 140 │ asyncio.run(run()) │
│ 141 │
│ │
│ ╭───────────────────────────── locals ─────────────────────────────╮ │
│ │ _ = <function start.<locals>._ at 0x1096b8a40> │ │
│ │ interval = <function start.<locals>.interval at 0x1096b8900> │ │
│ │ lock = <asyncio.locks.Lock object at 0x1096d9d10 [unlocked]> │ │
│ ╰──────────────────────────────────────────────────────────────────╯ │
│ │
│ /Users/martin/Projects/meilisync/meilisync/main.py:102 in _ │
│ │
│ 99 │ │ │ │ │ │ f'No data found for table "{settings.source.database}.{sync.tabl │
│ 100 │ │ │ │ │ ) │
│ 101 │ │ logger.info(f'Start increment sync data from "{settings.source.type}" to MeiliSe │
│ ❱ 102 │ │ async for event in source: │
│ 103 │ │ │ if settings.debug: │
│ 104 │ │ │ │ logger.debug(event) │
│ 105 │ │ │ current_progress = event.progress │
│ │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │ collection = <meilisync.event.EventCollection object at 0x1096db490> │ │
│ │ current_progress = {'master_log_file': 'binlog.000026', 'master_log_position': 946322893} │ │
│ │ event = ProgressEvent( │ │
│ │ │ progress={ │ │
│ │ │ │ 'master_log_file': 'binlog.000026', │ │
│ │ │ │ 'master_log_position': 946322893 │ │
│ │ │ } │ │
│ │ ) │ │
│ │ lock = <asyncio.locks.Lock object at 0x1096d9d10 [unlocked]> │ │
│ │ meili = <meilisync.meili.Meili object at 0x1096da790> │ │
│ │ meili_settings = MeiliSearch( │ │
│ │ │ api_url='http://127.0.0.1:7700', │ │
│ │ │ api_key='masterKey', │ │
│ │ │ insert_size=1000, │ │
│ │ │ insert_interval=10 │ │
│ │ ) │ │
│ │ progress = <meilisync.progress.file.File object at 0x1096b53d0> │ │
│ │ settings = Settings( │ │
│ │ │ plugins=['my_plugin.MyPlugin'], │ │
│ │ │ progress=Progress(type=<ProgressType.file: 'file'>), │ │
│ │ │ debug=False, │ │
│ │ │ source=Source( │ │
│ │ │ │ type=<SourceType.mysql: 'mysql'>, │ │
│ │ │ │ database='app_cams_local', │ │
│ │ │ │ host='localhost', │ │
│ │ │ │ port=3306, │ │
│ │ │ │ user='root', │ │
│ │ │ │ password='password' │ │
│ │ │ ), │ │
│ │ │ meilisearch=MeiliSearch( │ │
│ │ │ │ api_url='http://127.0.0.1:7700', │ │
│ │ │ │ api_key='masterKey', │ │
│ │ │ │ insert_size=1000, │ │
│ │ │ │ insert_interval=10 │ │
│ │ │ ), │ │
│ │ │ sync=[ │ │
│ │ │ │ Sync( │ │
│ │ │ │ │ plugins=[], │ │
│ │ │ │ │ table='cams', │ │
│ │ │ │ │ pk='id', │ │
│ │ │ │ │ full=False, │ │
│ │ │ │ │ index='local_cams', │ │
│ │ │ │ │ fields={'online': None, 'live_image_url': None} │ │
│ │ │ │ ) │ │
│ │ │ ], │ │
│ │ │ sentry=None │ │
│ │ ) │ │
│ │ source = <meilisync.source.mysql.MySQL object at 0x106b8af90> │ │
│ │ sync = Sync( │ │
│ │ │ plugins=[], │ │
│ │ │ table='cams', │ │
│ │ │ pk='id', │ │
│ │ │ full=False, │ │
│ │ │ index='local_cams', │ │
│ │ │ fields={'online': None, 'live_image_url': None} │ │
│ │ ) │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Users/martin/Projects/meilisync/meilisync/source/mysql.py:110 in __aiter__ │
│ │
│ 107 │ │ │ │ async for event in self.stream: │
│ 108 │ │ │ │ │ if isinstance(event, WriteRowsEvent): │
│ 109 │ │ │ │ │ │ event_type = EventType.create │
│ ❱ 110 │ │ │ │ │ │ data = event.rows[0]["values"] │
│ 111 │ │ │ │ │ elif isinstance(event, UpdateRowsEvent): │
│ 112 │ │ │ │ │ │ event_type = EventType.update │
│ 113 │ │ │ │ │ │ data = event.rows[0]["after_values"] │
│ │
│ ╭────────────────────────────────────── locals ──────────────────────────────────────╮ │
│ │ event = <asyncmy.replication.row_events.WriteRowsEvent object at 0x1096f5e50> │ │
│ │ event_type = <EventType.create: 'create'> │ │
│ │ self = <meilisync.source.mysql.MySQL object at 0x106b8af90> │ │
│ ╰────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/asyncmy/replicat │
│ ion/row_events.py:453 in rows │
│ │
│ 450 │ @property │
│ 451 │ def rows(self): │
│ 452 │ │ if self._rows is None: │
│ ❱ 453 │ │ │ self._fetch_rows() │
│ 454 │ │ return self._rows │
│ 455 │
│ 456 │
│ │
│ ╭─────────────────────────────────── locals ───────────────────────────────────╮ │
│ │ self = <asyncmy.replication.row_events.WriteRowsEvent object at 0x1096f5e50> │ │
│ ╰──────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/asyncmy/replicat │
│ ion/row_events.py:448 in _fetch_rows │
│ │
│ 445 │ │ │ return │
│ 446 │ │ │
│ 447 │ │ while self.packet.read_bytes < self.event_size: │
│ ❱ 448 │ │ │ self._rows.append(self._fetch_one_row()) │
│ 449 │ │
│ 450 │ @property │
│ 451 │ def rows(self): │
│ │
│ ╭─────────────────────────────────── locals ───────────────────────────────────╮ │
│ │ self = <asyncmy.replication.row_events.WriteRowsEvent object at 0x1096f5e50> │ │
│ ╰──────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/asyncmy/replicat │
│ ion/row_events.py:489 in _fetch_one_row │
│ │
│ 486 │ │ │ self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) │
│ 487 │ │
│ 488 │ def _fetch_one_row(self): │
│ ❱ 489 │ │ return {"values": self._read_column_data(self.columns_present_bitmap)} │
│ 490 │
│ 491 │
│ 492 class UpdateRowsEvent(RowsEvent): │
│ │
│ ╭─────────────────────────────────── locals ───────────────────────────────────╮ │
│ │ self = <asyncmy.replication.row_events.WriteRowsEvent object at 0x1096f5e50> │ │
│ ╰──────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/asyncmy/replicat │
│ ion/row_events.py:209 in _read_column_data │
│ │
│ 206 │ │ │ elif column.type == GEOMETRY: │
│ 207 │ │ │ │ values[name] = self.packet.read_length_coded_pascal_string(column.length │
│ 208 │ │ │ elif column.type == JSON: │
│ ❱ 209 │ │ │ │ values[name] = self.packet.read_binary_json(column.length_size) │
│ 210 │ │ │ else: │
│ 211 │ │ │ │ raise NotImplementedError("Unknown MySQL column type: %d" % (column.type │
│ 212 │
│ │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │ cols_bitmap = b'\xff\xff\xff\xff' │ │
│ │ column = <asyncmy.replication.column.Column object at 0x1096f5510> │ │
│ │ i = 15 │ │
│ │ name = 'languages' │ │
│ │ nb_columns = 25 │ │
│ │ null_bitmap = b'\x00p/\x00' │ │
│ │ null_bitmap_index = 15 │ │
│ │ self = <asyncmy.replication.row_events.WriteRowsEvent object at 0x1096f5e50> │ │
│ │ unsigned = False │ │
│ │ values = { │ │
│ │ │ 'id': 1, │ │
│ │ │ 'site_id': 7, │ │
│ │ │ 'display_name': 'sdfg', │ │
│ │ │ 'slug': 'sdfg', │ │
│ │ │ 'online': 1, │ │
│ │ │ 'rank': 1, │ │
│ │ │ 'image_url': │ │
│ │ 'https://localhost', │ │
│ │ │ 'live_image_url': │ │
│ │ 'https://localhost… │ │
│ │ │ 'stream_url': │ │
│ │ 'https://localhost… │ │
│ │ │ 'affiliate_link': 'localhost', │ │
│ │ │ ... +5 │ │
│ │ } │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/asyncmy/replicat │
│ ion/packets.py:344 in read_binary_json │
│ │
│ 341 │ │ self.unread(payload) │
│ 342 │ │ t = self.read_uint8() │
│ 343 │ │ │
│ ❱ 344 │ │ return self.read_binary_json_type(t, length) │
│ 345 │ │
│ 346 │ def read_binary_json_type(self, t, length): │
│ 347 │ │ large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY) │
│ │
│ ╭────────────────────────────────── locals ──────────────────────────────────╮ │
│ │ length = 11 │ │
│ │ payload = b'\x02\x01\x00\n\x00\x0c\x07\x00\x02en' │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at 0x1096f5e10> │ │
│ │ size = 4 │ │
│ │ t = 2 │ │
│ ╰────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/asyncmy/replicat │
│ ion/packets.py:351 in read_binary_json_type │
│ │
│ 348 │ │ if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT): │
│ 349 │ │ │ return self.read_binary_json_object(length - 1, large) │
│ 350 │ │ elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY): │
│ ❱ 351 │ │ │ return self.read_binary_json_array(length - 1, large) │
│ 352 │ │ elif t in (JSONB_TYPE_STRING,): │
│ 353 │ │ │ return self.read_variable_length_string() │
│ 354 │ │ elif t in (JSONB_TYPE_LITERAL,): │
│ │
│ ╭───────────────────────────────── locals ──────────────────────────────────╮ │
│ │ large = False │ │
│ │ length = 11 │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at 0x1096f5e10> │ │
│ │ t = 2 │ │
│ ╰───────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/asyncmy/replicat │
│ ion/packets.py:464 in read_binary_json_array │
│ │
│ 461 │ │ │ │ return x[2] │
│ 462 │ │ │ return self.read_binary_json_type(x[0], length) │
│ 463 │ │ │
│ ❱ 464 │ │ return [_read(x) for x in values_type_offset_inline] │
│ 465 │ │
│ 466 │ @staticmethod │
│ 467 │ def read_offset_or_inline(packet, large): │
│ │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │ _read = <function BinLogPacket.read_binary_json_array.<locals>._read at │ │
│ │ 0x1096d5da0> │ │
│ │ elements = 1 │ │
│ │ large = False │ │
│ │ length = 10 │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at 0x1096f5e10> │ │
│ │ size = 10 │ │
│ │ values_type_offset_inline = [(12, 7, None)] │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/asyncmy/replicat │
│ ion/packets.py:464 in <listcomp> │
│ │
│ 461 │ │ │ │ return x[2] │
│ 462 │ │ │ return self.read_binary_json_type(x[0], length) │
│ 463 │ │ │
│ ❱ 464 │ │ return [_read(x) for x in values_type_offset_inline] │
│ 465 │ │
│ 466 │ @staticmethod │
│ 467 │ def read_offset_or_inline(packet, large): │
│ │
│ ╭─────────────────────────────────────── locals ───────────────────────────────────────╮ │
│ │ .0 = <list_iterator object at 0x109657fd0> │ │
│ │ _read = <function BinLogPacket.read_binary_json_array.<locals>._read at 0x1096d5da0> │ │
│ │ x = (12, 7, None) │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/asyncmy/replicat │
│ ion/packets.py:462 in _read │
│ │
│ 459 │ │ def _read(x): │
│ 460 │ │ │ if x[1] is None: │
│ 461 │ │ │ │ return x[2] │
│ ❱ 462 │ │ │ return self.read_binary_json_type(x[0], length) │
│ 463 │ │ │
│ 464 │ │ return [_read(x) for x in values_type_offset_inline] │
│ 465 │
│ │
│ ╭───────────────────────────────── locals ──────────────────────────────────╮ │
│ │ length = 10 │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at 0x1096f5e10> │ │
│ │ x = (12, 7, None) │ │
│ ╰───────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/asyncmy/replicat │
│ ion/packets.py:353 in read_binary_json_type │
│ │
│ 350 │ │ elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY): │
│ 351 │ │ │ return self.read_binary_json_array(length - 1, large) │
│ 352 │ │ elif t in (JSONB_TYPE_STRING,): │
│ ❱ 353 │ │ │ return self.read_variable_length_string() │
│ 354 │ │ elif t in (JSONB_TYPE_LITERAL,): │
│ 355 │ │ │ value = self.read_uint8() │
│ 356 │ │ │ if value == JSONB_LITERAL_NULL: │
│ │
│ ╭───────────────────────────────── locals ──────────────────────────────────╮ │
│ │ large = False │ │
│ │ length = 10 │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at 0x1096f5e10> │ │
│ │ t = 12 │ │
│ ╰───────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/asyncmy/replicat │
│ ion/packets.py:254 in read_variable_length_string │
│ │
│ 251 │ │ length = 0 │
│ 252 │ │ bits_read = 0 │
│ 253 │ │ while byte & 0x80 != 0: │
│ ❱ 254 │ │ │ byte = struct.pack("!B", self.read(1)) │
│ 255 │ │ │ length = length | ((byte & 0x7F) << bits_read) │
│ 256 │ │ │ bits_read = bits_read + 7 │
│ 257 │ │ return self.read(length) │
│ │
│ ╭─────────────────────────────────── locals ───────────────────────────────────╮ │
│ │ bits_read = 0 │ │
│ │ byte = 128 │ │
│ │ length = 0 │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at 0x1096f5e10> │ │
│ ╰──────────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
Hello guys 👋 I get the same error, Sentry gives me this as details:
so may i ask how to fix this problem,,, this is because we using wrong json format in mysql database?
the reason probably because when you read data from mysql, the JSON data type will parse to string format by some reason, I do not know how to fix this problem. but please look at it @long2ice
same error. how to fix it?
When I update a JSON column in my MySQL database I get:
The issue seems to be related to how the asyncmy library handles JSON columns in MySQL replication events. When the UpdateRowsEvent is processed, and the asyncmy library attempts to read the value of my languages column, which is a JSON column, it encounters an error.
But I can't really figure how to handle JSON column updates?
Am I missing something here?