toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License

parallel_sync multiprocess_async coroutine 'run_tasks' was never awaited #380

Open loren opened 1 year ago

loren commented 1 year ago

PGSync version: 2.3.3
Postgres version: 14.4
Elasticsearch version: 8.4
Python version: 3.9.12

Problem Description: When I run parallel_sync in the default multiprocess_async mode, it runs for a while and then fails with the error below.

Error Message (if any):

File "/Users/foo/workspace/pgsync/./bin/parallel_sync", line 441, in main
    multiprocess_async(tasks, document, nprocs=nprocs, verbose=verbose)
  File "/Users/foo/workspace/pgsync/pgsync/utils.py", line 29, in timed
    fn = func(*args, **kwargs)
  File "/Users/foo/workspace/pgsync/./bin/parallel_sync", line 312, in multiprocess_async
    event_loop.run_until_complete(
  File "/Users/foo/opt/anaconda3/lib/python3.9/asyncio/base_events.py", line 622, in run_until_complete
    self._check_closed()
  File "/Users/foo/opt/anaconda3/lib/python3.9/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
sys:1: RuntimeWarning: coroutine 'run_tasks' was never awaited

As run_tasks is a coroutine, maybe it just needs to be called with asyncio.run()?

    sys.stdout.write("Multi-process async\n")
    executor: ProcessPoolExecutor = ProcessPoolExecutor(max_workers=nprocs)
    # event_loop = asyncio.get_event_loop()
    try:
        asyncio.run(
            run_tasks(executor, tasks, doc, verbose=verbose, validate=validate)
        )
    except KeyboardInterrupt:
        pass
    # finally:
    #     event_loop.close()
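
For anyone hitting the same traceback, here is a minimal standalone sketch of what I think is going on. It is not pgsync code. `work`, `sync_batch`, and `reuse_closed_loop` are hypothetical names, the `run_tasks` here is a stand-in for the real one, and I use a ThreadPoolExecutor instead of a ProcessPoolExecutor only so the snippet runs without a `__main__` guard. It assumes the failure comes from calling `run_until_complete()` on a loop that has already been closed, and shows that `asyncio.run()` avoids this by creating a fresh loop on every call.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def work(n: int) -> int:
    # Hypothetical stand-in for the blocking per-task sync work.
    return n * n


async def run_tasks(executor, items):
    # Stand-in for the real run_tasks: fan the items out to the
    # executor and wait for all of them to finish.
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, work, i) for i in items]
    return await asyncio.gather(*futures)


def sync_batch(items):
    # asyncio.run() creates a new event loop, runs the coroutine to
    # completion, and closes the loop, so repeated calls are safe.
    with ThreadPoolExecutor(max_workers=2) as executor:
        return asyncio.run(run_tasks(executor, items))


def reuse_closed_loop():
    # Reproduces the error from the traceback: the coroutine object is
    # created, but a closed loop refuses to run it.
    loop = asyncio.new_event_loop()
    loop.close()
    coro = run_tasks(None, [])
    try:
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        # Close the coroutine explicitly; without this, Python emits
        # "coroutine 'run_tasks' was never awaited".
        coro.close()
        return str(exc)
```

Calling `sync_batch()` several times in a row works, while `reuse_closed_loop()` returns the "Event loop is closed" message. Whether pgsync actually reuses a closed loop this way is my guess from the traceback, not something I have confirmed.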

toluaina commented 1 year ago

Oops, sorry about that. This is fixed in the main branch.