python-arq / arq

Fast job queuing and RPC in python with asyncio and redis.
https://arq-docs.helpmanual.io/
MIT License

Future plan for Arq #437

Open samuelcolvin opened 8 months ago

samuelcolvin commented 8 months ago

Arq was the first real open source project I ever created, back in 2016. That was long before Pydantic, FastAPI, ParamSpec, or even Redis Streams.

I remember a sense of incredulity that I couldn't find an async variant of rq (which I was helping to maintain at the time). Surely I wasn't the only person wanting to queue jobs in async code? Apparently, at the time, I was.

Fast forward eight years, and I'm definitely not the only person trying to queue jobs in an async world.

Hence my incredulity has only grown: there's still no ubiquitous queuing library for async Python. Despite neglect, Arq still seems to work well for lots of people; I've used it in every role I've had since, and for the most part it just works.


That said, Arq needs some love, and since we're now using it at Pydantic, I think we should have the resources to provide that love later this year. This is a rough plan of what I propose to do.

Feedback very welcome from all, but especially @JonasKs and @pydantic-maintainers (who apparently I can't tag here :-()

In summary, I want to significantly refactor the internals and update the API in a backwards-compatible way.

1. ParamSpec and type safety 🚧

The most important change we should make is to make Arq type-safe using ParamSpec and Concatenate; I have a partially working demonstration of how this will work below.

We'll provide the same or a similar method for enqueuing a job via its name, for cases where the worker code is not accessible where jobs are enqueued.

We should be able to do this while still supporting the current API to make migration as easy as possible.

2. Redis Streams 🚀

The second most important change is to adopt Redis Streams, which read like they were designed for exactly this application: they mean we can effectively guarantee only one execution while still being resilient to unexpected shutdown (jobs shut down during execution will be rerun later).

This should be possible without breaking the current API at all.
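
For illustration, here's a rough sketch of the consumer-group mechanics this implies, using redis-py's asyncio client (not arq's actual implementation; the stream and group names are made up):

```py
from redis.asyncio import Redis
from redis.exceptions import ResponseError

async def run_job(fields: dict) -> None:
    ...  # stand-in for the real job execution

async def consume(redis: Redis, consumer: str) -> None:
    try:
        await redis.xgroup_create('arq:stream', 'workers', id='0', mkstream=True)
    except ResponseError:
        pass  # BUSYGROUP: the group already exists

    while True:
        # reclaim jobs that a crashed worker read but never acknowledged
        # (processing of the reclaimed messages is omitted for brevity)
        await redis.xautoclaim('arq:stream', 'workers', consumer, min_idle_time=60_000)

        # '>' asks for messages never delivered to this group; block up to 5s
        resp = await redis.xreadgroup('workers', consumer, {'arq:stream': '>'}, count=10, block=5000)
        for _stream, messages in resp:
            for message_id, fields in messages:
                await run_job(fields)
                # ack only after success, so an unexpected shutdown means a rerun
                await redis.xack('arq:stream', 'workers', message_id)
```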

3. Avoid sorted set for immediate jobs 🚀

Current Arq is slower than it should be because it uses a sorted set to queue all jobs; the sorted set provides two things:

The idea would be to only use the sorted set for jobs scheduled to run in the future, then use the logic demonstrated by SAQ to take jobs off the sorted set when they're ready to be run and add them to the stream.

Jobs which are enqueued without a delay can be added to the stream immediately, which should significantly improve performance for this very common case.
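
The promotion logic for the deferred case might look roughly like this (a minimal sketch; the key names are assumptions):

```py
import time

from redis.asyncio import Redis

async def promote_due_jobs(redis: Redis) -> None:
    now_ms = int(time.time() * 1000)
    # deferred jobs are scored by their scheduled run time in ms
    due = await redis.zrangebyscore('arq:deferred', '-inf', now_ms)
    for job_id in due:
        # ZREM acts as the claim: only the caller that actually removes the
        # member adds it to the stream, so each job is promoted exactly once
        if await redis.zrem('arq:deferred', job_id):
            await redis.xadd('arq:stream', {'job_id': job_id})
```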

This should be possible without breaking the current API at all.

4. Avoid polling 🇵🇱

Mostly for latency reasons, it would be nice to avoid polling. The idea would be:

5. OpenTelemetry 🔭

Observability is close to our hearts at Pydantic, so it would be nice to have optional support for OpenTelemetry, or perhaps just hook points to implement your own observability.

This should be possible without breaking the current API at all.
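
As an illustration of what a hook point might look like (the hook itself is hypothetical; the OpenTelemetry calls are the real API):

```py
from opentelemetry import trace

tracer = trace.get_tracer('arq')

async def traced_run(job_name: str, run):
    # `run` is a zero-argument coroutine function that executes the job
    with tracer.start_as_current_span(f'arq job {job_name}') as span:
        span.set_attribute('arq.job_name', job_name)
        return await run()
```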

6. DAG - Task Dependency Graph 📈

The idea is to allow one or more jobs to be triggered by one or more previous jobs.

See the then() and start_with() methods in the partial implementation below.
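
Hypothetical usage of those methods, assuming foo, bar and baz are registered worker functions:

```py
async def example() -> None:
    await foo.enqueue('https://example.com').then(bar)        # bar runs after foo succeeds
    await foo.enqueue('https://example.com').start_with(baz)  # baz is enqueued alongside foo
```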

This should be possible without breaking the current API at all.

7. CLI, settings and worker.run improvements 🏃

We can mostly just copy uvicorn. We should also remove the very ugly WorkerSettings and configure the worker via a simple function.

We should also fix the reload logic to use watchfiles.
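
A minimal sketch of the reload side using watchfiles (the CLI invocation and module path are assumptions, not a committed interface):

```py
from watchfiles import run_process

def dev() -> None:
    # restart the worker process whenever anything under 'src' changes
    run_process('src', target='python -m arq src.worker:worker_app')

if __name__ == '__main__':
    dev()
```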

This can be done such that existing code still works, with or without deprecation warnings.

8. Separate the backend ↔️

We should separate the Redis logic to make it easier to provide alternative backends; an in-memory backend would be especially useful for unit testing applications.
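
One hypothetical shape for that separation, with an in-memory variant for tests (a sketch, not a committed design):

```py
import asyncio
from typing import Any, Protocol

class QueueBackend(Protocol):
    async def enqueue(self, job: dict[str, Any]) -> None: ...
    async def dequeue(self) -> dict[str, Any]: ...

class InMemoryBackend:
    """Keeps jobs in an asyncio.Queue, so tests never need a Redis server."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()

    async def enqueue(self, job: dict[str, Any]) -> None:
        await self._queue.put(job)

    async def dequeue(self) -> dict[str, Any]:
        return await self._queue.get()
```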

This can be done such that existing code still works, with or without deprecation warnings.

9. Better documentation 📖

Documentation should move to mkdocs material and mkdocstrings, and be improved significantly.

10. Moving repo to Pydantic 🏢

To provide the resources for this work, we should move Arq to the Pydantic organization, and the docs to arq.pydantic.dev or similar.

Have I missed anything?


API Sketch

Here's a sketch of how I see the new type-safe API working, together with a partial implementation:

Example Usage:

```py
from __future__ import annotations as _annotations

from dataclasses import dataclass
from typing import AsyncIterator
from contextlib import asynccontextmanager

from arq import FunctionContext, WorkerApp

from httpx import AsyncClient

@dataclass
class MyWorkerDeps:
    """
    Type safe way of defining the dependencies of the worker functions.

    E.g. HTTP client, database connection, settings. etc.
    """
    http_client: AsyncClient

@asynccontextmanager
async def my_worker_lifespan() -> AsyncIterator[MyWorkerDeps]:
    async with AsyncClient() as http_client:
        yield MyWorkerDeps(http_client)

worker_app = WorkerApp(lifespan=my_worker_lifespan)

@worker_app.register
async def foo(ctx: FunctionContext[MyWorkerDeps], url: str) -> int:
    # ctx.deps here is of type MyWorkerDeps, that's enforced by static typing
    # FunctionContext will also provide access to a redis connection, retry count,
    # even results of other jobs etc.
    r = await ctx.deps.http_client.get(url)
    r.raise_for_status()
    print(f'{url}: {r.text[:80]!r}...')
    return len(r.text)

async def main() -> None:
    async with worker_app:
        # these two are equivalent, param spec means the arguments are type safe
        await foo.enqueue('https://microsoft.com').start()
        await foo.enqueue('https://microsoft.com')
        # same, delayed by 10 seconds with 5 second timeout
        await foo.enqueue('https://microsoft.com').start(delay_by=10, timeout=5)

        # call foo directly in the same process
        print('length:', await foo.direct('https://github.com'))

if __name__ == '__main__':
    import asyncio

    asyncio.run(main())
```

Partial Implementation:

```py
from __future__ import annotations as _annotations

from dataclasses import dataclass, KW_ONLY
from datetime import timedelta
from typing import ParamSpec, TypeVar, Generic, Concatenate, overload, Self, Any, AsyncIterator, Generator
from collections.abc import Awaitable, Callable
from contextlib import asynccontextmanager, AbstractAsyncContextManager

from arq.connections import RedisSettings

WD = TypeVar('WD')  # worker dependencies
P = ParamSpec('P')  # worker function parameters
R = TypeVar('R')  # worker function return type
PNext = ParamSpec('PNext')
RNext = TypeVar('RNext')


@dataclass
class Job(Generic[R]):
    """
    Represents a job that has been enqueued, could be deferred, queued, running or finished.
    """

    id: str

    async def status(self) -> str: ...

    async def result(self) -> R: ...


@dataclass
class PendingJob(Generic[WD, P, R]):
    """
    Represents a job that has not been enqueued yet.
    """

    deferred_deps: Callable[[], WD]
    func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]
    args: tuple[Any, ...]
    kwargs: dict[str, Any]
    _: KW_ONLY
    timeout: timedelta | int | None = None

    def __await__(self) -> Generator[None, None, Job]:
        return self.start().__await__()

    async def start(self, *, delay_by: timedelta | int | None = None) -> Job:
        # also other kwargs: delay_until, timeout, retry etc.
        print(f'starting job {self.func.__name__}(args={self.args}, kwargs={self.kwargs}) {delay_by=}')
        return Job(id='123')

    def then(
        self, *on_success: WorkerFunction[WD, PNext, RNext] | PendingJob[WD, PNext, RNext]
    ) -> PendingJob[WD, PNext, RNext]:
        """
        also takes all kwargs from `start()`

        TODO - AFAIK there's no way to enforce that `PNext` is `(R,)` - e.g. that the return value
        of the first function is the input to the second function.

        The even more complex case is where you have multiple jobs triggering a job, where I'm even
        more sure full type safety is impossible.

        I would therefore suggest that subsequent jobs are not allowed to take any arguments, and
        instead access the results of previous jobs via `FunctionContext`
        """
        ...

    def start_with(self, *also_start: WorkerFunction[WD, P, R]) -> WorkerFunction[WD, P, R]:
        # also takes all kwargs from `start()`, I think this can be entirely type safe
        ...


@dataclass
class WorkerFunction(Generic[WD, P, R]):
    deferred_deps: Callable[[], WD]
    func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]
    timeout: timedelta | int | None = None

    def enqueue(self, *args: P.args, **kwargs: P.kwargs) -> PendingJob[WD, P, R]:
        return PendingJob(self.deferred_deps, self.func, args, kwargs, timeout=self.timeout)

    async def direct(self, *args: P.args, **kwargs: P.kwargs) -> R:
        return await self.func(FunctionContext(self.deferred_deps()), *args, **kwargs)


@dataclass
class FunctionContext(Generic[WD]):
    """
    Context provided to worker functions, contains deps but also a connection, retry count etc.
    """

    deps: WD


@asynccontextmanager
async def none_lifespan() -> AsyncIterator[None]:
    yield None


Unset = object()


class WorkerApp(Generic[WD]):
    def __init__(
        self,
        *,
        redis_settings: RedisSettings | str = 'redis://localhost',
        lifespan: Callable[[], AbstractAsyncContextManager[WD]] = none_lifespan,
    ):
        self.redis_settings = redis_settings
        self.lifespan = lifespan()
        self.deps: WD | Unset = Unset  # type: ignore

    @overload
    def register(
        self, func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]], /
    ) -> WorkerFunction[WD, P, R]: ...

    @overload
    def register(
        self, *, timeout: timedelta | int
    ) -> Callable[[Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]], WorkerFunction[WD, P, R]]: ...

    def register(
        self,
        func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]] | None = None,
        *,
        timeout: timedelta | int | None = None,
    ) -> WorkerFunction[WD, P, R] | Callable[
        [Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]], WorkerFunction[WD, P, R]
    ]:
        if func is None:
            # decorator-with-arguments form: pass the timeout through to the wrapped function
            return lambda func: WorkerFunction(self._deferred_deps, func, timeout=timeout)
        return WorkerFunction(self._deferred_deps, func, timeout=timeout)

    def _deferred_deps(self) -> WD:
        if self.deps is Unset:
            raise RuntimeError('WorkerApp is not started')
        return self.deps

    async def startup(self) -> None:
        self.deps = await self.lifespan.__aenter__()

    async def shutdown(self, *args) -> None:
        await self.lifespan.__aexit__(*args)

    async def __aenter__(self) -> Self:
        await self.startup()
        return self

    async def __aexit__(self, *args: Any) -> None:
        await self.shutdown(*args)

    async def run_queued_jobs(self) -> list[Any]:
        """
        Run jobs already in the queue, useful for testing.

        Returns a list of job results.
        """
        ...
```
ddanier commented 8 months ago

Great to hear all of that. I would very much like arq to actually get better and better. We are using arq in many of our projects.

One additional thing might be to allow some kind of locking for jobs. I could imagine this working like the resource_group in GitLab (see https://docs.gitlab.com/ee/ci/resource_groups/), which allows only one job to run per resource group. This could let users easily ensure that no two jobs try to change the same thing at once, and by doing so solve many of the race conditions one can have when using async tasks.

samuelcolvin commented 8 months ago

That's very easily done in Redis; I guess there's no reason for us not to add a utility.
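
A minimal sketch of such a utility with redis-py (the key naming is an assumption): SET NX EX gives a per-resource-group lock, and a job that fails to take the lock could be re-enqueued with a delay.

```py
from redis.asyncio import Redis

async def try_acquire_group(redis: Redis, group: str, job_id: str, ttl: int = 300) -> bool:
    # nx=True: only set if the key doesn't already exist; ex: expire the key
    # so a crashed worker can't hold the lock forever
    return bool(await redis.set(f'arq:group:{group}', job_id, nx=True, ex=ttl))

async def release_group(redis: Redis, group: str) -> None:
    await redis.delete(f'arq:group:{group}')
```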

tiangolo commented 8 months ago

We'll provide the same or a similar method for enqueuing a job via its name, for cases where the worker code is not accessible where jobs are enqueued.

I love this part! That's the main reason I had gone for Celery over RQ and others (many years ago, long before FastAPI). That required weird tricks in how to define the Celery code. Having something that takes this into account first-class sounds amazing.


It would be great if it could also support regular def functions on top of async functions, like Starlette/FastAPI, running them on a thread worker (anyio.to_thread.run_sync()). I think this would simplify migrating sync codebases, or adopting it for mainly sync codebases.
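
A minimal sketch of how that dispatch could work (the helper is hypothetical; anyio.to_thread.run_sync is the real API):

```py
import inspect

from anyio import to_thread

async def run_function(func, *args, **kwargs):
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # plain `def` functions run on a worker thread so they don't block the event loop
    return await to_thread.run_sync(lambda: func(*args, **kwargs))
```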

dhruv-ahuja commented 8 months ago

It would be great if it could also support regular def functions on top of async functions, like Starlette/FastAPI, running them on a thread worker (anyio.to_thread.run_sync())

I support this statement heavily. We have a synchronous codebase at work for a current project requirement, using FastAPI. We had to go with rq for the queuing solution (Celery was a bit too heavy for our use case), which works great, but I would have loved to go with arq for future-proofing with async support; that wasn't possible.

Kludex commented 8 months ago

I would like to see automatic documentation with AsyncAPI.

SlavaSkvortsov commented 8 months ago

Thank you very much for working on arq - it's a great project, we've been using it a lot and I'm happy it's getting some love <3

It would be great to have out-of-the-box priorities for jobs. Currently, we're using this solution. We can't adapt the approach of using different queues, because we need the same limited number of concurrent jobs overall while sometimes executing urgent jobs ahead of the others.

samuelcolvin commented 8 months ago

I think you can run stream consumers on different streams with different priorities, so that should be possible.
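
A sketch of what that could look like, draining a high-priority stream before a low-priority one (stream and group names are made up):

```py
from redis.asyncio import Redis

async def read_next(redis: Redis, consumer: str):
    # urgent jobs first: check the high-priority stream before the low-priority one
    for stream in ('arq:stream:high', 'arq:stream:low'):
        resp = await redis.xreadgroup('workers', consumer, {stream: '>'}, count=1)
        if resp:
            return resp
    # nothing waiting: block on both streams until something arrives
    return await redis.xreadgroup(
        'workers', consumer, {'arq:stream:high': '>', 'arq:stream:low': '>'}, count=1, block=1000
    )
```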

rednafi commented 8 months ago

One thing I’d love to see is first-party support for a UI to see job status and worker conditions, with the ability to start or stop enqueued jobs, search jobs, filter jobs by worker, delete jobs, etc.

Maybe FastUI can be used to build a robust solution for that.

A lot of the time, using Celery-like tools is a pain because there’s no well supported first-party UI for monitoring and taking interactive actions. If arq can tackle that status quo, it’ll be awesome.

frankie567 commented 8 months ago

Thank you @samuelcolvin for the renewed efforts on this project 🙏 As @birkjernstrom mentioned on Twitter, we use Arq as our worker backend for Polar.

One of the aspects I would love to see is a middleware-like feature. Most of the time, we have logic/behaviors to execute before/after a job is run (logging, context management...). In the current version, we have access to on_job_start and on_job_end hooks, but one of the downsides is that they don't give access to the task arguments.

Currently, we workaround this by implementing decorators we apply on our worker tasks: https://github.com/polarsource/polar/blob/98f4696e95755e93019b0c657c6b08dff64ea02a/server/polar/worker.py#L150-L189

A middleware approach would be very neat and allow us to wrap any kind of logic without having to touch the tasks themselves, similar to what we can do with web frameworks like Starlette.

samuelcolvin commented 8 months ago

Yup, totally agree - middleware makes lots of sense and should be fairly easy to implement compared to some of the other stuff here.
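
One possible shape, mirroring Starlette-style middleware (a hypothetical API; the ctx attributes are assumptions): each middleware receives the job context plus a call_next continuation.

```py
from typing import Any
from collections.abc import Awaitable, Callable

async def logging_middleware(ctx: Any, call_next: Callable[[], Awaitable[Any]]) -> Any:
    print(f'starting job {ctx.job_name} args={ctx.args}')
    try:
        return await call_next()
    finally:
        print(f'finished job {ctx.job_name}')
```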

samuelcolvin commented 8 months ago

@rednafi I agree on dashboards, they're something we're thinking about lots at Pydantic.

epicwhale commented 8 months ago

Really thanks for the renewed interest and roadmap! :)

My Wishlist:

samuelcolvin commented 8 months ago
  • Better IDE / type hinting. I tend to have a bunch of # type: ignore around my arq codebase.

Agreed, I think that's covered above - I think we can even do that for jobs where the worker code is not in the search path of the queuing logic.

  • Unique Jobs (without using keep_result = 0): a straightforward way to prevent queueing a job if it's a) already in the queue or b) under execution, but OKAY to queue it if it's 'completed' or has a result saved in redis. This is useful for cron tasks (say I have a job that I want to run every 5 mins). Right now I have to use keep_result = 0 as a workaround, or find a way to clear the job result before it can be queued again. There's also more context on this confusion (and suggested documentation) in Cannot enqueue job with a _job_id again, despite execution of the job being fully completed? #432

Makes sense, I think that's usable.

  • An easy way to invoke a job during development via CLI, with easy VS Code debug support. Right now I use jupyter notebooks to trigger a job during development.

But how would you define the arguments? I guess we could either define them via a Python file, or as JSON on the command line? Maybe you could create a separate issue to discuss this?

  • Starting a Worker without it executing the cron_jobs registered inside it. This is useful during development, where I'm working on a single job - but I don't want the crons to run in the midst of my development / debugging.

Yup, seems pretty easy. I guess we'd allow you to define separate WorkerApps (see the example above), then run just one of them; with that, you could put all your cron jobs in one app and not run it during development.

JonasKs commented 8 months ago

I'm really, really excited about this! Thanks for all the effort throughout the years @samuelcolvin, and to everyone helping out in issues or with PRs.

I think you're pretty spot on with these ideas, in addition to what @frankie567 writes about the middleware. I'd also like to add that while we have queues today, the queue name is not used for e.g. storage of results, causing confusing behavior.

I'm also very much in favor of OpenTelemetry + a dashboard/API (#297), though I don't know if a dashboard necessarily needs to be shipped with arq.

As for the API - it looks really clean. 👏

Blaise-93 commented 8 months ago

Thank you Colvin for trying to bootstrap this project; it really is a great project. I would love to see it have better documentation. An async worker ought not to be as feature-rich as Celery; we need a library that is simple and does the work effectively and efficiently.

epicwhale commented 8 months ago

Few more:

KShah707 commented 8 months ago

Would be awesome if you could first skim through the backlog of approved / tested PRs waiting for merge @samuelcolvin. There are some really simple but high-value ones like #378 that would probably help a lot of people.

samuelcolvin commented 8 months ago

There are some really simple but high-value ones like https://github.com/samuelcolvin/arq/pull/378 that would probably help a lot of people.

Done. I'm working through PRs now.

samuelcolvin commented 8 months ago

v0.26.0b1 is released, please try it, I'll release v0.26 at the end of the week, see #441.

iamlikeme commented 8 months ago

Job uniqueness in arq is opt-in, i.e. you opt-in by crafting deterministic job ids. In my team we heavily rely on job uniqueness (in a huge majority of cases we do not want concurrent runs of the same worker function with the same arguments) - to the extent that we wanted it to be opt-out. We accomplish this by generating default job_ids, something along these lines:

```py
@worker_app.register(
    job_id="foo:{a}"  # template string, may refer to arguments passed to the worker function
)
async def foo(ctx: FunctionContext, a: str, b: int) -> int:
    ...
```
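
A sketch of how such a template could be rendered into a concrete job id, binding the call arguments to parameter names with inspect:

```py
import inspect

def render_job_id(template: str, func, *args, **kwargs) -> str:
    bound = inspect.signature(func).bind(None, *args, **kwargs)  # None stands in for ctx
    bound.apply_defaults()
    return template.format(**bound.arguments)

# e.g. render_job_id('foo:{a}', foo, 'x', b=1) == 'foo:x'
```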

I think this is simple and generally useful. I'm not sure if this is the right thread for this sort of feature request, so sorry if it's not appropriate here.

[@JonasKs] I'm also very much in favor of OpenTelemetry + a dashboard/API (https://github.com/samuelcolvin/arq/issues/297), though I don't know if a dashboard necessarily needs to be shipped with arq.

I wholeheartedly agree. I once made an attempt at a generic dashboard for arq, and I found it problematic that pickle is used as the default serializer, because unpickling is reliable only in an environment where the same version of arq is installed. If the default serialization were e.g. JSON, it would be much easier to write a dashboard that can be shipped separately from arq.
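
For what it's worth, current arq already accepts custom serializer hooks, so JSON can be opted into today at the cost of restricting job arguments to JSON-safe types. A minimal sketch (worker configuration otherwise elided):

```py
import json

def serialize(d: dict) -> bytes:
    return json.dumps(d, default=str).encode()

def deserialize(b: bytes) -> dict:
    return json.loads(b)

class WorkerSettings:
    job_serializer = serialize
    job_deserializer = deserialize
```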

cmangla commented 8 months ago

@samuelcolvin Many thanks for creating arq and for leading the project over the years.

I am glad to see the wide-ranging uses arq finds and your plan for new features and refinements. However, I do hope that arq remains relatively light-weight and easy to get started with. I used it for the first time during an internship, when I was short on time. The documentation for arq is concise and thorough, and it has few moving parts. I hope that we can, broadly, maintain these qualities.

cmangla commented 7 months ago

In summary I want to significantly refactor the internals

I have an offhand suggestion. In CPython 3.13, we will get a Python API for "Multiple Interpreters" as per PEP 734. This will allow us to launch interpreters in separate OS threads, in a single OS process. The interpreters don't share a GIL, which gives us multi-core parallelism. It should be possible for arq to use this facility to launch multiple workers, each in a separate interpreter OS thread, in a single process. Workers would then have the same multi-core parallelism that they otherwise have in separate processes. The advantage would be memory savings. For example, in an ML application where each worker needs to pre-process the same dataset and keep it resident in memory, that could be a common global object shared between workers on the same machine.

We should separate the Redis logic to make it easier to provide alternative backends, an in memory backend for testing would be especially useful for unit testing applications.

Another offhand suggestion. For the testing backend, we could use PEP 734 to launch a worker and job-submitter in the same process in separate interpreters, then use the inter-interpreter sharing facilities of PEP 734 for a rudimentary in-memory backend.

waza-ari commented 7 months ago

First of all, thanks for the project and dedication! It's taken for granted quite often, but it takes a lot of time to maintain OS projects.

One suggestion would be custom exception handlers, similar to what FastAPI offers. We're exclusively using structlog for JSON-based log output, and custom error handlers would help in:

JonasKs commented 7 months ago

We should also try to make health checks k8s-friendly. The health check for the worker should probably only check whether the worker can connect to the backend and query the task queue, and not be bound to tasks being executed.

Today, a health check is recorded only if:

This setup fails if:

We found this behavior this week, where our setup is:

espdev commented 7 months ago

@samuelcolvin Thank you

v0.26.0b1 is released, please try it, I'll release v0.26 at the end of the week, see #441.

What are the plans for the release?

samuelcolvin commented 7 months ago

Oh 🤦 , I'll do the release today.

So sorry, all I can say is, it's been a busy few weeks.

Wh1isper commented 6 months ago

Having just discovered this very good project! Thank you!

In my scenario, I would prioritize the following two needs:

Are they in the refactoring plan? Maybe I can try to contribute some PRs without interfering with the refactoring.

gaby commented 6 months ago

How does this compare to something like FastStream (formerly Propan)?

https://github.com/airtai/faststream

espdev commented 6 months ago

@gaby

They are different tools for different purposes and applications.

Graeme22 commented 6 months ago

Another thing that would be nice to have: cron jobs that accept typical cron patterns (e.g. "*/5", "9-13"), as the sets are a bit repetitive.
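
A small sketch of expanding such patterns into the explicit sets arq's cron uses today:

```py
def expand(pattern: str, lo: int, hi: int) -> set[int]:
    if pattern == '*':
        return set(range(lo, hi + 1))
    if pattern.startswith('*/'):  # e.g. '*/5' -> every 5th value
        return set(range(lo, hi + 1, int(pattern[2:])))
    if '-' in pattern:  # e.g. '9-13' -> an inclusive range
        start, end = map(int, pattern.split('-'))
        return set(range(start, end + 1))
    return {int(pattern)}

# expand('*/5', 0, 59) == {0, 5, 10, ..., 55}
# expand('9-13', 0, 23) == {9, 10, 11, 12, 13}
```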

evgenii-moriakhin commented 6 months ago

Very excited that you want to improve this library. For asynchronous Python, there aren't enough simple task-execution libraries like this (and you can't deploy giants like Temporal in every project).

Wh1isper commented 6 months ago

Inspired by this project, I developed brq for my own needs, based on Redis Streams (a simple task queue library that can be used with a Redis cluster, without the watch/pipeline commands).

Lancetnik commented 6 months ago

@gaby

They are different tools for different purposes and applications.

  • arq is a library for job queuing to run background and scheduled tasks
  • FastStream is a pub/sub framework to build asynchronous services with event-driven architecture

But now we are talking about:

We'll provide the same or a similar method for enqueuing a job via its name, for cases where the worker code is not accessible where jobs are enqueued.

I would like to see automatic documentation with AsyncAPI.

Separate the backend

These things would make arq the same kind of thing as FastStream, so why should we build them? If we are talking about a tool for asynchronous services, we already have one. If we are talking about a task framework, arq is a good choice. Should we mix these cases?

samuelcolvin commented 4 months ago

Quick update on this:

As some of you might have seen, I've moved arq to a new organization python-arq to make it easier and more attractive for those using arq to collaborate on making it better.

In particular @aaazzam and the Prefect team are big users of arq and will be contributing a lot to the improvements I outlined above.

More to come...

Blaise-93 commented 4 months ago

That is a great decision, Colvin.

We pray that arq shall succeed.


hubert-springbok commented 3 months ago

Hi @samuelcolvin, I'm late to the party here, but very glad to see renewed effort on ARQ.

We decided to use ARQ over a year ago, when I realised that running even fairly modest LLM pipelines in sync workers is a lost cause. Here are some of our learnings and hacks that could be incorporated into ARQ. I'd be happy to contribute some time on the side this year.

Priority of CRON jobs

We shot ourselves in the foot once, as follows:

  1. Create an ARQ CRON job that paces some resource consumption, e.g. a quota on LLM tokens
  2. Create some ARQ tasks that consume the resource

The system works nicely until... max_jobs is hit. At that point the cron job would no longer be admitted and so the quota would not be restored, resulting in a deadlock that was quite nasty to debug.

This kind of scenario could be avoided with either of:

We got the named-queues variety to work, but later decided on a different approach. I still have the code, though.

Dependency injection

Using FastAPI we ended up with a bunch of async def get_X for all kinds of resources, many of which are also used in the background worker, e.g. database, redis, validators etc. In order to reuse all this code, I've gutted the FastAPI dependency injection mechanism to support arbitrary functions via a pattern like:

```py
injector = MyCustomInjector()

@injector.inject()
async def background_task(ctx, db: DB = Depends(get_db)):
    ...
```

We're currently using this solution and I have the code. It's ugly in places (I hacked Body to work for regular ARQ args), but with minor changes to FastAPI and ARQ it could become a powerful interface joining the pydantic-FastAPI-ARQ stack together.

Floby commented 2 months ago

Hi there :wave: Thank you for all the work put into Arq already and for this map to an even better job queue.

At my company (Back Market), we're looking to get out of RabbitMQ, and therefore more or less out of Celery altogether. With that perspective, here's my feedback on your points, if I were to pick which ones have the best chance of getting us to switch instead of Celery.

OpenTelemetry

We're currently running with Datadog for our observability needs, which maintains an integration with Celery and RabbitMQ. Datadog has support for OpenTelemetry though, and we absolutely need observability on these, most importantly to properly autoscale our workers and monitor our eventual consistency. We're also relying more and more on distributed tracing for troubleshooting.

Moving repo to Pydantic

Yours and the Pydantic name are strong markers of quality! It certainly would help to more explicitly showcase that they belong in the same universe as Arq.

Avoid Polling

While there's always some polling happening, the default 500ms of the current implementation can become an issue, especially when it compounds across services. I personally don't think that's a big deal yet, but I can see polling becoming an issue as the number of workers grows.

Separate the backend

Especially regarding an in-memory backend: this would allow us to shift left on some tests (or... just have them, because Celery doesn't make it easy). However, using testcontainers to spin up a Redis instance isn't that much of a hassle either.


Conversely, these are the items I feel wouldn't really make a difference to our adoption:

Type Safety

While we run a completely typed stack, we also wrap a bunch of our adopted tools in our base library for our services, so in the end we can implement that typing there.

DAG

We already use Airflow for more advanced workflows.

worker improvements

As stated above, we'd probably wrap it one way or another, so this doesn't really make a difference for us.


What's the best approach to start contributing towards the goals outlined here? :rocket:

jonashaag commented 1 month ago

A note for people interested in Arq mainly because of its "pessimistic execution" property:

arq v0.16 has what I’m calling “pessimistic execution”: jobs aren’t removed from the queue until they’ve either succeeded or failed. If the worker shuts down, the job will be cancelled immediately and will remain in the queue to be run again when the worker starts up again (or run by another worker which is still running).

It looks like Celery will get something similar in the next release: https://github.com/celery/celery/pull/9213

gaby commented 1 month ago

@samuelcolvin I thought arq was moving into pydantic org, is that still happening?

aviau commented 2 weeks ago

Redis Cluster support would also be a priority for our use cases. We are running ARQ at a pretty big scale at this point (millions of executions per day), and currently the only way for us to scale ARQ is to create multiple Redis instances.