samuelcolvin / arq

Fast job queuing and RPC in python with asyncio and redis.
https://arq-docs.helpmanual.io/
MIT License
1.96k stars 162 forks source link

Future plan for Arq #437

Open samuelcolvin opened 2 months ago

samuelcolvin commented 2 months ago

Arq was the first real open source project I ever created, back in 2016. That was long before Pydantic, FastAPI, ParamSpec, or even Redis Streams.

I remember a sense of incredulity that I couldn't find an async variant of rq (which I was helping to maintain at the time), surely I wasn't the only person wanting to queue jobs in async code? Apparently at the time I was.

Fast forward eight years, and I'm definitely not the only person trying to queue jobs in an async world.

Hence my incredulity has only grown - there's still no ubiquitous queuing library for async Python, and despite neglect, Arq still seems to work well for lots of people, I've used it in every role I've had since, and for the most part it just works.


That said, Arq needs some love, and since we're now using it at Pydantic, I think we should have the resources to provide that love later this year. This is a rough plan of what I propose to do.

Feedback very welcome from all, but especially @JonasKs and @pydantic-maintainers (who apparently I can't tag here :-()

In summary I want to significantly refactor the internals, and update the API in a backwards compatible way.

1. ParamSpec and type safety 🚧

The most important change we should make is to make Arq typesafe using ParamSpec and Concatenate, I have a partially working demonstration of how this will work below.

We'll provide the same or similar method for enqueuing a job via it's name for cases where the worker code is not accessible where jobs are enqueued.

We should be able to do this while still supporting the current API to make migration as easy as possible.

2. Redis Streams πŸš€

The second most important change is to adopt Redis streams which read like they were designed for exactly this application, they mean we can effectively guarantee only one execution while still being resilient to unexpected shutdown (Jobs shutdown during execution will be rerun later).

This should be possible without breaking the current API at all.

3. Avoid sorted set for immediate jobs πŸš€

Current Arq is slower than it should be because it uses a sorted set to queue all jobs, the sorted set provides two things:

The idea would be to only use the sorted set for jobs scheduled to run in the future, then use the logic demonstrated by SAQ to take jobs off the sorted set when they're ready to be run and add them to the stream.

Jobs which are enqueued without a delay can be added to the stream immediately, which should significantly improve performance for this very common case.

This should be possible without breaking the current API at all.

4. Avoid polling πŸ‡΅πŸ‡±

Mostly for latency reasons it would be nice to avoid polling, the idea would be:

5. OpenTelemetry πŸ”­

Observability is close to our hearts at Pydantic, so it would be nice to have optional support for OpenTelemetry, or perhaps just hook points to implement your own observability.

This should be possible without breaking the current API at all.

6. DAG - Task Dependency Graph πŸ“ˆ

The idea is to allow one or more jobs to be triggered by one or more previous jobs.

See the then() and start_with() methods in the partial implementation below.

This should be possible without breaking the current API at all.

7. CLI, settings and worker.run improvements πŸƒ

We can mostly just copy uvicorn, we should also remove the very ugly WorkerSettings and configure the worker via simple function.

We should also fix reload logic to use watchfiles.

This can be done such that existing code still works, with or without deprecation warnings.

8. Separate the backend ↔️

We should separate the Redis logic to make it easier to provide alternative backends, an in memory backend for testing would be especially useful for unit testing applications.

This can be done such that existing code still works, with or without deprecation warnings.

9. Better documentation πŸ“–

Documentation should move to mkdocs material and mkdocstrings, and be improved significantly.

10. Moving repo to Pydantic 🏒

To provide the resources for this work, we should move Arq to the Pydantic organization, and the docs to arq.pydantic.dev or similar.

Have I missed anything?


API Sketch

Here's a sketch of how I see the new type-safe API working, together with a partial implementation:

Example Usage:

from __future__ import annotations as _annotations

from dataclasses import dataclass
from typing import AsyncIterator
from contextlib import asynccontextmanager

from arq import FunctionContext, WorkerApp

from httpx import AsyncClient

@dataclass
class MyWorkerDeps:
    """
    Type safe way of defining the dependencies of the worker functions.

    E.g. HTTP client, database connection, settings. etc.
    """
    http_client: AsyncClient

@asynccontextmanager
async def my_worker_lifespan() -> AsyncIterator[MyWorkerDeps]:
    async with AsyncClient() as http_client:
        yield MyWorkerDeps(http_client)

worker_app = WorkerApp(lifespan=my_worker_lifespan)

@worker_app.register
async def foo(ctx: FunctionContext[MyWorkerDeps], url: str) -> int:
    # ctx.deps here is of type MyWorkerDeps, that's enforced by static typing
    # FunctionContext will also provide access to a redis connection, retry count,
    # even results of other jobs etc.
    r = await ctx.deps.http_client.get(url)
    r.raise_for_status()
    print(f'{url}: {r.text[:80]!r}...')
    return len(r.text)

async def main() -> None:
    async with worker_app:
        # these two are equivalent, param spec means the arguments are type safe
        await foo.enqueue('https://microsoft.com').start()
        await foo.enqueue('https://microsoft.com')
        # same, delayed by 10 seconds with 5 second timeout
        await foo.enqueue('https://microsoft.com').start(delay_by=10, timeout=5)

        # call foo directly in the same process
        print('length:', await foo.direct('https://github.com'))

if __name__ == '__main__':
    import asyncio

    asyncio.run(main())
Partial Implementation ```py from __future__ import annotations as _annotations from dataclasses import dataclass, KW_ONLY from datetime import timedelta from typing import ParamSpec, TypeVar, Generic, Concatenate, overload, Self, Any, AsyncIterator, Generator from collections.abc import Awaitable, Callable from contextlib import asynccontextmanager, AbstractAsyncContextManager from arq.connections import RedisSettings WD = TypeVar('WD') # worker dependencies P = ParamSpec('P') # worker function parameters R = TypeVar('R') # worker function return type PNext = ParamSpec('PNext') RNext = TypeVar('RNext') @dataclass class Job(Generic[R]): """ Represents a job that has been enqueued, could be deferred, queued, running or finished. """ id: str async def status(self) -> str: ... async def result(self) -> R: ... @dataclass class PendingJob(Generic[WD, P, R]): """ Represents a job that has not been enqueued yet. """ deferred_deps: Callable[[], WD] func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]] args: tuple[Any, ...] kwargs: dict[str, Any] _: KW_ONLY timeout: timedelta | int | None = None def __await__(self) -> Generator[None, None, Job]: return self.start().__await__() async def start(self, *, delay_by: timedelta | int | None = None) -> Job: # also other kwargs delay_until, timeout, retry etc. print(f'starting job {self.func.__name__}(args={self.args}, kwargs={self.kwargs}) {delay_by=}') return Job(id='123') def then(self, *on_success: WorkerFunction[WD, PNext, RNext] | PendingJob[WD, PNext, RNext] ) -> PendingJob[WD, PNext, RNext]: """ also takes all kwargs from `start()` TODO - AFAIK there's no way to enforce that `PNext` is `(R,)` - e.g. that the return value of the first function is the input to the second function. The even more complex case is where you have multiple jobs triggering a job, where I'm even more sure full type safety is impossible. I would therefore suggest that subsequent jobs are not allowed to take any arguments, and instead access the results of previous jobs via `FunctionContext` """ ... def start_with(self, *also_start: WorkerFunction[WD, P, R]) -> WorkerFunction[WD, P, R]: # also takes all kwargs from `start()`, I think this can be entirely type safe ... @dataclass class WorkerFunction(Generic[WD, P, R]): deferred_deps: Callable[[], WD] func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]] timeout: timedelta | int | None = None def enqueue(self, *args: P.args, **kwargs: P.kwargs) -> PendingJob[WD, P, R]: return PendingJob(self.deferred_deps, self.func, args, kwargs, timeout=self.timeout) async def direct(self, *args: P.args, **kwargs: P.kwargs) -> R: return await self.func(FunctionContext(self.deferred_deps()), *args, **kwargs) @dataclass class FunctionContext(Generic[WD]): """ Context provided to worker functions, contains deps but also a connection, retry count etc. """ deps: WD @asynccontextmanager async def none_lifespan() -> AsyncIterator[None]: yield None Unset = object() class WorkerApp(Generic[WD]): def __init__( self, *, redis_settings: RedisSettings | str = 'redis://localhost', lifespan: Callable[[], AbstractAsyncContextManager[WD]] = none_lifespan, ): self.redis_settings = redis_settings self.lifespan = lifespan() self.deps: WD | Unset = Unset # type: ignore @overload def register(self, func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]], /) -> WorkerFunction[WD, P, R]: ... @overload def register( self, *, timeout: timedelta | int ) -> Callable[[Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]], WorkerFunction[WD, P, R]]: ... def register( self, func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]] | None = None, *, timeout: timedelta | int | None = None ) -> WorkerFunction[WD, P, R] | Callable[[Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]], WorkerFunction[WD, P, R]]: if func is None: return lambda func: WorkerFunction(self._deferred_deps, func) return WorkerFunction(self._deferred_deps, func, timeout=timeout) def _deferred_deps(self) -> WD: if self.deps is Unset: raise RuntimeError('WorkerApp is not started') return self.deps async def startup(self) -> None: self.deps = await self.lifespan.__aenter__() async def shutdown(self, *args) -> None: await self.lifespan.__aexit__(*args) async def __aenter__(self) -> Self: await self.startup() return self async def __aexit__(self, *args: Any) -> None: await self.shutdown(*args) async def run_queued_jobs(self) -> list[Any]: """ Run jobs already in the queue, useful for testing. Returns a list of jobs results. """ ... ```
ddanier commented 2 months ago

Great to hear all of that. I would very much like arq to actually get better and better. We are using arq in many of our projects.

One additional thing might be to allow some kind of locking for the jobs. I could image this to work like the ressource_group in GitLab (see https://docs.gitlab.com/ee/ci/resource_groups/) which allows only one job to run for each ressource group. This could allow users to easily ensure no two or more jobs try to change the same thing and by doing so solve many of the race conditions one could have when using async tasks.

samuelcolvin commented 2 months ago

That's very easily done in redis, I guess no reason for us not to add a utility.

tiangolo commented 2 months ago

We'll provide the same or similar method for enqueuing a job via it's name for cases where the worker code is not accessible where jobs are enqueued.

I love this part! That's the main reason I had gone for Celery over RQ and others (many years ago, long before FastAPI). That required weird tricks in how to define the Celery code. Having something that takes this into account first-class sounds amazing.


It would be great if it could also support regular def functions on top of async functions, like Starlette/FastAPI, running them on a thread worker (anyio.to_thread.run_sync()). I think this would simplify migrating sync codebases, or adopting it for mainly sync codebases.

dhruv-ahuja commented 2 months ago

It would be great if it could also support regular def functions on top of async functions, like Starlette/FastAPI, running them on a thread worker (anyio.to_thread.run_sync())

I support this statement heavily. We have a synchronous codebase at work for a current project requirement using Fastapi. We had to go with rq for the queuing solution (Celery was a bit too heavy for our use case), which works great but I would have loved to go for arq to ensure future proofing with async support, but this wasn't possible.

Kludex commented 2 months ago

I would like to see automatic documentation with AsyncAPI.

SlavaSkvortsov commented 2 months ago

Thank you very much for working on arq - it's a great project, we've been using it a lot and I'm happy it's getting some love <3

It would be great to have an out of the box priorities for the jobs. Currently, we're using this solution. We can't adapt the approach with different queues because we need to have the same limited number of concurrent jobs and sometimes execute some urgent jobs ahead of the others

samuelcolvin commented 2 months ago

I think you can run stream consumers on different streams with different priorities, so that should be possible.

rednafi commented 2 months ago

One thing I’d love to see is first party support for a UI to see job status, worker conditions, ability to start or stop enqueued jobs, search jobs, filter jobs by workers, delete jobs, etc.

Maybe FastUI can be used to build a robust solution for that.

A lot of the time, using Celery-like tools is a pain because there’s no well supported first-party UI for monitoring and taking interactive actions. If arq can tackle that status quo, it’ll be awesome.

frankie567 commented 2 months ago

Thank you @samuelcolvin for the renewed efforts on this project πŸ™ As @birkjernstrom mentioned on Twitter, we use Arq as our worker backend for Polar.

One of the aspect I would love to see is a middleware-like feature. Most of the time, we have logic/behaviors to execute before/after a job is run (logging, context management...). In the current version, we have access to on_job_start and on_job_end hooks, but one of the downside is that they don't give access to the task arguments.

Currently, we workaround this by implementing decorators we apply on our worker tasks: https://github.com/polarsource/polar/blob/98f4696e95755e93019b0c657c6b08dff64ea02a/server/polar/worker.py#L150-L189

A middleware approach would be very neat and allow us to wrap any kind of logic without having to touch the tasks themselves, similar to what we can do with web frameworks like Starlette.

samuelcolvin commented 2 months ago

Yup, totally agree - middleware makes lots of sense and should be fairly easy to implement compared to some of the other stuff here.

samuelcolvin commented 2 months ago

@rednafi I agree on dashboards, they're something we're thinking about lots at Pydantic.

epicwhale commented 2 months ago

Really thanks for the renewed interest and roadmap! :)

My Wishlist:

samuelcolvin commented 2 months ago
  • Better IDE / type hinting. I tend to have a bunch of # type: ignore around my arq codebase.

Agreed, I think that's covered above - I think we can even do that for jobs where the worker code is not in the search path of the queuing logic.

  • Unique Jobs (without using keep_result = 0): a straight forward way to prevent queueing a job if it's a) already in the queue or b) under execution. But OKAY to queue it if it's 'completed' or has a result saved in redis. This is useful for the Cron tasks actually (say I have a job that I want to run every 5 mins). Right now I have to use keep_result = 0, as a work around or find a way to clear the job result before it can be queued again. There's also more context on this confusion (and suggested documentation) in Cannot enqueue job with a _job_id again, despite execution of the job being fully completed?Β #432

Makes sense, I think that's usable.

  • An easy way to invoke a job during development via CLI, with easy VS Code debug support. Right now I use jupyter notebooks to trigger a job during development.

But how would you define the arguments? I guess we could either define them via a python file, or as JSON in the command line? Maybe you could create a separate issue to discus this?

  • Starting a Worker without it executing the cron_jobs registered inside it. This is useful during development, where I'm working on a single job - but I don't want the crons to run in the midst of my development / debugging.

Yup, seems pretty easy - I guess we allow you to define separate WorkerApps (see the example above), then run just one of them, with that you could put all your cron jobs in a app and not run that during development.

JonasKs commented 2 months ago

I'm really, really excited about this! Thanks for all the effort throughout the years @samuelcolvin, and to everyone helping out in issues or with PRs.

I think you're pretty spot on with these ideas, in addition to what @frankie567 writes about the middleware. I'd also like to add that while we have queues today, the queue name is not used for e.g. storage of results, causing confusing behavior.

I'm also very much in favor of OpenTelemetry + a dashboard/API (#297), though I don't know if a dashboard nessescarily needs to be shipped with arq.

As for the API - it looks really clean. πŸ‘

Blaise-93 commented 2 months ago

Thank you Colvin for trying to boost strap this project, it is really a great project. I would love to see it have a documentation. Async worker ought not to be feature rich like celery, we need library that is simple and does the work effectively and efficiently.

epicwhale commented 2 months ago

Few more:

KShah707 commented 1 month ago

Would be awesome if you could first skim through the backlog of approved / tested PRs waiting for merge @samuelcolvin. There are some really simple but high-value ones like #378 that would probably help a lot of people.

samuelcolvin commented 1 month ago

There are some really simple but high-value ones like https://github.com/samuelcolvin/arq/pull/378 that would probably help a lot of people.

Done. I'm working through PRs now.

samuelcolvin commented 1 month ago

v0.26.0b1 is released, please try it, I'll release v0.26 at the end of the week, see #441.

iamlikeme commented 1 month ago

Job uniqueness in arq is opt-in, i.e. you opt-in by crafting deterministic job ids. In my team we heavily rely on job uniqueness (in a huge majority of cases we do not want concurrent runs of the same worker function with the same arguments) - to the extent that we wanted it to be opt-out. We accomplish this by generating default job_ids, something along these lines:

@worker_app.register(
    job_id="foo:{a}"  # template string, may refer to arguments passed to the worker function
)
async def foo(ctx: FunctionContext, a: str, b: int) -> int:
    ...

I think this is simple and generally useful. I'm not sure if this is the right thread for this sort of feature requests, so sorry if that is not appropriate here.

[@JonasKs ] I'm also very much in favor of OpenTelemetry + a dashboard/API (https://github.com/samuelcolvin/arq/issues/297), though I don't know if a dashboard nessescarily needs to be shipped with arq.

I wholeheartedly agree. I've once made an attempt to make a generic dashboard for arq and I found it problematic that pickle is used as the default serializer, because unpickling is reliable only in an environment where the same version of arq is installed. If default serialization was e.g. to JSON it would be much easier to write a dashboard that can be shipped separately from arq.

cmangla commented 1 month ago

@samuelcolvin Many thanks for creating arq and for leading the project over the years.

I am glad to see the wide-ranging uses arq finds and your plan for new features and refinements. However, I do hope that arq remains relatively light-weight and easy to get started with. I used it for the first time during an internship, when I was short on time. The documentation for arq is concise and thorough, and it has few moving parts. I hope that we can, broadly, maintain these qualities.

cmangla commented 1 month ago

In summary I want to significantly refactor the internals

I have an offhand suggestion. In cPython 3.13, we will get a Python API for "Multiple Interpreters" as per PEP 734. This will allow us to launch interpreters in separate OS threads, in a single OS process. The interpreters don't share a GIL, so that gives us multi-core parallelism. It should be possible for arq to use this facility to launch multiple workers, each in a separate interpreter OS thread, in a single process. Worker would then have the same multi-core parallelism that they otherwise have in separate processes. The advantage would be memory saving. For example, in an ML application, if each worker needs to pre-precess the same dataset and keep it resident in memory, that could be a common global object between workers on the same machine.

We should separate the Redis logic to make it easier to provide alternative backends, an in memory backend for testing would be especially useful for unit testing applications.

Another offhand suggestion. For the testing backend, we could use PEP 734 to launch a worker and job-submitter in the same process in separate interpreters, then use the inter-interpreter sharing facilities of PEP 734 for a rudimentary in-memory backend.

waza-ari commented 2 weeks ago

First of all, thanks for the project and dedication! It's taken for granted quite often, but it takes a lot of time to maintain OS projects.

One suggestion would be custom exception handlers, similar to what FastAPI offers. We're exclusively using structlog for JSON based log output, and custom error handlers would help in:

JonasKs commented 2 weeks ago

We should also try to make health checks to be k8s-friendly. The health check for the worker should probably only check whether the worker can connect to the backend and query the task queue, and not be bound to tasks being executed.

Today, a health check is recorded only if:

This setup fails if:

We found this behavior this week, where our setup is:

espdev commented 2 weeks ago

@samuelcolvin Thank you

v0.26.0b1 is released, please try it, I'll release v0.26 at the end of the week, see #441.

What are the plans for the release?

samuelcolvin commented 2 weeks ago

Oh 🀦 , I'll do the release today.

So sorry, all I can say is, it's been a busy few weeks.

Wh1isper commented 1 week ago

Having just discovered this very good project! Thank you!

In my scenario, I would prioritize the following two needs:

Are they in the refactoring plan? Maybe I can try to contribute some pr without interfering refactoring

gaby commented 1 week ago

How does this compare to something like FastStream (formerly Propan) ?

https://github.com/airtai/faststream

espdev commented 1 week ago

@gaby

They are different tools for different purposes and applications.

Graeme22 commented 4 days ago

Another thing that would be nice to have: cron jobs that accept typical cron patterns (eg "*/5", "9-13"), as the sets are a bit repetitive.

evgenii-moriakhin commented 10 hours ago

Very excited that you want to improve this library. For asynchronous python, such simple task execution libraries are not enough (and you can't deploy giants like Temporal in any project)