agronholm / apscheduler

Task scheduling library for Python
MIT License
5.98k stars 694 forks source link

asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f27c9d9bbe0 #834

Closed vadympop closed 6 months ago

vadympop commented 6 months ago

Things to check first

Version

4.0.0a4

What happened?

In random moment scheduler is crashing and throw this error:

api      | 2024-01-01 14:22:34,081 | INFO     | apscheduler._schedulers.async_:917 - Job 01e098c6-6d31-4ad4-9cf5-f331670c11fe was cancelled
api      | 2024-01-01 14:22:34,096 | ERROR    | apscheduler._schedulers.async_:693 - Scheduler crashed
api      |   + Exception Group Traceback (most recent call last):
api      |   |   File "/usr/local/lib/python3.10/site-packages/apscheduler/_schedulers/async_.py", line 667, in run_until_stopped
api      |   |     async with create_task_group() as task_group:
api      |   |   File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
api      |   |     raise BaseExceptionGroup(
api      |   | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
api      |   +-+---------------- 1 ----------------
api      |     | Traceback (most recent call last):
api      |     |   File "/usr/local/lib/python3.10/site-packages/apscheduler/_schedulers/async_.py", line 889, in _process_jobs
api      |     |     await wakeup_event.wait()
api      |     |   File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1641, in wait
api      |     |     await self._event.wait()
api      |     |   File "/usr/local/lib/python3.10/asyncio/locks.py", line 214, in wait
api      |     |     await fut
api      |     | asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f27c9de4f40
api      |     |
api      |     | During handling of the above exception, another exception occurred:
api      |     |
api      |     | Exception Group Traceback (most recent call last):
api      |     |   File "/usr/local/lib/python3.10/site-packages/apscheduler/_schedulers/async_.py", line 863, in _process_jobs
api      |     |     async with AsyncExitStack() as exit_stack:
api      |     |   File "/usr/local/lib/python3.10/contextlib.py", line 714, in __aexit__
api      |     |     raise exc_details[1]
api      |     |   File "/usr/local/lib/python3.10/contextlib.py", line 697, in __aexit__
api      |     |     cb_suppress = await cb(*exc_details)
api      |     |   File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
api      |     |     raise BaseExceptionGroup(
api      |     | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
api      |     +-+---------------- 1 ----------------
api      |       | Traceback (most recent call last):
api      |       |   File "/usr/local/lib/python3.10/site-packages/apscheduler/_schedulers/async_.py", line 966, in _run_job
api      |       |     self._running_jobs.remove(job.id)
api      |       | KeyError: UUID('482d3fd1-1095-4d51-8c24-2e33e1ad7c3c')
api      |       +------------------------------------
api      | 2024-01-01 14:22:34,104 | ERROR    | uvicorn.error:134 - Traceback (most recent call last):
api      |   File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 714, in lifespan
api      |     await receive()
api      |   File "/usr/local/lib/python3.10/site-packages/uvicorn/lifespan/on.py", line 137, in receive
api      |     return await self.receive_queue.get()
api      |   File "/usr/local/lib/python3.10/asyncio/queues.py", line 159, in get
api      |     await getter
api      | asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f27c9d9bbe0

Scheduler was tried to remove running job, but was throw error KeyError: UUID('482d3fd1-1095-4d51-8c24-2e33e1ad7c3c')

How can we reproduce the bug?

I dont know how to reproduce this bug, it's working always with SqlalchemyDatastore

agronholm commented 6 months ago

If it works with SQLAlchemyDataStore, then which job data store did you have problems with? Note that this report isn't very useful if I can't reproduce the problem here.

vadympop commented 6 months ago

I can reproduce this bug only with SQLAlchemyDataStore, other code doesn't affect for reproducing this bug

agronholm commented 6 months ago

I can reproduce this bug only with SQLAlchemyDataStore, other code doesn't affect for reproducing this bug

You said in the report that it's working always with SQLAlchemyDataStore. Which is it?

vadympop commented 6 months ago

I'm sorry, I didn't express myself correctly in the report. This bug is working always with SQLAlchemyDataStore

agronholm commented 6 months ago

Which database driver was this with? Can you reproduce it with another driver?

vadympop commented 6 months ago

PostgreSQL

agronholm commented 6 months ago

That's not a database driver. Are you using psycopg, asyncpg or what?

vadympop commented 6 months ago

asyncpg

agronholm commented 6 months ago

And have you tried with another driver yet?

vadympop commented 6 months ago

No, haven't tried yet

agronholm commented 6 months ago

Could you also try with the latest master version?

vadympop commented 6 months ago

Okay, I'll try it now

vadympop commented 6 months ago

I've received this error after startup the application

2024-01-01 17:14:19,870 | ERROR    | apscheduler._schedulers.async_:743 - Scheduler crashed
  + Exception Group Traceback (most recent call last):
  |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 709, in run_until_stopped
  |     async with create_task_group() as task_group:
  |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 664, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 930, in _process_jobs
    |     jobs = await self.data_store.acquire_jobs(self.identity, limit)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\datastores\sqlalchemy.py", line 804, in acquire_jobs
    |     async for attempt in self._retry():
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\tenacity\_asyncio.py", line 71, in __anext__
    |     do = self.iter(retry_state=self._retry_state)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\tenacity\__init__.py", line 314, in iter
    |     return fut.result()
    |   File "C:\Python310\lib\concurrent\futures\_base.py", line 439, in result
    |     return self.__get_result()
    |   File "C:\Python310\lib\concurrent\futures\_base.py", line 391, in __get_result
    |     raise self._exception
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\datastores\sqlalchemy.py", line 806, in acquire_jobs
    |     async with self._begin_transaction() as conn:
    |   File "C:\Python310\lib\contextlib.py", line 199, in __aenter__
    |     return await anext(self.gen)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\datastores\sqlalchemy.py", line 199, in _begin_transaction
    |     conn = await async_cm.__aenter__()
    |   File "C:\Python310\lib\contextlib.py", line 199, in __aenter__
    |     return await anext(self.gen)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\ext\asyncio\engine.py", line 1061, in begin
    |     async with conn:
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\ext\asyncio\base.py", line 125, in __aenter__
    |     return await self.start(is_ctxmanager=True)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\ext\asyncio\engine.py", line 270, in start
    |     await greenlet_spawn(self.sync_engine.connect)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 190, in greenlet_spawn
    |     result = context.throw(*sys.exc_info())
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 3268, in connect
    |     return self._connection_cls(self)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 145, in __init__
    |     self._dbapi_connection = engine.raw_connection()
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 3292, in raw_connection
    |     return self.pool.connect()
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\pool\base.py", line 452, in connect
    |     return _ConnectionFairy._checkout(self)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\pool\base.py", line 1269, in _checkout
    |     fairy = _ConnectionRecord.checkout(pool)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\pool\base.py", line 716, in checkout
    |     rec = pool._do_get()
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\pool\impl.py", line 169, in _do_get
    |     with util.safe_reraise():
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\langhelpers.py", line 146, in __exit__
    |     raise exc_value.with_traceback(exc_tb)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\pool\impl.py", line 167, in _do_get
    |     return self._create_connection()
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\pool\base.py", line 393, in _create_connection
    |     return _ConnectionRecord(self)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\pool\base.py", line 678, in __init__
    |     self.__connect()
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\pool\base.py", line 902, in __connect
    |     with util.safe_reraise():
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\langhelpers.py", line 146, in __exit__
    |     raise exc_value.with_traceback(exc_tb)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\pool\base.py", line 898, in __connect
    |     self.dbapi_connection = connection = pool._invoke_creator(self)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\create.py", line 637, in connect
    |     return dialect.connect(*cargs, **cparams)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\default.py", line 616, in connect
    |     return self.loaded_dbapi.connect(*cargs, **cparams)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 936, in connect
    |     await_only(creator_fn(*arg, **kw)),
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 125, in await_only
    |     return current.driver.switch(awaitable)  # type: ignore[no-any-return]
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 185, in greenlet_spawn
    |     value = await result
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\connection.py", line 2329, in connect
    |     return await connect_utils._connect(
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\connect_utils.py", line 991, in _connect
    |     conn = await _connect_addr(
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\connect_utils.py", line 828, in _connect_addr
    |     return await __connect_addr(params, True, *args)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\connect_utils.py", line 873, in __connect_addr
    |     tr, pr = await connector
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\connect_utils.py", line 752, in _create_ssl_connection
    |     do_ssl_upgrade = await pr.on_data
    | asyncio.exceptions.CancelledError: Cancelled by cancel scope 27c6be8bc70
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Exception Group Traceback (most recent call last):
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 913, in _process_jobs
    |     async with AsyncExitStack() as exit_stack:
    |   File "C:\Python310\lib\contextlib.py", line 714, in __aexit__
    |     raise exc_details[1]
    |   File "C:\Python310\lib\contextlib.py", line 697, in __aexit__
    |     cb_suppress = await cb(*exc_details)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 664, in __aexit__
    |     raise BaseExceptionGroup(
    | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (3 sub-exceptions)
    +-+---------------- 1 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 994, in _run_job
      |     await self.data_store.release_job(self.identity, job, result)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\datastores\sqlalchemy.py", line 919, in release_job
      |     await self._event_broker.publish(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\eventbrokers\asyncpg.py", line 174, in publish
      |     notification = self.generate_notification_str(event)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\eventbrokers\base.py", line 115, in generate_notification_str
      |     serialized = self.serializer.serialize(event.marshal())
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\serializers\json.py", line 66, in serialize
      |     return dumps(obj, ensure_ascii=False, **self.dump_options).encode("utf-8")
      |   File "C:\Python310\lib\json\__init__.py", line 238, in dumps
      |     **kw).encode(obj)
      |   File "C:\Python310\lib\json\encoder.py", line 199, in encode
      |     chunks = self.iterencode(o, _one_shot=True)
      |   File "C:\Python310\lib\json\encoder.py", line 257, in iterencode
      |     return _iterencode(o, 0)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\serializers\json.py", line 54, in _default_hook
      |     raise TypeError(
      | TypeError: Object of type 'JobOutcome' is not JSON serializable
      +---------------- 2 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\pba\core\services\iiko\client.py", line 82, in __post
      |     async with session.post(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\aiohttp\client.py", line 1167, in __aenter__
      |     self._resp = await self._coro
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\aiohttp\client.py", line 562, in _request
      |     conn = await self._connector.connect(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\aiohttp\connector.py", line 540, in connect
      |     proto = await self._create_connection(req, traces, timeout)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\aiohttp\connector.py", line 901, in _create_connection
      |     _, proto = await self._create_direct_connection(req, traces, timeout)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\aiohttp\connector.py", line 1155, in _create_direct_connection
      |     hosts = await asyncio.shield(host_resolved)
      | asyncio.exceptions.CancelledError: Cancelled by cancel scope 27c6be8bc70
      | 
      | During handling of the above exception, another exception occurred:
      | 
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 962, in _run_job
      |     retval = await job_executor.run_job(func, job)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\executors\async_.py", line 22, in run_job
      |     retval = await retval
      |   File "D:\PyProg\PizzaBotAPI\pba\core\scheduler\jobs\iiko.py", line 15, in save_and_update_menu
      |     nomenclature = await iiko.nomenclature()
      |   File "D:\PyProg\PizzaBotAPI\pba\core\services\iiko\client.py", line 195, in nomenclature
      |     response = await self.request("/api/1/nomenclature", data)
      |   File "D:\PyProg\PizzaBotAPI\pba\core\services\iiko\client.py", line 100, in request
      |     await self.refresh_token()
      |   File "D:\PyProg\PizzaBotAPI\pba\core\services\iiko\client.py", line 93, in refresh_token
      |     result = await self.__post('/api/1/access_token', data)
      |   File "D:\PyProg\PizzaBotAPI\pba\core\services\iiko\client.py", line 81, in __post
      |     async with aiohttp.ClientSession() as session:
      | asyncio.exceptions.CancelledError: Cancelled by cancel scope 27c6be8bc70
      | 
      | During handling of the above exception, another exception occurred:
      | 
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 970, in _run_job
      |     await self.data_store.release_job(self.identity, job, result)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\datastores\sqlalchemy.py", line 919, in release_job
      |     await self._event_broker.publish(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\eventbrokers\asyncpg.py", line 174, in publish
      |     notification = self.generate_notification_str(event)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\eventbrokers\base.py", line 115, in generate_notification_str
      |     serialized = self.serializer.serialize(event.marshal())
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\serializers\json.py", line 66, in serialize
      |     return dumps(obj, ensure_ascii=False, **self.dump_options).encode("utf-8")
      |   File "C:\Python310\lib\json\__init__.py", line 238, in dumps
      |     **kw).encode(obj)
      |   File "C:\Python310\lib\json\encoder.py", line 199, in encode
      |     chunks = self.iterencode(o, _one_shot=True)
      |   File "C:\Python310\lib\json\encoder.py", line 257, in iterencode
      |     return _iterencode(o, 0)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\serializers\json.py", line 54, in _default_hook
      |     raise TypeError(
      | TypeError: Object of type 'JobOutcome' is not JSON serializable
      +---------------- 3 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\pba\core\scheduler\jobs\other.py", line 19, in send_planned_orders
      |     planned_orders = [Order.from_orm(i) for i in await crud.orders.get_planned(db=db)]
      |   File "D:\PyProg\PizzaBotAPI\pba\database\crud\orders.py", line 496, in get_planned
      |     resp = await db.execute(select(self.model).join(self.model.status).where(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\ext\asyncio\session.py", line 455, in execute
      |     result = await greenlet_spawn(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 190, in greenlet_spawn
      |     result = context.throw(*sys.exc_info())
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 2308, in execute
      |     return self._execute_internal(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 2190, in _execute_internal
      |     result: Result[Any] = compile_state_cls.orm_execute_statement(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\context.py", line 293, in orm_execute_statement
      |     result = conn.execute(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1416, in execute
      |     return meth(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\sql\elements.py", line 516, in _execute_on_connection
      |     return connection._execute_clauseelement(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1639, in _execute_clauseelement
      |     ret = self._execute_context(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1848, in _execute_context
      |     return self._exec_single_context(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1988, in _exec_single_context
      |     self._handle_dbapi_exception(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 2346, in _handle_dbapi_exception
      |     raise exc_info[1].with_traceback(exc_info[2])
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
      |     self.dialect.do_execute(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
      |     cursor.execute(statement, parameters)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 591, in execute
      |     self._adapt_connection.await_(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 125, in await_only
      |     return current.driver.switch(awaitable)  # type: ignore[no-any-return]
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 185, in greenlet_spawn
      |     value = await result
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 557, in _prepare_and_execute
      |     self._rows = await prepared_stmt.fetch(*parameters)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 176, in fetch
      |     data = await self.__bind_execute(args, 0, timeout)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 241, in __bind_execute
      |     data, status, _ = await self.__do_execute(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 230, in __do_execute
      |     return await executor(protocol)
      |   File "asyncpg\protocol\protocol.pyx", line 207, in bind_execute
      | asyncio.exceptions.CancelledError: Cancelled by cancel scope 27c6be8bc70
      | 
      | During handling of the above exception, another exception occurred:
      | 
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 962, in _run_job
      |     retval = await job_executor.run_job(func, job)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\executors\async_.py", line 22, in run_job
      |     retval = await retval
      |   File "D:\PyProg\PizzaBotAPI\pba\core\scheduler\jobs\other.py", line 18, in send_planned_orders
      |     async with Session() as db:
      | asyncio.exceptions.CancelledError: Cancelled by cancel scope 27c6be8bc70
      | 
      | During handling of the above exception, another exception occurred:
      | 
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 970, in _run_job
      |     await self.data_store.release_job(self.identity, job, result)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\datastores\sqlalchemy.py", line 919, in release_job
      |     await self._event_broker.publish(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\eventbrokers\asyncpg.py", line 174, in publish
      |     notification = self.generate_notification_str(event)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\eventbrokers\base.py", line 115, in generate_notification_str
      |     serialized = self.serializer.serialize(event.marshal())
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\serializers\json.py", line 66, in serialize
      |     return dumps(obj, ensure_ascii=False, **self.dump_options).encode("utf-8")
      |   File "C:\Python310\lib\json\__init__.py", line 238, in dumps
      |     **kw).encode(obj)
      |   File "C:\Python310\lib\json\encoder.py", line 199, in encode
      |     chunks = self.iterencode(o, _one_shot=True)
      |   File "C:\Python310\lib\json\encoder.py", line 257, in iterencode
      |     return _iterencode(o, 0)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\serializers\json.py", line 54, in _default_hook
      |     raise TypeError(
      | TypeError: Object of type 'JobOutcome' is not JSON serializable
      +------------------------------------
agronholm commented 6 months ago

Ok, have you tried with the default pickle serializer?

vadympop commented 6 months ago

Where I should specify the default pickle serializer?

agronholm commented 6 months ago

Where do you have code that makes it use the JSON serializer?

vadympop commented 6 months ago

I use custom JsonSerializer in sqlalchemy engine

class CustomSerializer(json.JSONEncoder):
    def default(self, obj: typing.Any) -> typing.Any:
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        elif isinstance(obj, BaseModel):
            return obj.dict()

def database_json_serializer(v):
    return json.dumps(v, indent=2, ensure_ascii=False, cls=CustomSerializer)

engine = create_async_engine(
    config.postgres_dsn,
    pool_size=20,
    max_overflow=60,
    json_serializer=database_json_serializer
)
agronholm commented 6 months ago

The exception originated from the event broker though. Are you passing a JSON serializer to that?

vadympop commented 6 months ago

No

class CustomSerializer(json.JSONEncoder):
    def default(self, obj: typing.Any) -> typing.Any:
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        elif isinstance(obj, BaseModel):
            return obj.dict()

def database_json_serializer(v):
    return json.dumps(v, indent=2, ensure_ascii=False, cls=CustomSerializer)

engine = create_async_engine(
    config.postgres_dsn,
    pool_size=20,
    max_overflow=60,
    json_serializer=database_json_serializer
)
data_store = SQLAlchemyDataStore(scheduler_engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(scheduler_engine)
scheduler = AsyncScheduler(data_store, event_broker)
agronholm commented 6 months ago

Does the async_postgres example work for you?

vadympop commented 6 months ago

Yeah, it's working, but I use apscheduler with fastapi. A minimal example I described in #833

import asyncio
import datetime
import json
import logging
import typing
from contextlib import asynccontextmanager

import uvicorn
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI, Response
from fastapi.middleware import Middleware
from pydantic import BaseModel
from sqlalchemy import Column, BigInteger, String
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import declarative_base
from starlette.types import ASGIApp, Scope, Receive, Send

logger = logging.getLogger(__name__)

class CustomSerializer(json.JSONEncoder):
    def default(self, obj: typing.Any) -> typing.Any:
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        elif isinstance(obj, BaseModel):
            return obj.dict()

def database_json_serializer(v):
    return json.dumps(v, indent=2, ensure_ascii=False, cls=CustomSerializer)

engine = create_async_engine(
    "postgresql+asyncpg://",
    pool_size=20,
    max_overflow=60,
    json_serializer=database_json_serializer
)
Session = async_sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)
Base = declarative_base()

class Item(Base):
    __tablename__ = "items"
    id = Column(BigInteger, autoincrement=True, primary_key=True)
    name = Column(String, nullable=False)
    code = Column(String, unique=True, nullable=False)

async def job_1():
    logger.info("JOB_1 EXECUTE START")
    await asyncio.sleep(5)
    logger.info("JOB_1 EXECUTE END")

async def job_2():
    logger.info("JOB_2 EXECUTE START")

    async with Session() as db:
        new_obj = Item(name="Item", code="1a1a1a")
        db.add(new_obj)
        await db.commit()
        await db.refresh(new_obj)

    await asyncio.sleep(5)
    logger.info("JOB_2 EXECUTE END")

async def job_3():
    logger.info("JOB_3 EXECUTE START")
    await asyncio.sleep(10)
    logger.info("JOB_3 EXECUTE END")

class SchedulerMiddleware:
    def __init__(
        self,
        app: ASGIApp,
        scheduler: AsyncScheduler,
    ) -> None:
        self.app = app
        self.scheduler = scheduler

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] == "lifespan":
            async with self.scheduler:
                await self.scheduler.add_schedule(
                    job_1,
                    IntervalTrigger(minutes=1),
                    id="job_1"
                )
                await self.scheduler.add_schedule(
                    job_2,
                    IntervalTrigger(minutes=1),
                    id="job_2"
                )
                await self.scheduler.add_schedule(
                    job_3,
                    IntervalTrigger(minutes=1),
                    id="job_3"
                )

                await self.scheduler.start_in_background()
                await self.app(scope, receive, send)
        else:
            await self.app(scope, receive, send)

@asynccontextmanager
async def lifespan(_: FastAPI):
    logger.info("LIFESPAN EVENT")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    yield

data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)
app = FastAPI(lifespan=lifespan, middleware=[
    Middleware(SchedulerMiddleware, scheduler=scheduler)
])

@app.get("/")
async def route_main():
    return Response(status_code=202)

if __name__ == "__main__":
    uvicorn.run(
        "apscheduler_test:app",
        port=5000,
        host="localhost",
        reload=False,
        proxy_headers=False,
        log_config="logging.json"
    )
agronholm commented 6 months ago

Ok, I can reproduce the error on my end now using the asgi_fastapi example.

agronholm commented 6 months ago

I've pushed an update to master. Could you update and try again?

vadympop commented 6 months ago

Yeah, the new update no longer has this error(TypeError: Object of type 'JobOutcome' is not JSON serializable)

agronholm commented 6 months ago

Are you seeing any other errors anymore? Can I close this issue?

vadympop commented 6 months ago

We can close this issue, but can you look at #833? Or #833 already fixed in last commits?

vadympop commented 6 months ago

I did a few more tests and found that the error remained. This time I added a long delay to the job to simulate a real long job.

Code:

async def job_1():
    print("JOB_1 EXECUTE START")
    await asyncio.sleep(5)
    print("JOB_1 EXECUTE END")

async def job_2():
    print("JOB_2 EXECUTE START")
    await asyncio.sleep(180)
    print("JOB_2 EXECUTE END")

class SchedulerMiddleware:
    def __init__(self, app: ASGIApp, scheduler: AsyncScheduler) -> None:
        self.app = app
        self.scheduler = scheduler

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] == "lifespan":
            async with self.scheduler:
                await self.scheduler.add_schedule(job_1, IntervalTrigger(minutes=1), id="job_1")
                await self.scheduler.add_schedule(job_2, IntervalTrigger(minutes=1), id="job_2")
                await self.scheduler.start_in_background()
                await self.app(scope, receive, send)
        else:
            await self.app(scope, receive, send)

@asynccontextmanager
async def lifespan(_: FastAPI):
    print("LIFESPAN EVENT")
    yield

engine = create_async_engine("postgresql+asyncpg://...")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)
app = FastAPI(lifespan=lifespan, middleware=[Middleware(SchedulerMiddleware, scheduler=scheduler)])

Logs:

INFO:     Started server process [17500]
INFO:     Waiting for application startup.
LIFESPAN EVENT
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
JOB_1 EXECUTE START
JOB_2 EXECUTE START
JOB_1 EXECUTE END
JOB_2 EXECUTE START
JOB_1 EXECUTE START
JOB_2 EXECUTE START
JOB_1 EXECUTE END
JOB_2 EXECUTE START
JOB_2 EXECUTE START
JOB_1 EXECUTE START
JOB_2 EXECUTE START
JOB_1 EXECUTE END
JOB_2 EXECUTE END
JOB_2 EXECUTE START
JOB_2 EXECUTE START
JOB_2 EXECUTE START
JOB_1 EXECUTE START
JOB_2 EXECUTE START
JOB_1 EXECUTE END
JOB_2 EXECUTE START
JOB_2 EXECUTE START
JOB_2 EXECUTE START
JOB_1 EXECUTE START
JOB_2 EXECUTE START
JOB_2 EXECUTE END
JOB_2 EXECUTE END
JOB_1 EXECUTE END
JOB_2 EXECUTE END
JOB_2 EXECUTE END
Scheduler crashed
  + Exception Group Traceback (most recent call last):
  |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 709, in run_until_stopped
  |     async with create_task_group() as task_group:
  |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 664, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 939, in _process_jobs
    |     await wakeup_event.wait()
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 1621, in wait
    |     await self._event.wait()
    |   File "C:\Python310\lib\asyncio\locks.py", line 214, in wait
    |     await fut
    | asyncio.exceptions.CancelledError: Cancelled by cancel scope 1d880dbb460
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Exception Group Traceback (most recent call last):
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 913, in _process_jobs
    |     async with AsyncExitStack() as exit_stack:
    |   File "C:\Python310\lib\contextlib.py", line 714, in __aexit__
    |     raise exc_details[1]
    |   File "C:\Python310\lib\contextlib.py", line 697, in __aexit__
    |     cb_suppress = await cb(*exc_details)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 664, in __aexit__
    |     raise BaseExceptionGroup(
    | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (8 sub-exceptions)
    +-+---------------- 1 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 998, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('323e8109-946f-4fbf-994d-c8ee64cc1bce')
      +---------------- 2 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 994, in _run_job
      |     await self.data_store.release_job(self.identity, job, result)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\datastores\sqlalchemy.py", line 919, in release_job
      |     await self._event_broker.publish(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\eventbrokers\asyncpg.py", line 180, in publish
      |     await self._send.send(notification)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\streams\memory.py", line 215, in send
      |     await checkpoint()
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\lowlevel.py", line 27, in checkpoint
      |     await get_async_backend().checkpoint()
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 1984, in checkpoint
      |     await sleep(0)
      |   File "C:\Python310\lib\asyncio\tasks.py", line 596, in sleep
      |     await __sleep0()
      |   File "C:\Python310\lib\asyncio\tasks.py", line 590, in __sleep0
      |     yield
      | asyncio.exceptions.CancelledError: Cancelled by cancel scope 1d880dbb460
      | 
      | During handling of the above exception, another exception occurred:
      | 
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 998, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('1d638f58-599a-444e-a2d5-c684ab171d07')
      +---------------- 3 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 998, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('00694d10-3d59-45a6-aa3b-ba32e29a5471')
      +---------------- 4 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 998, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('1d638f58-599a-444e-a2d5-c684ab171d07')
      +---------------- 5 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 998, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('00694d10-3d59-45a6-aa3b-ba32e29a5471')
      +---------------- 6 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 998, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('323e8109-946f-4fbf-994d-c8ee64cc1bce')
      +---------------- 7 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 998, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('323e8109-946f-4fbf-994d-c8ee64cc1bce')
      +---------------- 8 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 998, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('70e5eee8-6894-43e0-82c2-8b9a7a1bf090')
      +------------------------------------
ERROR:    Traceback (most recent call last):
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\starlette\routing.py", line 714, in lifespan
    await receive()
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\uvicorn\lifespan\on.py", line 137, in receive
    return await self.receive_queue.get()
  File "C:\Python310\lib\asyncio\queues.py", line 159, in get
    await getter
asyncio.exceptions.CancelledError: Cancelled by cancel scope 1d880d1feb0

INFO:     Shutting down
INFO:     Finished server process [17500]
vadympop commented 6 months ago

@agronholm can you check, please?

agronholm commented 6 months ago

I'll check it this weekend.

vadympop commented 6 months ago

Also I've noticed that this error always throws after a long job(ex. 10 minutes)

api      | 2024-01-05 22:51:53,161 | INFO     | pba.core.scheduler.jobs.fuel_cards:126 - End retrieving in 547.7623925209045 secs
api      | 2024-01-05 22:51:53,162 | INFO     | apscheduler._schedulers.async_:988 - Job 6def27e2-fd7b-4519-aac1-6ac792bf8ef2 completed successfully
api      | 2024-01-05 22:51:53,169 | ERROR    | apscheduler._schedulers.async_:743 - Scheduler crashed
api      |   + Exception Group Traceback (most recent call last):
api      |   |   File "/usr/local/lib/python3.10/site-packages/apscheduler/_schedulers/async_.py", line 709, in run_until_stopped
api      |   |     async with create_task_group() as task_group:
api      |   |   File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
api      |   |     raise BaseExceptionGroup(
api      |   | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
api      |   +-+---------------- 1 ----------------
api      |     | Traceback (most recent call last):
api      |     |   File "/usr/local/lib/python3.10/site-packages/apscheduler/_schedulers/async_.py", line 939, in _process_jobs
api      |     |     await wakeup_event.wait()
api      |     |   File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1641, in wait
api      |     |     await self._event.wait()
api      |     |   File "/usr/local/lib/python3.10/asyncio/locks.py", line 214, in wait
api      |     |     await fut
api      |     | asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f3d40d5f970
api      |     |
api      |     | During handling of the above exception, another exception occurred:
api      |     |
api      |     | Exception Group Traceback (most recent call last):
api      |     |   File "/usr/local/lib/python3.10/site-packages/apscheduler/_schedulers/async_.py", line 913, in _process_jobs
api      |     |     async with AsyncExitStack() as exit_stack:
api      |     |   File "/usr/local/lib/python3.10/contextlib.py", line 714, in __aexit__
api      |     |     raise exc_details[1]
api      |     |   File "/usr/local/lib/python3.10/contextlib.py", line 697, in __aexit__
api      |     |     cb_suppress = await cb(*exc_details)
api      |     |   File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
api      |     |     raise BaseExceptionGroup(
api      |     | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
api      |     +-+---------------- 1 ----------------
api      |       | Traceback (most recent call last):
api      |       |   File "/usr/local/lib/python3.10/site-packages/apscheduler/_schedulers/async_.py", line 998, in _run_job
api      |       |     self._running_jobs.remove(job.id)
api      |       | KeyError: UUID('6def27e2-fd7b-4519-aac1-6ac792bf8ef2')
api      |       +------------------------------------
api      | 2024-01-05 22:51:53,179 | ERROR    | uvicorn.error:134 - Traceback (most recent call last):
api      |   File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 714, in lifespan
api      |     await receive()
api      |   File "/usr/local/lib/python3.10/site-packages/uvicorn/lifespan/on.py", line 137, in receive
api      |     return await self.receive_queue.get()
api      |   File "/usr/local/lib/python3.10/asyncio/queues.py", line 159, in get
api      |     await getter
api      | asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f3d416bfdc0
vadympop commented 6 months ago

@agronholm can you check, please?

agronholm commented 6 months ago

I said I would check it this weekend which isn't over yet. Patience! I have plenty of other projects to work on too.

agronholm commented 6 months ago

My changes that were supposed to fix the traceback length weren't correct. I've pushed the corrected code to this branch. I'll merge it to master when I have tests for it ready.

agronholm commented 6 months ago

I was able to reproduce the KeyError: UUID('469d2e0f-2b9f-40ca-9b2b-df73e87d0d8d') even with the default in-memory data store.

agronholm commented 6 months ago

That bug seems to be the same as #803.