agronholm / apscheduler

Task scheduling library for Python
MIT License
6.18k stars 704 forks source link

AsyncpgEventBroker crashes #828

Closed — hexdecimal16 closed this issue 4 months ago

hexdecimal16 commented 9 months ago

Things to check first

Version

v4.0.0a4

What happened?

  + Exception Group Traceback (most recent call last):
  |   File "/usr/local/lib/python3.10/dist-packages/apscheduler/_schedulers/async_.py", line 667, in run_until_stopped
  |     async with create_task_group() as task_group:
  |   File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/usr/local/lib/python3.10/dist-packages/apscheduler/_schedulers/async_.py", line 889, in _process_jobs
    |     await wakeup_event.wait()
    |   File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 1641, in wait
    |     await self._event.wait()
    |   File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
    |     await fut
    | asyncio.exceptions.CancelledError: Cancelled by cancel scope 7fe68f79d270
    |
    | During handling of the above exception, another exception occurred:
    |
    | Exception Group Traceback (most recent call last):
    |   File "/usr/local/lib/python3.10/dist-packages/apscheduler/_schedulers/async_.py", line 863, in _process_jobs
    |     async with AsyncExitStack() as exit_stack:
    |   File "/usr/lib/python3.10/contextlib.py", line 714, in __aexit__
    |     raise exc_details[1]
    |   File "/usr/lib/python3.10/contextlib.py", line 697, in __aexit__
    |     cb_suppress = await cb(*exc_details)
    |   File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
    |     raise BaseExceptionGroup(
    | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
    +-+---------------- 1 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/dist-packages/apscheduler/_schedulers/async_.py", line 966, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('bfa56c75-c7ac-4d72-bb56-1064327f3553')
      +------------------------------------
12/22/2023 09:41:25 - INFO - apscheduler._schedulers.sync - Stream finished

How can we reproduce the bug?

 with Scheduler(data_store, event_broker) as scheduler:
            self.schedule_jobs(scheduler, start_time)

            scheduler.run_until_stopped()
agronholm commented 9 months ago

I'll need a complete reproducing example.

hexdecimal16 commented 9 months ago
from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler import Scheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

        self.db_url = f"postgresql+asyncpg://{db_user}:{password}@{db_host}:{db_port}/{db_name}"
        engine = create_async_engine(self.db_url)
        data_store = SQLAlchemyDataStore(engine)
        event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
        start_time = datetime.strptime("2021-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")

        with Scheduler(data_store, event_broker) as scheduler:
            self.schedule_jobs(scheduler, start_time)

            scheduler.run_until_stopped()

        def schedule_jobs(self, scheduler: Scheduler, start_time: datetime) -> None:
            scheduler.add_schedule(....)
            scheduler.add_schedule(....)
agronholm commented 9 months ago

Does that snippet run for you?

hexdecimal16 commented 9 months ago

You need to actually add some tasks and DB environment variables first.

This is the full main.py file for project:

from src.utils.logger import Logger
from src.scripts.news_scrapper import NewsScrapper
from src.scripts.news_sentiment import NewsSentiment
from src.scripts.news_decay import NewsDecay
from src.scripts.stock_fundamental import StockFundamental
from src.scripts.news_sentiment_history import NewsSentimentHistory
from src.scripts.stock_technical import StockTechnical

from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler import Scheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

import os
from dotenv import load_dotenv
from datetime import datetime

def run_news_scrapper():
    """Scheduler job entry point: build a NewsScrapper and invoke its run()."""
    NewsScrapper().run()

def run_news_sentiment():
    """Scheduler job entry point: build a NewsSentiment and invoke its run()."""
    NewsSentiment().run()

def run_news_decay():
    """Scheduler job entry point: build a NewsDecay and invoke its run()."""
    NewsDecay().run()

def run_news_sentiment_history():
    """Scheduler job entry point: build a NewsSentimentHistory and invoke its run()."""
    NewsSentimentHistory().run()

def run_stock_fundamental():
    """Scheduler job entry point: build a StockFundamental and invoke its run()."""
    StockFundamental().run()

def run_stock_technical():
    """Scheduler job entry point: build a StockTechnical and invoke its run()."""
    StockTechnical().run()

class Main:
    """Application entry point: builds the Postgres-backed APScheduler
    scheduler, registers the recurring jobs, and runs until stopped."""

    def __init__(self) -> None:
        # Load environment variables from a .env file before reading config.
        load_dotenv()
        self.logger = Logger.get_logger('main')
        self.scheduler = None
        self.db_url = self.get_db_url()

    def get_db_url(self) -> str:
        """Build the asyncpg SQLAlchemy DSN from environment variables.

        Returns:
            A ``postgresql+asyncpg://`` connection URL.

        The username and password are URL-escaped so credentials containing
        reserved characters such as ``@``, ``:`` or ``/`` do not corrupt
        the DSN (the previous f-string interpolation broke in that case).
        """
        from urllib.parse import quote_plus  # stdlib; used only here

        db_user = os.getenv("DB_USER", "postgres")
        password = os.getenv("DB_PASSWORD", "root")
        db_name = os.getenv("DB_DATABASE", "sy")
        db_host = os.getenv("DB_HOST", "localhost")
        db_port = os.getenv("DB_PORT", "5432")
        return (
            f"postgresql+asyncpg://{quote_plus(db_user)}:{quote_plus(password)}"
            f"@{db_host}:{db_port}/{db_name}"
        )

    def run(self) -> None:
        """Create the data store and event broker, register the jobs, and
        block in the scheduler loop until it is stopped."""
        self.logger.info("Starting main.py")
        # Initialize scheduler backends: one async engine is shared by both
        # the SQLAlchemy data store and the asyncpg event broker.
        engine = create_async_engine(self.db_url)
        data_store = SQLAlchemyDataStore(engine)
        event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
        start_time = datetime.strptime("2021-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")

        with Scheduler(data_store, event_broker) as scheduler:
            self.schedule_jobs(scheduler, start_time)
            # Blocks until the scheduler is stopped or raises.
            scheduler.run_until_stopped()

    def schedule_jobs(self, scheduler: "Scheduler", start_time: datetime) -> None:
        """Register every recurring job with a stable, explicit schedule id.

        Args:
            scheduler: The running APScheduler scheduler to register on.
            start_time: Common anchor time for the interval/cron triggers.
        """
        scheduler.add_schedule(
            run_news_scrapper,
            IntervalTrigger(hours=2, start_time=start_time),
            id="run_news_scrapper",
        )
        scheduler.add_schedule(
            run_news_sentiment,
            IntervalTrigger(hours=3, start_time=start_time),
            id="run_news_sentiment",
        )
        scheduler.add_schedule(
            run_news_decay,
            CronTrigger(hour=22, minute=30, start_time=start_time),
            id="run_news_decay",
        )
        scheduler.add_schedule(
            run_news_sentiment_history,
            CronTrigger(hour=0, minute=30, start_time=start_time),
            id="run_news_sentiment_history",
        )
        scheduler.add_schedule(
            run_stock_fundamental,
            CronTrigger(hour=9, minute=30, start_time=start_time),
            id="run_stock_fundamental",
        )
        scheduler.add_schedule(
            run_stock_technical,
            CronTrigger(hour=18, minute=30, start_time=start_time),
            id="run_stock_technical",
        )

if __name__ == "__main__":
    # Script entry point: construct the app and enter the scheduler loop.
    Main().run()
agronholm commented 9 months ago

What I need is a minimal working example which:

  1. Is as short as possible
  2. Runs
  3. Reproduces the issue

Can you do that for me?

agronholm commented 4 months ago

Closing due to lack of responses.