agronholm / apscheduler

Task scheduling library for Python
MIT License
6.3k stars 712 forks source link

Stateful jobs #265

Open agronholm opened 6 years ago

agronholm commented 6 years ago

Certain kinds of jobs need to persist state to be passed to the next run. This obviously limits the concurrency of the job to 1 instance at the time.

fredthomsen commented 6 years ago

I wanted to help out and had this as a need for a project that I was working on as well. If this were to be undertaken a few issues crossed my mind with getting to the job's state from the invoked function and all the solutions that came to mind felt ugly:

alexanderhawl commented 3 years ago

So how to pass the former state to next scheduled job?

agronholm commented 3 years ago

This is an open issue which means that the feature is not done. It has also been marked for the 4.0 milestone, which is where you can expect this to be done.

dariosm commented 3 years ago

I'm stuck with an issue I thought somehow I was causing using the wrong scheduler setup, but it could be related to this one and could be partially answered here.

I guess my use case is very simple: keeping the state of the arguments for the job's target function. I'm trying to create a very simple job like this, which using the default memory backend will successfully print argument id and contents (each time of the count value increased by 1).

# Job's target function
def inc(_obj: Dict):
    print(f"id={id(_obj)}, value={_obj}")
    _obj["count"] += 1

# Scheduler setup with defaults
scheduler = BackgroundScheduler(daemon=True)

# Object argument to pass to the target function
task_args = {"count": 0}

# Create the job
job: Job = scheduler.add_job(
      inc,
      trigger="interval",
      args=[task_args],
      name="test_job",
      **{"seconds": 3},
  )

By just switching the jobstore to mongo I lose the state which I thought it'd be kept by means of the mongodb jobstore:

   def update_job(self, job):
        changes = {
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
        }
        result = self.collection.update_one({'_id': job.id}, {'$set': changes})

Maybe this is by design, and the point to have stateful jobs in v4 as mentioned will cover this use case. But it seems like mongodb implements job updates in the db (which could be restricted to next_run_time, and that would justify the job "statelessness" I'm experiencing in my tests, but update_job pickles and save the entire job state, including job arguments).

rafalkrupinski commented 1 year ago

I've implemented stateful jobs by simply installing a listener on EVENT_JOB_EXECUTED that reschedules the job with a new parameter value. There's also a modify function in Job class, that should work too. All you need is a protocol to pass the last result back to the function. I just use a parameter last_result: Optional[T]