agronholm / apscheduler

Task scheduling library for Python
MIT License
5.98k stars 694 forks source link

After exception AsyncScheduler crashes #833

Closed vadympop closed 2 months ago

vadympop commented 6 months ago

Things to check first

Version

4.0.0a4

What happened?

After an exception, the AsyncScheduler crashes. I tried to reproduce this by raising a basic exception, but it only happens with an SQLAlchemy exception. I'm also using the SQLAlchemy data store. After the exception, APScheduler logs this:

2023-12-31 09:43:20,954 | ERROR    | apscheduler._schedulers.async_:931 - Job ade58903-05d3-45a1-b8b3-107531e0d1ae raised an exception
Traceback (most recent call last):
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 557, in _prepare_and_execute
    self._rows = await prepared_stmt.fetch(*parameters)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 176, in fetch
    data = await self.__bind_execute(args, 0, timeout)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 241, in __bind_execute
    data, status, _ = await self.__do_execute(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 230, in __do_execute
    return await executor(protocol)
  File "asyncpg\protocol\protocol.pyx", line 207, in bind_execute
asyncpg.exceptions.UniqueViolationError: повторювані значення ключа порушують обмеження унікальності "items_code_key"
DETAIL:  Ключ (code)=(1a1a1a) вже існує.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
    self.dialect.do_execute(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
    cursor.execute(statement, parameters)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 591, in execute
    self._adapt_connection.await_(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 125, in await_only
    return current.driver.switch(awaitable)  # type: ignore[no-any-return]
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 185, in greenlet_spawn
    value = await result
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 569, in _prepare_and_execute
    self._handle_exception(error)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 520, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 808, in _handle_exception
    raise translated_error from error
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.IntegrityError: <class 'asyncpg.exceptions.UniqueViolationError'>: повторювані значення ключа порушують обмеження унікальності "items_code_key"
DETAIL:  Ключ (code)=(1a1a1a) вже існує.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 915, in _run_job
    retval = await job_executor.run_job(func, job)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\executors\async_.py", line 22, in run_job
    retval = await retval
  File "D:\PyProg\PizzaBotAPI\apscheduler_test.py", line 67, in job_2
    await db.commit()
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\ext\asyncio\session.py", line 1011, in commit
    await greenlet_spawn(self.sync_session.commit)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 192, in greenlet_spawn
    result = context.switch(value)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1969, in commit
    trans.commit(_to_root=True)
  File "<string>", line 2, in commit
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
    ret_value = fn(self, *arg, **kw)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1256, in commit
    self._prepare_impl()
  File "<string>", line 2, in _prepare_impl
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
    ret_value = fn(self, *arg, **kw)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1231, in _prepare_impl
    self.session.flush()
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4312, in flush
    self._flush(objects)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4447, in _flush
    with util.safe_reraise():
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4408, in _flush
    flush_context.execute()
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 466, in execute
    rec.execute(self)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 642, in execute
    util.preloaded.orm_persistence.save_obj(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 93, in save_obj
    _emit_insert_statements(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 1226, in _emit_insert_statements
    result = connection.execute(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1416, in execute
    return meth(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\sql\elements.py", line 516, in _execute_on_connection
    return connection._execute_clauseelement(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1639, in _execute_clauseelement
    ret = self._execute_context(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1848, in _execute_context
    return self._exec_single_context(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1988, in _exec_single_context
    self._handle_dbapi_exception(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 2343, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
    self.dialect.do_execute(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
    cursor.execute(statement, parameters)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 591, in execute
    self._adapt_connection.await_(
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 125, in await_only
    return current.driver.switch(awaitable)  # type: ignore[no-any-return]
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 185, in greenlet_spawn
    value = await result
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 569, in _prepare_and_execute
    self._handle_exception(error)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 520, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 808, in _handle_exception
    raise translated_error from error
sqlalchemy.exc.IntegrityError: (sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class 'asyncpg.exceptions.UniqueViolationError'>: повторювані значення ключа порушують обмеження унікальності "items_code_key"
DETAIL:  Ключ (code)=(1a1a1a) вже існує.
[SQL: INSERT INTO items (name, code) VALUES ($1::VARCHAR, $2::VARCHAR) RETURNING items.id]
[parameters: ('Item', '1a1a1a')]
(Background on this error at: https://sqlalche.me/e/20/gkpj)
2023-12-31 09:43:21,233 | INFO     | apscheduler._schedulers.async_:917 - Job 9350e21c-059b-429f-be53-6af64f6696df was cancelled
2023-12-31 09:43:21,234 | INFO     | apscheduler._schedulers.async_:917 - Job c36d50fc-1146-46c3-81ca-d3f0b66982fb was cancelled
2023-12-31 09:43:21,240 | ERROR    | apscheduler._schedulers.async_:693 - Scheduler crashed
  + Exception Group Traceback (most recent call last):
  |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 667, in run_until_stopped
  |     async with create_task_group() as task_group:
  |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 664, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 889, in _process_jobs
    |     await wakeup_event.wait()
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 1621, in wait
    |     await self._event.wait()
    |   File "C:\Python310\lib\asyncio\locks.py", line 214, in wait
    |     await fut
    | asyncio.exceptions.CancelledError: Cancelled by cancel scope 1dd93cd0b50
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Exception Group Traceback (most recent call last):
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 863, in _process_jobs
    |     async with AsyncExitStack() as exit_stack:
    |   File "C:\Python310\lib\contextlib.py", line 714, in __aexit__
    |     raise exc_details[1]
    |   File "C:\Python310\lib\contextlib.py", line 697, in __aexit__
    |     cb_suppress = await cb(*exc_details)
    |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 664, in __aexit__
    |     raise BaseExceptionGroup(
    | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
    +-+---------------- 1 ----------------
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 557, in _prepare_and_execute
      |     self._rows = await prepared_stmt.fetch(*parameters)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 176, in fetch
      |     data = await self.__bind_execute(args, 0, timeout)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 241, in __bind_execute
      |     data, status, _ = await self.__do_execute(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 230, in __do_execute
      |     return await executor(protocol)
      |   File "asyncpg\protocol\protocol.pyx", line 207, in bind_execute
      | asyncpg.exceptions.UniqueViolationError: повторювані значення ключа порушують обмеження унікальності "items_code_key"
      | DETAIL:  Ключ (code)=(1a1a1a) вже існує.
      | 
      | The above exception was the direct cause of the following exception:
      | 
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
      |     self.dialect.do_execute(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
      |     cursor.execute(statement, parameters)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 591, in execute
      |     self._adapt_connection.await_(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 125, in await_only
      |     return current.driver.switch(awaitable)  # type: ignore[no-any-return]
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 185, in greenlet_spawn
      |     value = await result
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 569, in _prepare_and_execute
      |     self._handle_exception(error)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 520, in _handle_exception
      |     self._adapt_connection._handle_exception(error)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 808, in _handle_exception
      |     raise translated_error from error
      | sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.IntegrityError: <class 'asyncpg.exceptions.UniqueViolationError'>: повторювані значення ключа порушують обмеження унікальності "items_code_key"
      | DETAIL:  Ключ (code)=(1a1a1a) вже існує.
      | 
      | The above exception was the direct cause of the following exception:
      | 
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 915, in _run_job
      |     retval = await job_executor.run_job(func, job)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\executors\async_.py", line 22, in run_job
      |     retval = await retval
      |   File "D:\PyProg\PizzaBotAPI\apscheduler_test.py", line 67, in job_2
      |     await db.commit()
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\ext\asyncio\session.py", line 1011, in commit
      |     await greenlet_spawn(self.sync_session.commit)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 192, in greenlet_spawn
      |     result = context.switch(value)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1969, in commit
      |     trans.commit(_to_root=True)
      |   File "<string>", line 2, in commit
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
      |     ret_value = fn(self, *arg, **kw)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1256, in commit
      |     self._prepare_impl()
      |   File "<string>", line 2, in _prepare_impl
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
      |     ret_value = fn(self, *arg, **kw)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1231, in _prepare_impl
      |     self.session.flush()
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4312, in flush
      |     self._flush(objects)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4447, in _flush
      |     with util.safe_reraise():
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\langhelpers.py", line 146, in __exit__
      |     raise exc_value.with_traceback(exc_tb)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4408, in _flush
      |     flush_context.execute()
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 466, in execute
      |     rec.execute(self)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 642, in execute
      |     util.preloaded.orm_persistence.save_obj(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 93, in save_obj
      |     _emit_insert_statements(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 1226, in _emit_insert_statements
      |     result = connection.execute(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1416, in execute
      |     return meth(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\sql\elements.py", line 516, in _execute_on_connection
      |     return connection._execute_clauseelement(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1639, in _execute_clauseelement
      |     ret = self._execute_context(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1848, in _execute_context
      |     return self._exec_single_context(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1988, in _exec_single_context
      |     self._handle_dbapi_exception(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 2343, in _handle_dbapi_exception
      |     raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
      |     self.dialect.do_execute(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
      |     cursor.execute(statement, parameters)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 591, in execute
      |     self._adapt_connection.await_(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 125, in await_only
      |     return current.driver.switch(awaitable)  # type: ignore[no-any-return]
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 185, in greenlet_spawn
      |     value = await result
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 569, in _prepare_and_execute
      |     self._handle_exception(error)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 520, in _handle_exception
      |     self._adapt_connection._handle_exception(error)
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 808, in _handle_exception
      |     raise translated_error from error
      | sqlalchemy.exc.IntegrityError: (sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class 'asyncpg.exceptions.UniqueViolationError'>: повторювані значення ключа порушують обмеження унікальності "items_code_key"
      | DETAIL:  Ключ (code)=(1a1a1a) вже існує.
      | [SQL: INSERT INTO items (name, code) VALUES ($1::VARCHAR, $2::VARCHAR) RETURNING items.id]
      | [parameters: ('Item', '1a1a1a')]
      | (Background on this error at: https://sqlalche.me/e/20/gkpj)
      | 
      | During handling of the above exception, another exception occurred:
      | 
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 947, in _run_job
      |     await self.event_broker.publish(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\eventbrokers\asyncpg.py", line 176, in publish
      |     raise SerializationError(
      | apscheduler.SerializationError: Serialized event object exceeds 7999 bytes in size
      +------------------------------------
2023-12-31 09:43:21,305 | ERROR    | uvicorn.error:134 - Traceback (most recent call last):
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\starlette\routing.py", line 714, in lifespan
    await receive()
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\uvicorn\lifespan\on.py", line 137, in receive
    return await self.receive_queue.get()
  File "C:\Python310\lib\asyncio\queues.py", line 159, in get
    await getter
asyncio.exceptions.CancelledError: Cancelled by cancel scope 1dd93bea0e0

After this, all schedules stop executing.

How can we reproduce the bug?

Here is a minimal code example:

apscheduler_test.py

import asyncio
import datetime
import json
import logging
import typing
from contextlib import asynccontextmanager

import uvicorn
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI, Response
from fastapi.middleware import Middleware
from pydantic import BaseModel
from sqlalchemy import Column, BigInteger, String
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import declarative_base
from starlette.types import ASGIApp, Scope, Receive, Send

logger = logging.getLogger(__name__)

class CustomSerializer(json.JSONEncoder):
    """JSON encoder that additionally handles dates, datetimes and pydantic models."""

    def default(self, obj: typing.Any) -> typing.Any:
        """Return a JSON-serializable stand-in for *obj*.

        Dates and datetimes become ISO 8601 strings; ``BaseModel`` instances
        become the dict returned by their ``dict()`` method.

        Raises:
            TypeError: if *obj* is none of the supported types (raised by the
                base class implementation).
        """
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        if isinstance(obj, BaseModel):
            return obj.dict()
        # Defer to the base class so that unsupported types raise TypeError
        # instead of being silently encoded as ``null``.
        return super().default(obj)

def database_json_serializer(v):
    """Dump *v* to indented, non-ASCII-escaped JSON text using ``CustomSerializer``."""
    dump_options = {"indent": 2, "ensure_ascii": False, "cls": CustomSerializer}
    return json.dumps(v, **dump_options)

# Shared async engine. The URL is a plain string, so the {passwd}, {host},
# {port} and {database} placeholders must be replaced by hand before running.
engine = create_async_engine(
    "postgresql+asyncpg://postgres:{passwd}@{host}:{port}/{database}",
    pool_size=20,
    max_overflow=60,
    json_serializer=database_json_serializer
)
# Session factory bound to the engine, with autocommit and autoflush disabled.
Session = async_sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)
# Declarative base class for the ORM models defined below.
Base = declarative_base()

class Item(Base):
    """ORM model mapped to the ``items`` table."""

    __tablename__ = "items"
    # Auto-incrementing primary key.
    id = Column(BigInteger, autoincrement=True, primary_key=True)
    name = Column(String, nullable=False)
    # Unique and required: inserting the same code twice fails.
    code = Column(String, unique=True, nullable=False)

async def job_1():
    """Log a start message, sleep for 5 seconds, then log an end message."""
    logger.info("JOB_1 EXECUTE START")
    await asyncio.sleep(5)
    logger.info("JOB_1 EXECUTE END")

async def job_2():
    """Insert an ``Item`` with a fixed ``code``, then sleep for 5 seconds.

    ``Item.code`` is declared unique and the value inserted here is constant,
    so once a row with this code exists the commit fails (the reported
    traceback shows an ``IntegrityError`` raised from ``db.commit()``).
    """
    logger.info("JOB_2 EXECUTE START")

    async with Session() as db:
        new_obj = Item(name="Item", code="1a1a1a")
        db.add(new_obj)
        # This is the call that raises in the reported traceback.
        await db.commit()
        await db.refresh(new_obj)

    await asyncio.sleep(5)
    logger.info("JOB_2 EXECUTE END")

async def job_3():
    """Log a start message, sleep for 10 seconds, then log an end message."""
    logger.info("JOB_3 EXECUTE START")
    await asyncio.sleep(10)
    logger.info("JOB_3 EXECUTE END")

class SchedulerMiddleware:
    """ASGI middleware that sets up the scheduler while handling the lifespan scope.

    For the ``lifespan`` scope it enters the scheduler's async context,
    registers the three interval schedules, starts the scheduler in the
    background and then hands the scope to the wrapped app. Every other scope
    type is passed straight through.
    """

    def __init__(
        self,
        app: ASGIApp,
        scheduler: AsyncScheduler,
    ) -> None:
        self.scheduler = scheduler
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # Guard clause: anything but the lifespan scope bypasses the scheduler.
        if scope["type"] != "lifespan":
            await self.app(scope, receive, send)
            return

        async with self.scheduler:
            # Each job runs on its own one-minute interval trigger.
            for schedule_id, job_func in (
                ("job_1", job_1),
                ("job_2", job_2),
                ("job_3", job_3),
            ):
                await self.scheduler.add_schedule(
                    job_func,
                    IntervalTrigger(minutes=1),
                    id=schedule_id
                )

            await self.scheduler.start_in_background()
            await self.app(scope, receive, send)

@asynccontextmanager
async def lifespan(_: FastAPI):
    """Create the tables declared on ``Base.metadata`` before yielding to the app."""
    logger.info("LIFESPAN EVENT")
    async with engine.begin() as conn:
        # create_all is synchronous, so it is run through run_sync.
        await conn.run_sync(Base.metadata.create_all)

    yield

# The data store and the event broker are both built on the same engine.
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)
# The scheduler is attached to the app through SchedulerMiddleware.
app = FastAPI(lifespan=lifespan, middleware=[
    Middleware(SchedulerMiddleware, scheduler=scheduler)
])

@app.get("/")
async def route_main():
    """Respond to ``GET /`` with a 202 response."""
    return Response(status_code=202)

if __name__ == "__main__":
    # Serve the app on localhost:5000; logging is configured from logging.json.
    uvicorn.run(
        "apscheduler_test:app",
        port=5000,
        host="localhost",
        reload=False,
        proxy_headers=False,
        log_config="logging.json"
    )

logging.json

{
  "version": 1,
  "disable_existing_loggers": false,
  "formatters": {
    "default": {
      "format": "%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)s - %(message)s"
    },
    "basic": {
      "()": "uvicorn.logging.DefaultFormatter",
      "format": "%(asctime)s | %(levelname)-8s | %(name)s - %(message)s"
    }
  },
  "handlers": {
    "console_handler": {
      "class": "logging.StreamHandler",
      "level": "INFO",
      "formatter": "default",
      "stream": "ext://sys.stdout"
    }
  },
  "loggers": {
    "root": {
      "level": "INFO",
      "handlers": [
        "console_handler"
      ]
    }
  }
}
agronholm commented 6 months ago

I need you to minimize this. That is, to remove everything not needed to reproduce the issue.

vadympop commented 6 months ago
# Async engine, a session factory bound to it, and the declarative base for models.
engine = create_async_engine("postgresql+asyncpg://...")
Session = async_sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)
Base = declarative_base()

class Item(Base):
    """ORM model mapped to the ``items`` table."""

    __tablename__ = "items"
    id = Column(BigInteger, autoincrement=True, primary_key=True)
    # Unique and required: inserting the same code twice fails.
    code = Column(String, unique=True, nullable=False)

async def job_1():
    """Print a start message, sleep for 5 seconds, then print an end message."""
    print("JOB_1 EXECUTE START")
    await asyncio.sleep(5)
    print("JOB_1 EXECUTE END")

async def job_2():
    """Insert an ``Item`` with a fixed ``code`` and commit it.

    ``Item.code`` is declared unique and the value is constant, so once a row
    with this code exists the commit fails with an integrity error.
    """
    print("JOB_2 EXECUTE START")
    async with Session() as db:
        new_obj = Item(code="1a1a1a")
        db.add(new_obj)
        await db.commit()

    print("JOB_2 EXECUTE END")

class SchedulerMiddleware:
    """ASGI middleware that sets up the scheduler while handling the lifespan scope."""

    def __init__(self, app: ASGIApp, scheduler: AsyncScheduler) -> None:
        self.scheduler = scheduler
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # Guard clause: anything but the lifespan scope bypasses the scheduler.
        if scope["type"] != "lifespan":
            await self.app(scope, receive, send)
            return

        async with self.scheduler:
            # Each job runs on its own one-minute interval trigger.
            for schedule_id, job_func in (("job_1", job_1), ("job_2", job_2)):
                await self.scheduler.add_schedule(job_func, IntervalTrigger(minutes=1), id=schedule_id)
            await self.scheduler.start_in_background()
            await self.app(scope, receive, send)

@asynccontextmanager
async def lifespan(_: FastAPI):
    """Create the tables declared on ``Base.metadata`` before yielding to the app."""
    print("LIFESPAN EVENT")
    async with engine.begin() as conn:
        # create_all is synchronous, so it is run through run_sync.
        await conn.run_sync(Base.metadata.create_all)

    yield

# The data store and the event broker are both built on the same engine.
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)
# The scheduler is attached to the app through SchedulerMiddleware.
app = FastAPI(lifespan=lifespan, middleware=[Middleware(SchedulerMiddleware, scheduler=scheduler)])

In this code we're simulating an SQLAlchemy error (e.g. sqlalchemy.exc.IntegrityError: (sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class 'asyncpg.exceptions.UniqueViolationError'>); after this error is raised, the scheduler stops working.

agronholm commented 6 months ago

Will it not trigger the error if job 2 doesn't use the database?

vadympop commented 6 months ago

If, for example, job 2 raises a ValueError, the scheduler will continue to work.

vadympop commented 6 months ago

But if job 2 raises an error like this: raise ValueError("aaasfsdafsghw45hj6tr5wj6rthewh"*10000), the same error occurs.

agronholm commented 6 months ago

So, in other words, the database stuff isn't required, just raising a ValueError will trigger the problem?

vadympop commented 6 months ago

I think it's because the text of the raised error is too long: apscheduler.SerializationError: Serialized event object exceeds 7999 bytes in size

     | During handling of the above exception, another exception occurred:
      | 
      | Traceback (most recent call last):
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 947, in _run_job
      |     await self.event_broker.publish(
      |   File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\eventbrokers\asyncpg.py", line 176, in publish
      |     raise SerializationError(
      | apscheduler.SerializationError: Serialized event object exceeds 7999 bytes in size
      +------------------------------------
2023-12-31 09:43:21,305 | ERROR    | uvicorn.error:134 - Traceback (most recent call last):
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\starlette\routing.py", line 714, in lifespan
    await receive()
  File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\uvicorn\lifespan\on.py", line 137, in receive
    return await self.receive_queue.get()
  File "C:\Python310\lib\asyncio\queues.py", line 159, in get
    await getter
asyncio.exceptions.CancelledError: Cancelled by cancel scope 1dd93bea0e0

So if we can reproduce it without the database, we can minimize the code to this:

async def job_1():
    """Print a start message, sleep for 5 seconds, then print an end message."""
    print("JOB_1 EXECUTE START")
    await asyncio.sleep(5)
    print("JOB_1 EXECUTE END")

async def job_2():
    """Print a start message, then raise a ``ValueError`` with a very long message.

    The message is a 30-character string repeated 10000 times, which is far
    larger than the 7999-byte limit named in the reported SerializationError.
    """
    print("JOB_2 EXECUTE START")
    oversized_message = "aaasfsdafsghw45hj6tr5wj6rthewh"*10000
    raise ValueError(oversized_message)

class SchedulerMiddleware:
    """ASGI middleware that sets up the scheduler while handling the lifespan scope."""

    def __init__(self, app: ASGIApp, scheduler: AsyncScheduler) -> None:
        self.scheduler = scheduler
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # Guard clause: anything but the lifespan scope bypasses the scheduler.
        if scope["type"] != "lifespan":
            await self.app(scope, receive, send)
            return

        async with self.scheduler:
            # Each job runs on its own one-minute interval trigger.
            for schedule_id, job_func in (("job_1", job_1), ("job_2", job_2)):
                await self.scheduler.add_schedule(job_func, IntervalTrigger(minutes=1), id=schedule_id)
            await self.scheduler.start_in_background()
            await self.app(scope, receive, send)

@asynccontextmanager
async def lifespan(_: FastAPI):
    """Print a marker message and yield; no other startup or shutdown work is done."""
    print("LIFESPAN EVENT")
    yield

# The engine is still needed here: the data store and the event broker are
# both built on it, even though the jobs no longer touch the database.
engine = create_async_engine("postgresql+asyncpg://...")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)
# The scheduler is attached to the app through SchedulerMiddleware.
app = FastAPI(lifespan=lifespan, middleware=[Middleware(SchedulerMiddleware, scheduler=scheduler)])
agronholm commented 6 months ago

Right – so in fact, the web framework shouldn't be required to reproduce this.

vadympop commented 6 months ago

Yes, but if it's an error such as an SQLAlchemy error, it stops the entire scheduler.

agronholm commented 6 months ago

I think I need to remove exception tracebacks and messages from the event structure.

agronholm commented 6 months ago

Ellipsizing them might be a less intrusive change. Could you test with the limit-jobreleased-size branch to see if the problem is gone? I'm done for today.

vadympop commented 6 months ago

The limit-jobreleased-size branch works correctly with this type of error. Thank you!