agronholm / apscheduler

Task scheduling library for Python
MIT License

APScheduler 4.0 progress tracking #465

Open agronholm opened 3 years ago

agronholm commented 3 years ago

I'm opening this issue as an easy way for interested parties to track the development progress of the next major APScheduler release (v4.0).

Terminology changes in v4.0

The old term "Job", as it was, is gone, replaced by the following concepts, which are closer to the terminology used by Celery:

Also, the term "executor" is now being changed to "worker".

Notice that the terminology may still change before the final release!

Planned major changes

v4.0 is a ground-up redesign that aims to fix all the long-standing flaws found in APScheduler over the years.

Checked boxes are changes that have already been implemented.

Potential extra features I would like to have:

You will notice that I have dropped a number of features from master. Some I may never add back to v4.0, even if requested, but do voice your wishes in this issue (and this issue only – I will summarily close such requests in new tickets). Others have been removed only temporarily to give me space for the redesign.

Features on the chopping block

Being on the chopping block does not mean the feature will be gone forever! It may return in a subsequent minor release, or even before the 4.0 final release, if I deem it feasible to implement on top of the new architecture.

agronholm commented 3 years ago

The master branch is now in a state where both the async and sync schedulers work, albeit with a largely incomplete feature set. Next I will focus on getting the first implementation of shareable data stores done, based on asyncpg. I made some progress on that a while back but got sidetracked by other projects, particularly AnyIO.

codingadvocate commented 3 years ago

Regarding the Twisted scheduler being on the chopping block for APScheduler v4:

My main OSS project is a multi-process app that spins up many Twisted reactors in those processes, where several of the sub-processes use APScheduler inside the reactor (https://github.com/opencontentplatform/ocp). What would be a safe replacement scheduler if the Twisted version is removed?

agronholm commented 3 years ago

So you run multiple schedulers? Are you sharing job stores among them?

The main reason I'm thinking of dropping (explicit) Twisted support is because it carries a heavy burden of legacy with it. I will play around with it and see if I can make it work at least with the asyncio reactor. If it can be made to work with a small amount of glue, I will take it off the chopping block.
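
A minimal sketch of what that glue could look like on the application side, using Twisted's standard asyncio reactor support (this is plain Twisted, not an APScheduler API):

# Install Twisted's asyncio-backed reactor; this must happen before
# anything else imports twisted.internet.reactor.
from twisted.internet import asyncioreactor
asyncioreactor.install()

from twisted.internet import reactor  # now backed by the asyncio event loop
# asyncio-based code (such as an asyncio scheduler) can now share the same
# event loop as the Twisted reactor.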

codingadvocate commented 3 years ago

Yes, it runs multiple instances of the scheduler, each with its own independent job store.

I understand the need for software redesigns, and I'm certainly not pushing back or trying to make more work for you. Just trying to understand what the recommendation would be. Maybe I could fall back to using APS' BackgroundScheduler since I don't spin it up until after the reactors are running? Either way, I saw the note and want to ensure I follow whatever happens on that one.

Either way, thank you for the solid project.

agronholm commented 3 years ago

Are the jobs you run typically asynchronous (returning Deferreds) or synchronous (run in threads)?

codingadvocate commented 3 years ago

The initial setup with creating job definitions is synchronous. Any updates to previous job definitions or newly created jobs (stored/managed in a DB) occur regularly in an asynchronous manner (LoopingCall that returns a Deferred). And all the work with job runtime (execution/management/reporting/cleanup) occurs in non-reactor threads.

agronholm commented 3 years ago

Ok, so it sounds like the actual job target functions are synchronous, correct? Then you would be able to make do with the synchronous scheduler, yes?

codingadvocate commented 3 years ago

If you're saying so, then yes; I defer to your knowledge there. I selected the TwistedScheduler because the user guide's choosing-the-right-scheduler section said to do so when building a Twisted application.

I apologize for compounding the response with a question, but it's related: how are the thread pool and thread count handled if I use something other than the TwistedScheduler? Will the job run inside Twisted's thread pool, or inside BackgroundScheduler's thread pool? Do I need to extend both?

Does constructing the BackgroundScheduler with an explicit max_workers count (example below) do anything when it's running inside Twisted's reactor?

self.scheduler = BackgroundScheduler({
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '25'
    }
})

agronholm commented 3 years ago

Will the job run inside Twisted's thread pool, or inside BackgroundScheduler's thread pool? Do I need to extend both?

The sync scheduler (including 3.x's BackgroundScheduler) knows nothing about Twisted's thread pool. The Twisted scheduler in 3.x differs from BackgroundScheduler only in that its default executor uses the Twisted reactor's internal thread pool. It doesn't even have async support!
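
To illustrate with a 3.x sketch (equivalent to the dict-based config quoted earlier in this thread): max_workers sizes the scheduler's own pool, regardless of what the reactor does.

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler

# The default executor is the scheduler's own thread pool; Twisted's
# reactor thread pool is not involved.
scheduler = BackgroundScheduler(executors={'default': ThreadPoolExecutor(max_workers=25)})
scheduler.start()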

I want to provide first class async support in APScheduler 4.x. If I can do that with Twisted without having to create an entire ecosystem of Twisted specific components, then I'm open to doing that.

agronholm commented 3 years ago

I just added a few items to the description:

thedrow commented 3 years ago

What do you think about adding optional OpenTelemetry support?

agronholm commented 3 years ago

I am open to it, but only once their API stabilizes. As it stands, every beta release breaks backward compatibility. I have more important issues to work on. I don't think v4.0 will have OpenTelemetry support, but I will consider adding it in a minor update release once they are in GA.

agronholm commented 3 years ago

A lot of progress has been made on the core improvements of v4.0. Vast code refactorings have taken place. The data store system is really taking shape now.

I've added "Failure resilience for persistent data stores" to the task list. It's one of the most frequent deployment issues with APScheduler, so I'm making sure that it's adequately addressed in v4.0.

I'm not sure what to do with the event system. I may rip it out entirely until I can figure out exactly how it should work. I know users will want to know when a job completes or a misfire occurs etc., so it will be implemented in some form at least before the first release.

I will post another comment when I've pushed these changes to the repository.

agronholm commented 3 years ago

I hit a snag with the synchronous version of the scheduler. I tried to use the AnyIO blocking portal system to run background tasks but I had to conclude that it won't work that way. I have an idea for that though.

jykae commented 3 years ago

@agronholm do you have any estimate when 4.0 would be released?

agronholm commented 3 years ago

I had hoped at least for an alpha at this point, but the design problems in the sync version killed the momentum I had. I have not done any significant F/OSS development since. I am still committed to getting 4.0 done, but due to pressure at work I don't think I can work on it before Christmas holidays.

williamwwwww commented 3 years ago

@agronholm How will you make it possible for a job store to be shared among multiple schedulers?

agronholm commented 3 years ago

@agronholm How will you make it possible for a job store to be shared among multiple schedulers?

Through coordination and notifications shared between the schedulers. Notifications are optional but recommended; without them, the schedulers will periodically check for due schedules. How all this works is specific to each store implementation.

ahmet2mir commented 3 years ago

Hello @agronholm

Impressive task list and thanks for apscheduler.

My big Christmas wish is "locking" (probably tied to the idea of persistent storage).

I use apscheduler on several web nodes; each node has some workers.

Today, I subclass the scheduler, store, etc. to add locking.

Instead of using add_job, I call queue_job and create an event; everyone wakes up, and the first worker to take the job locks it (using NX with Redis + the Redlock algorithm). When a job runs past a certain time, I mark it as "dead" and our alerting reports the dead job.
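
A minimal single-node sketch of that NX locking step (the full Redlock algorithm spans several Redis nodes; the names here are illustrative):

import uuid
from typing import Optional

import redis

r = redis.Redis()

def try_lock_job(job_id: str, ttl_seconds: int = 60) -> Optional[str]:
    # SET ... NX is atomic: exactly one worker's call can succeed.
    token = str(uuid.uuid4())
    if r.set(f"lock:{job_id}", token, nx=True, ex=ttl_seconds):
        return token  # this worker owns the job until the TTL expires
    return None  # another worker already holds the lock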

For me it's mandatory that a task never belongs to a worker; the job must stay in the queue so that another worker (or the same one) can process it.

To achieve this, I added job states in Redis (alongside the jobs and running keys): "ready", "locked", "dead", "failed", "done".

I'm a big fan of Sidekiq (and also Faktory)

And I would be very happy with something like this:

In the "main"

def myfunc(x, y):
    print(x, y)

scheduler = Scheduler(...)
# register myfunc as a valid callable to avoid pickle on func
scheduler.register('myfunc', myfunc)
scheduler.start()

Then in code

# note that myfunc is in string
job = scheduler.queue('myfunc', kwargs={"x": 1, "y": 2})
print(job.status) # ready - no one process it
...
print(job.status) # pending - someone process it
...
print(job.status) # done - success

Why not Celery?

I don't want to set up the full Celery/Flower stack; my tasks are simple, and I'm a bit too lazy to repackage an entire app or split a few lines of code into small libraries just to let Celery run my code (and also split out config, credentials, etc.). I prefer using Celery only when necessary.

I don't know if I'm clear (I'm not a native English speaker).

agronholm commented 3 years ago

@ahmet2mir APScheduler 4.0 already has the proper synchronization mechanisms in place.

What's still missing is the synchronous API. I've come to a realization that I cannot simply copy the async API and remove the async keywords because cancellation isn't going to work with the sync API, and AnyIO's BlockingPortal mechanism (as it is currently) is inadequate for cases where you need to start background tasks. I must address this issue first and then come back to finish the basic APScheduler 4.0 API.

agronholm commented 3 years ago

While 4.0 is being worked on, I've gone back to the 3.x branch for a bit and fixed a number of bugs and other annoyances.

agronholm commented 3 years ago

Tests on async/sync workers (formerly: executors) are passing now, but the sync worker tests are strangely slow and I want to get to the bottom of that before moving forward.

agronholm commented 3 years ago

Slowness in worker tests resolved: it was a race condition in which the notification about the newly added job was sent before the listener was in place, causing the data store to wait for the 1 second timeout to expire before checking for new jobs again.

I'll move on to completing the synchronous scheduler code now. I'm also very close to releasing AnyIO v2.1.0 which is a critical dependency for APScheduler 4.

thedrow commented 3 years ago

I can't wait...

agronholm commented 3 years ago

Tests for both sync and async schedulers pass, but the tests run into delays caused by the new schedule/job notifications not working as intended, plus the sync scheduler tests are causing lots of "Task exception was never retrieved" errors outside of the actual tests which I will have to investigate. I'm considering making an alpha release once these issues have been ironed out.

thedrow commented 3 years ago

That would be very helpful.

agronholm commented 3 years ago

After hours of debugging, I finally figured out that I was needlessly creating a new task group in the worker's run() method and overwriting the outer task group as a worker attribute. The odd errors went away after I fixed that.

agronholm commented 3 years ago

I've just pushed a big batch of changes that implement data store sharing on PostgreSQL (via asyncpg) and MongoDB (via motor). There are a lot of rough edges but at least the whole test suite passes now (at least locally – CI seems to have some troubles). In the coming days I'll try to polish the code base to the point where I can at least make an alpha release.

Feel free to try it out, but you'll have to look at the test suite for some idea of how to use it, since I haven't updated the docs yet. Also, the database schema will change before the final release (task accounting is not currently done), so expect to have to throw out your schedules and jobs.

thedrow commented 3 years ago

Is master now usable?

agronholm commented 3 years ago

Usable in the sense that basic functionality works, but I wouldn't rely on it for anything remotely important.

thedrow commented 3 years ago

jumpstarter is not production-ready yet.

christopherpickering commented 3 years ago

Maybe I missed it in the thread, but when you publish, will it have a different package name on PyPI? Thanks!

agronholm commented 3 years ago

It will have the same name, so if you're concerned, pin your apscheduler dependencies to < 4.
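
For example, a requirement specifier like "apscheduler >= 3.0, < 4" keeps a project on the 3.x series.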

agronholm commented 3 years ago

On another note, CI runs now work again. It was a bit of a head-scratcher, but it turns out the culprit behind the freezing CI runs was freezegun v1.1.0. I didn't see this locally because v1.1.0 was freshly released and I hadn't updated my dependencies lately. Pinning to v1.0.0 solved the problem.

m3nu commented 3 years ago

We use APScheduler in a Qt app to schedule backups. It works well, and I plan to revamp that integration soon, so it would be good if QTimer support could stay. The 3.x implementation was only a few lines, given that QTimer is quite high-level.

Is it still feasible to use QTimer in a Scheduler subclass and would you consider merging a PR for it? Or would APS 4 work just as well in a Qt app?

agronholm commented 3 years ago

It won't be a priority but I will definitely consider this use case.

huangsam commented 3 years ago

Looking forward to the fix for https://github.com/agronholm/apscheduler/issues/285 landing in v4.0 (or some other release) πŸ‘

agronholm commented 3 years ago

It's been a while since the last update. My development time has mostly been spent on improving the AnyIO project, and this work has yielded the much awaited 3.0 release which will also benefit APScheduler.

My next step is to refactor the current PostgreSQL data store into a new SQLAlchemy 1.4 based store, which would work not only with PostgreSQL but also with MySQL and SQLite. I will also implement task accounting (keeping track of how many running jobs per task there are, and ensuring that the limits are respected).

One big decision ahead is how to support data stores using synchronous APIs. It may not make a lot of sense to have the synchronous scheduler use async behind the scenes if the data store is fully synchronous. And supporting only the above database backends would leave a lot of users out in the cold.

thedrow commented 3 years ago

You can use a thread pool.

samh194 commented 3 years ago

Hi @agronholm , is there any update on this release or on #256 in particular?

agronholm commented 3 years ago

Hi @agronholm , is there any update on this release or on #256 in particular?

Initial work has landed on job store sharing, but it's lacking critical components:

I started work on task accounting last week. The idea is that schedules and jobs are linked to "tasks". A "task" contains some configuration parameters (like maximum concurrency and statefulness) and links to a callable. When you create a schedule or a job, you need to pass one of the following to the method:

  1. the ID of a previously created task
  2. a callable object
  3. a textual reference (modulename:variablename) to a callable (does not need to be importable on the scheduler process if a persistent job store is used)
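
As an illustration of option 3, a minimal sketch of how a modulename:varname reference can be resolved back to a callable (not necessarily APScheduler's exact implementation):

from importlib import import_module

def resolve_reference(ref: str):
    # Split "package.module:var.attr" into a module path and an attribute path.
    module_name, _, attr_path = ref.partition(':')
    obj = import_module(module_name)
    for name in attr_path.split('.'):
        obj = getattr(obj, name)
    return obj

# resolve_reference('os.path:join') returns the os.path.join function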

When a worker acquires a job, the task gets its counter incremented, and when the job is released, the counter gets decremented. The data store will ensure that the total concurrency of a task never goes above the limit.

I'm still working out the exact semantics involved here, like what happens when a worker tries to acquire a job whose task has all its concurrency slots taken.
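
A rough in-memory sketch of that accounting (a real data store would do this transactionally, and the exact acquire semantics are still undecided, as noted above):

import threading
from collections import defaultdict

class TaskSlots:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._running = defaultdict(int)  # task ID -> number of running jobs

    def try_acquire(self, task_id: str, max_jobs: int) -> bool:
        with self._lock:
            if self._running[task_id] >= max_jobs:
                return False  # all concurrency slots taken; leave the job queued
            self._running[task_id] += 1
            return True

    def release(self, task_id: str) -> None:
        with self._lock:
            self._running[task_id] -= 1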

agronholm commented 3 years ago

Also, to simplify the implementation of shared data stores, I have decided not to require them to deliver events beyond the current process. This decision could be changed if there is high demand for it.

agronholm commented 3 years ago

Also, I'm now trying out a design where synchronous and asynchronous data stores have separate interfaces, and if you use a synchronous data store with an async scheduler, it will just wrap it with threads. I may have been trying too hard to deduplicate the code between sync/async, but if I make the synchronous scheduler independent of the async scheduler, things might get easier. I'm not sure this will be the final design choice that I'll take but it's something I'm exploring now.
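
A bare-bones sketch of that wrapping idea using AnyIO (the method names are illustrative, not the final v4 data store interface):

from anyio import to_thread

class AsyncDataStoreAdapter:
    def __init__(self, sync_store):
        self._store = sync_store  # a fully synchronous data store

    async def get_due_schedules(self):
        # Run the blocking call in a worker thread to keep the event loop free.
        return await to_thread.run_sync(self._store.get_due_schedules)

    async def add_schedule(self, schedule):
        await to_thread.run_sync(self._store.add_schedule, schedule)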

agronholm commented 3 years ago

Tonight I managed to (hopefully) nail down the new event dispatch system which also has tests with 100% coverage. The work on sync/async schedulers and workers is also coming along nicely. I will commit all the new work to master in a big push once it's in a coherent state.

agronholm commented 3 years ago

There is a problem that is rapidly becoming apparent: the majority of shared data stores do not support any mechanism with which to be notified of external changes. This is important for job store sharing to fully work as intended.

Of the job stores implemented so far, PostgreSQL (either directly via asyncpg, or via SQLAlchemy) and MongoDB are capable of providing at least some level of notifications. Even PostgreSQL has limitations in its notification mechanism: it can only deliver messages shorter than 8000 bytes, making it impractical for universal event delivery, since such a system would have to cope with events of essentially arbitrary size. MongoDB, on the other hand, only supports change notifications when configured as a replica set.

These shortcomings made me consider again the idea of having a "side channel" for broadcasting events. I can think of at least 3 different services which might be suitable for delivering events to all interested parties:

As for cases like PostgreSQL, I'm planning to have the store optionally emit a "limited" event that would wake up the scheduler on an external change. That would allow users to use a shared job store without having to run another service just for getting notifications to work.
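
For example, a pared-down sketch of such a wakeup using asyncpg's LISTEN/NOTIFY support (the channel name is made up, and the payload is ignored, so the 8000-byte limit doesn't matter here):

import asyncio
import asyncpg

async def wait_for_external_change(dsn: str) -> None:
    wakeup = asyncio.Event()
    conn = await asyncpg.connect(dsn)
    # The callback receives (connection, pid, channel, payload); all ignored.
    await conn.add_listener('apscheduler_wakeup', lambda *args: wakeup.set())
    try:
        await wakeup.wait()  # the scheduler would now recheck for due schedules
    finally:
        await conn.close()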

agronholm commented 3 years ago

The data store tests are passing again after my latest batch of changes, and the event dispatch system is shaping up really well. I even managed to make the PostgreSQL data store relay its own events through the database server as asynchronous notifications, making it quite suitable for use without an external messaging system. I will certainly tweak the event system further to make sure the whole system is as observable as it reasonably can be.

My goal is also to do away with the PostgreSQL job store (currently in master only) entirely in favor of the async SQLAlchemy store. I would add its notification features to the SQLAlchemy store which would then be used if a compatible driver was detected.

On another note, I decided to rely on attrs over dataclasses. I tried really hard to love dataclasses, but seeing that you cannot have ANY optional fields in superclasses when subclasses have mandatory fields is just a showstopper for me (dataclasses cannot force keyword-only arguments, unlike attrs).
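
To make the problem concrete, a hedged sketch (field names invented): with dataclasses, a defaulted field in a base class turns any non-default subclass field into a TypeError at class creation time, while attrs' keyword-only mode sidesteps the field ordering rule entirely.

import attr

@attr.s(kw_only=True)
class Schedule:
    misfire_grace_time = attr.ib(default=None)  # optional field in the superclass

@attr.s(kw_only=True)
class CronSchedule(Schedule):
    expression = attr.ib()  # mandatory field in the subclass; fine with kw_only

s = CronSchedule(expression='*/5 * * * *')
# The equivalent @dataclass pair raises:
# TypeError: non-default argument 'expression' follows default argument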

Once I get the rest of the test suite to pass again I will push the changes to Github.

agronholm commented 3 years ago

The entire test suite passes now and I have pushed the latest changes to GitHub. There is an intermittent failure related to the memory job store in test_remove_job() when it is wrapped as an async data store. I'm tracking down this annoying Heisenbug.

The API currently revolves around context managers, but I'm going to try to refactor it to be more convenient when used in environments that are less context manager friendly.

I didn't get task accounting done in this batch, but it will be my next focus now that the event dispatch system is in a better shape.

EDIT: the race condition has been fully fixed now.

agronholm commented 3 years ago

Just fixed a bunch of bugs and got CI to pass on all supported Python versions and platforms (except for Windows+Py3.10 where psycopg2 won't compile).

agronholm commented 3 years ago

I took a little detour, adding MySQL and SQLite to the testing matrix, and I'm glad I did because it revealed a bunch of problems. MySQL's timestamp columns don't support fractional seconds by default, so I had to refactor the SQLAlchemy data store a bit to add a workaround for this particular vendor (i.e. using a MySQL specific timestamp type that enables fractional seconds). As for SQLite, it trashed my latest attempt to let the SQLAlchemy store forego marking schedules as "acquired" by a scheduler, because it did not honor the row-level locking. The context manager based schedule acquisition API might have been problematic for some other data store implementations anyway. These issues have been sorted out now, and the test suite passes again for all data stores. As a side note, MySQL is considerably slower here than the other back-ends, and I would not recommend it to anyone.
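
The vendor workaround boils down to something like this sketch (table and column names invented), using SQLAlchemy's with_variant() so that only MySQL gets the fractional-seconds type:

from sqlalchemy import TIMESTAMP, Column, MetaData, Table
from sqlalchemy.dialects.mysql import TIMESTAMP as MySQLTimestamp

# A plain TIMESTAMP everywhere, except on MySQL where fsp=6 enables
# microsecond precision.
timestamp = TIMESTAMP(timezone=True).with_variant(MySQLTimestamp(fsp=6), 'mysql')

metadata = MetaData()
schedules = Table(
    'schedules', metadata,
    Column('next_fire_time', timestamp),
)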

With that out of the way, I was able to add the first implementation of task accounting. Data stores will now keep track of the number of running jobs per task, and not give workers jobs that would raise the total number for that task above its maximum concurrent job limit. This code will need a lot more tests and polish before I can say it's finished, but the basic functionality is there.

I've also refactored most of the classes to use attrs and all of them to use Python 3.10 style type annotations. I hope this doesn't inconvenience anyone (the library still remains Python 3.7+ compatible).

There are still a couple of unsolved problems with task accounting:

  1. How should the system allow tasks to have callables that aren't addressable as modulename:varname? Lambdas and local functions (functions defined inside other functions) fall into this category. Normally, when a task definition is added to the data store, the addition is communicated to other data stores, but there is no way to automatically replicate the same task -> function mapping on data stores in other processes. The use case is important for local data stores, however, and they have to share the same API with persistent data stores.
  2. What should happen to jobs when their tasks are updated with different parameters? Should the existing jobs be removed? If the new specification allows fewer concurrent jobs, should we cancel queued jobs? Starting from the oldest or the newest? This is a harder problem than the previous one.

agronholm commented 2 years ago

I've now implemented an event broker system which should allow any persistent data store to be shared safely. The following implementations (in addition to the minimal local-only implementation) are present in the current code:

This should be the last major component that was missing from the v4.0 design. From here on out it's just a matter of implementing the promised features, tinkering with the design and polishing the outcome. The documentation will also be largely rewritten but only after the code has more or less settled down.

I've done some work on persistent data stores, too. Each field now corresponds to one column in SQLAlchemy, or one BSON field in a MongoDB document. I made this change to enable more granular updates and deserialization error tracking. The downside is more frequent schema updates when/if more fields are added.