taskiq-python / taskiq-fastapi

FastAPI integration for taskiq
MIT License

Integration is broken for schedulers #3

Open s3rius opened 1 year ago

s3rius commented 1 year ago

When starting the scheduler, it doesn't import the application and doesn't set the dependency context.

Original issue: https://github.com/taskiq-python/taskiq/issues/142

nicognaW commented 1 year ago

Not importing the application also causes `context: Context = TaskiqDepends()` not to work.
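
For reference, here is a minimal sketch of the kind of dependency that stops resolving (using an InMemoryBroker as a stand-in; the real broker would come from the project's tkq module):

from taskiq import Context, InMemoryBroker, TaskiqDepends

broker = InMemoryBroker()

@broker.task
async def my_task(context: Context = TaskiqDepends()) -> None:
    # context carries the dependency state that taskiq_fastapi.init populates;
    # without the application import, that state is never set up
    ...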

broker.available_tasks is filled when a function decorated with @broker.task is loaded (imported). If the application is never imported, the loop `for task in self.broker.available_tasks.values():` runs before those functions are imported (i.e. before broker.available_tasks is filled), at least when no extra module arguments are provided (as is the default in FastAPI-Template).

This in turn causes task_signatures to be empty.
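
The timing problem can be demonstrated in isolation; a minimal sketch using only an InMemoryBroker:

from taskiq import InMemoryBroker

broker = InMemoryBroker()
print(broker.available_tasks)  # {} - no task modules imported yet

@broker.task
async def my_task() -> None:
    ...

# The decorator has now registered the task, so iteration finds it:
print(list(broker.available_tasks))  # e.g. ['__main__:my_task']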

My current workaround is to pass the full names of the modules containing tasks as extra worker CLI arguments (modules), since only a few of my modules contain tasks.
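
For example (module paths here are hypothetical; the worker invocation format is the one shown later in this thread):

taskiq worker my_project.tkq:broker my_project.tasks_module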

I tried --fs-discover but it didn't work; it throws something like:

  File "D:\source\crazy_diamond_py\.venv\Lib\site-packages\taskiq\cli\worker\run.py", line 127, in start_listen
    import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
  File "D:\source\crazy_diamond_py\.venv\Lib\site-packages\taskiq\cli\utils.py", line 90, in import_tasks
    import_from_modules(modules)
  File "D:\source\crazy_diamond_py\.venv\Lib\site-packages\taskiq\cli\utils.py", line 66, in import_from_modules
    import_module(module)
  File "C:\Users\nicog\AppData\Local\Programs\Python\Python311\Lib\importlib\__init__.py", line 121, in import_module
    raise TypeError(msg.format(name))
TypeError: the 'package' argument is required to perform a relative import for '.venv.Lib.site-packages.s3transfer.tasks

I believe this is because I'm using Python 3.11 in my project, so I didn't go deeper.

I'm fine with the current workaround, but it'd be great if we could come up with solutions for these problems while keeping multi-broker compatibility. If you have a direction or ideas but don't have time to research them, I'd be happy to contribute.

s3rius commented 1 year ago

Hi, @nicognaW. Relative imports don't work because taskiq imports your modules as-is and has no proper Python path for each tasks file. Consider removing all relative imports from your project.
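
For illustration, a sketch of the suggested fix (module names are hypothetical):

# my_project/tasks.py

# A relative import like `from .tkq import broker` breaks here, because
# taskiq imports this file by its dotted path without package context.
from my_project.tkq import broker  # absolute import: safe

@broker.task
async def my_task() -> None:
    ...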

nicognaW commented 1 year ago

> Hi, @nicognaW. Relative imports don't work because taskiq imports your modules as-is and has no proper Python path for each tasks file. Consider removing all relative imports from your project.

Hi, I confirmed that I'm not using any relative imports in my own code; it's possible that one of my dependencies does, though. Today, when I cloned the project on another macOS machine, the error didn't occur and everything worked fine.

Not sure if it's relevant, but there's one more difference between the Windows and macOS environments: on the Windows machine I use poetry's in-project venv option, while on macOS the venv is created at something like /Users/{username}/Library/Caches/pypoetry/virtualenvs/crazy-diamond-py-E-Drn1To-py3.11.

Anyway, I personally prefer using the modules arguments and passing the full module names, rather than searching the filesystem for a pattern with -fsd.

nicognaW commented 1 year ago

Another side effect with FastAPI-Template is that when workers set up OTEL instrumentation, the process crashes when calling app.url_path_for, since the routers are not registered.

s3rius commented 1 year ago

That's really weird. First, I tried separating tasks from the project and importing them by passing modules as arguments to taskiq. I created a project and defined a task using async_shared_broker.

from taskiq import async_shared_broker

@async_shared_broker.task
async def my_task(a: int) -> None:
    print("lol", a)

And then I created another project which depends on the first one.

from taskiq import async_shared_broker
from taskiq_redis import ListQueueBroker

broker = ListQueueBroker("redis://localhost")
async_shared_broker.default_broker(broker)

And finally, I created a main.py where I called a task.

import asyncio

from taskslib.tasks import my_task

from tkqtest.tkq import broker

async def main():
    await broker.startup()
    await my_task.kiq(a=1)
    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

I started the worker with this command:

taskiq worker tkqtest.tkq:broker taskslib.tasks

And it worked. I tried setting poetry config virtualenvs.in-project to both true and false, but no luck; I couldn't reproduce the problem. Maybe it resolves names incorrectly. Can you try renaming .venv to something without a leading dot?

nicognaW commented 1 year ago

I'm sorry if I expressed anything poorly, but the import issue is with --fs-discover, not with manually specifying the task modules.

Manually specifying the task modules like you did:

taskiq worker tkqtest.tkq:broker taskslib.tasks

didn't produce the error.

And reviewing my stack traces, I realized that one of my dependencies, s3transfer, also has a file called tasks.py, which made the taskiq CLI treat it as a task module that needed to be imported (with --tasks-pattern left at its default of tasks.py). It tried to import it, but the package lives in my .venv folder, so the import error was thrown.

So overall it's not an issue with separating tasks from the project; it's just that tasks.py is too common a filename, which lets the taskiq CLI mistakenly import modules from unintended third-party packages.
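
One way to avoid such collisions, matching what later comments in this thread do, is to use a project-specific pattern (the module path and pattern here are illustrative):

taskiq worker my_project.tkq:broker --fs-discover --tasks-pattern "my_tasks_*.py"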

haophancs commented 10 months ago

Problem

I was running my FastAPI project with taskiq integrated (generated from fastapi_template). I added a taskiq-scheduler instance like this, but it failed to send tasks (nothing happened at all).

# api_service/deploy/docker-compose.yml
  api: &main_app
    build:
      context: .
      dockerfile: ./deploy/Dockerfile
      target: prod
    image: api_service:${API_SERVICE_VERSION:-latest}
    restart: always
    env_file:
    - .env
    networks:
    - default

  taskiq-worker:
    <<: *main_app
    labels: []
    command:
    - taskiq
    - worker
    - api_service.tkq:broker
    - --fs-discover
    - --tasks-pattern
    - "schedule_*.py"

  taskiq-scheduler:
    <<: *main_app
    labels: []
    command:
    - taskiq
    - scheduler
    - api_service.tkq:scheduler
    - --fs-discover
    - --tasks-pattern
    - "schedule_*.py"
    depends_on:
      api:
        condition: service_started
      taskiq-worker:
        condition: service_started

# ...

Solution

I observed that the scheduler could not detect my scheduled tasks this way. I then tried two experiments and SUCCESSFULLY made the scheduler active:

# api_service/api_service/services/scheduler/__main__.py
import asyncio

from taskiq import TaskiqScheduler
from taskiq.cli.scheduler.args import SchedulerArgs
from taskiq.cli.scheduler.run import run_scheduler
from taskiq.schedule_sources import LabelScheduleSource

from api_service.tkq import broker

scheduler_tkq = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
    refresh_delay=2,
)

async def main() -> None:
    """Initialize Taskiq scheduler."""
    await run_scheduler(
        SchedulerArgs(
            scheduler=scheduler_tkq,
            modules=[],
            fs_discover=True,
            tasks_pattern="schedule_*.py",
        ),
    )

if __name__ == "__main__":
    asyncio.run(main())
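
And the corresponding docker-compose service that runs this module:
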
# api_service/deploy/docker-compose.yml
  # ...
  taskiq-scheduler:
    image: api_service:${API_SERVICE_VERSION:-latest}
    restart: "always"
    command:
    - /usr/local/bin/python
    - -m
    - api_service.services.scheduler
    volumes:
      # Adds current directory as volume.
    - .:/app/src/
    depends_on:
      api:
        condition: service_started
      taskiq-worker:
        condition: service_started
  # ...

Package versions:

# ...
[[package]]
name = "fastapi"
version = "0.89.1"
description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production"
optional = false
python-versions = ">=3.7"
files = [
    {file = "fastapi-0.89.1-py3-none-any.whl", hash = "sha256:f9773ea22290635b2f48b4275b2bf69a8fa721fda2e38228bed47139839dc877"},
    {file = "fastapi-0.89.1.tar.gz", hash = "sha256:15d9271ee52b572a015ca2ae5c72e1ce4241dd8532a534ad4f7ec70c376a580f"},
]

[[package]]
name = "taskiq"
version = "0.8.6"
description = "Distributed task queue with full async support"
optional = false
python-versions = ">=3.8.1,<4.0.0"
files = [
    {file = "taskiq-0.8.6-py3-none-any.whl", hash = "sha256:1801f5979793b39da55a1b9bf64cb89ad9c082125fdbc63ac5a7823294dedc7e"},
    {file = "taskiq-0.8.6.tar.gz", hash = "sha256:6493098da7b77c5ec34ee97a6ee2fd369f52f72d7a61f3f676d53a982221fd4a"},
]

[[package]]
name = "taskiq-dependencies"
version = "1.3.0"
description = "FastAPI like dependency injection implementation"
optional = false
python-versions = ">=3.8.1,<4.0.0"
files = [
    {file = "taskiq_dependencies-1.3.0-py3-none-any.whl", hash = "sha256:e260bd8976b9190ac6d625a9c57bf9f43361afef3f9da585db4f8e2a141bda0f"},
    {file = "taskiq_dependencies-1.3.0.tar.gz", hash = "sha256:9cbad8700325db895ac9d3442c8d7d97e4e2c3335910b52d5000227847b2f9dd"},
]

[[package]]
name = "taskiq-fastapi"
version = "0.3.0"
description = "FastAPI integration for taskiq"
optional = false
python-versions = ">=3.8.1,<4.0.0"
files = [
    {file = "taskiq_fastapi-0.3.0-py3-none-any.whl", hash = "sha256:93eae839c0df9f24d5dcaef9c617b21fbe2396ce0490dacbb39c6ca37be3a997"},
    {file = "taskiq_fastapi-0.3.0.tar.gz", hash = "sha256:3beb52c389ebee528be6579794fde06809f3435fcd3af867f25add9b3f32576b"},
]

[[package]]
name = "taskiq-redis"
version = "0.4.0"
description = "Redis integration for taskiq"
optional = false
python-versions = ">=3.8.1,<4.0.0"
files = [
    {file = "taskiq_redis-0.4.0-py3-none-any.whl", hash = "sha256:61a01b8076a41730c1a1b7aa659f3bf703e4e7eb53a1e049bee02f0fa17177ae"},
    {file = "taskiq_redis-0.4.0.tar.gz", hash = "sha256:7b22d2028965878b9a0567b4eaf7ab939fbd985226d0936c1407846e9d5c523d"},
]

# ...
s3rius commented 10 months ago

@haophancs, it seems like the problem is with imports; I'm not sure what the initial problem was.

Can you provide a bit more context? Debug logs, for example, and how you define the scheduler in your tkq module?

haophancs commented 10 months ago

@s3rius As shown above, in the beginning I set the taskiq-scheduler instance's entry command to

taskiq scheduler api_service.tkq:scheduler --fs-discover --tasks-pattern "schedule_*.py"

It just shows two lines of initial logs:

2023-10-31 02:29:03 [2023-10-30 19:29:03,234][INFO   ][run:run_scheduler:100] Starting scheduler.
2023-10-31 02:29:03 [2023-10-30 19:29:03,234][INFO   ][run:run_scheduler:102] Startup completed.

And nothing else, even when it was time to run the task determined by the cron expression. Therefore I believe the scheduler cannot detect the scheduled tasks. So I tried the two solutions above, and they work (I see the log line about the task being sent).
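
This matches the registration-timing explanation from earlier in this thread; a rough, self-contained sketch of the failure mode (simplified, not actual taskiq source):

from taskiq import InMemoryBroker

broker = InMemoryBroker()  # stand-in; no task modules have been imported

# A label-based schedule source can only see tasks already registered
# on the broker, so without those imports it finds no schedules at all.
for task in broker.available_tasks.values():  # empty -> body never runs
    print(task.labels.get("schedule"))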

This is my tkq.py file:

# api_service/api_service/tkq.py
import taskiq_fastapi
from taskiq import InMemoryBroker, TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

from api_service.settings import settings

result_backend = RedisAsyncResultBackend(
    redis_url=str(settings.redis_url.with_path("/1")),
)
broker = ListQueueBroker(
    str(settings.redis_url.with_path("/1")),
).with_result_backend(result_backend)

if settings.environment.lower() == "pytest":
    broker = InMemoryBroker()

taskiq_fastapi.init(
    broker,
    "api_service.app.application:get_app",
)

# If we define a separate module for the scheduler, as in my second solution,
# the scheduler lives not here but in that module's __main__.py file.
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
    refresh_delay=settings.scheduler_refresh_delay,
)

And the task file:

# api_service/api_service/services/scheduler/schedules/schedule_example.py
from api_service.tkq import broker

@broker.task(schedule=[{"cron": "* * * * *"}])
async def example_task() -> None:
    message = "this is a heavy task"

In my project I now definitely prefer my second solution: just define a separate module and give it a __main__.py that runs the scheduler.

haophancs commented 10 months ago

@s3rius Oh, I got it: if we want to use the CLI command instead of the programmatic way, we need to declare the scheduler BEFORE calling taskiq_fastapi.init. Because I initially declared it after initializing taskiq_fastapi, it could not detect the scheduled tasks.

Now the implementation below makes the scheduler work correctly.

# api_service/api_service/tkq.py
import taskiq_fastapi
from taskiq import InMemoryBroker, TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

from api_service.settings import settings

result_backend = RedisAsyncResultBackend(
    redis_url=str(settings.redis_url.with_path("/1")),
)

broker = ListQueueBroker(
    str(settings.redis_url.with_path("/1")),
).with_result_backend(result_backend)

if settings.environment.lower() == "pytest":
    broker = InMemoryBroker()

# before calling taskiq_fastapi.init(...)
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
    refresh_delay=settings.scheduler_refresh_delay,
)

taskiq_fastapi.init(
    broker,
    "api_service.app.application:get_app",
)

And then just run an instance for it:

# api_service/deploy/docker-compose.yml
  taskiq-scheduler:
    <<: *main_app
    labels: []
    command:
    - taskiq
    - scheduler
    - api_service.tkq:scheduler
    - --fs-discover
    - --tasks-pattern
    - "schedule_*.py"
    depends_on:
      api:
        condition: service_started
      taskiq-worker:
        condition: service_started
s3rius commented 10 months ago

Wow! Thanks for figuring this out. Actually, I have no idea why this is happening, because by design there should be no constraint requiring the scheduler to be declared before calling init. I'll take a look. At the very least it would be nice to note this behavior somewhere in the docs.

XChikuX commented 4 months ago

Just curious. Has the issue been triaged to a root cause?

bgervan commented 1 week ago

The issue is the pattern: it grabs third-party packages' tasks.py files and throws an exception because of them. After renaming my file to taskssomething.py and adding `--tasks-pattern 'tasks*.py'` to the worker, it finally started as expected.