GSA / data.gov

Main repository for the data.gov service
https://data.gov

[SPIKE] implement job scheduler in flask app #4683

Closed rshewitt closed 1 week ago

rshewitt commented 5 months ago

User Story

In order to schedule harvest jobs, data.gov wants to integrate a task scheduler into our existing Flask app.

Acceptance Criteria

[ACs should be clearly demoable/verifiable whenever possible. Try specifying them using BDD.]

Manual

Automated

Background

[Any helpful contextual notes or links to artifacts/evidence, if needed]

Security Considerations (required)

[Any security concerns that might be implicated in the change. "None" is OK, just be explicit here!]

Sketch

[Notes or a checklist reflecting our understanding of the selected approach]

jbrown-xentity commented 5 months ago

https://pypi.org/project/Flask-APScheduler/ should be what we use to implement this...

rshewitt commented 5 months ago

SQLAlchemy job store for APScheduler tables. Seems redundant alongside our existing job table. Looks like custom metadata for jobs and tasks will be supported in v4.0 (see the v4.0 progress track); they have a couple of v4 pre-releases.

rshewitt commented 5 months ago

Maybe we can store our job results in the return value here?

rshewitt commented 5 months ago

running jobs immediately

rshewitt commented 5 months ago
# a way to start a job now: pull its next run time up to the present
scheduler_object.get_job(job_id="my_job_id").modify(next_run_time=datetime.datetime.now())
rshewitt commented 5 months ago

We're anticipating running the harvests as tasks in Cloud Foundry. How does flask-apscheduler know when a job is complete if CF is doing the work? Do we stream the CF task logs inside the job until the CF task status changes, telling flask-apscheduler the task is complete? The executor isn't actually doing the work, so what exactly is being multiprocessed/multithreaded?

rshewitt commented 5 months ago

Here's a mockup of a flask-apscheduler job...

from time import sleep

cf_handler = CFHandler()  # cf interface
app = create_app()  # flask app
scheduler = create_scheduler(app)

@scheduler.task("interval", id="do_job_1", seconds=60)
def job1(source_data):
    app_guid = "0192301923"
    cf_handler.start_task(app_guid, f"python harvest.py {source_data}", "test-job")
    # ^ this returns the submitted task once it's been registered by cf.
    # this job would most likely complete immediately unless told to wait until something happens...

# keep the job open until the task status has changed to "SUCCEEDED". basic polling.
@scheduler.task("interval", id="do_job_2", seconds=60)
def job2(source_data):
    app_guid = "0192301923"
    task = cf_handler.start_task(app_guid, f"python harvest.py {source_data}", "test-job")
    while task["status"] != "SUCCEEDED":
        task = cf_handler.get_task(task["guid"])
        sleep(15)

Would we want to implement our own scheduling event?

rshewitt commented 5 months ago

implementing a custom executor

jbrown-xentity commented 5 months ago

We're anticipating running the harvests as tasks in Cloud Foundry. How does flask-apscheduler know when a job is complete if CF is doing the work? Do we stream the CF task logs inside the job until the CF task status changes, telling flask-apscheduler the task is complete? The executor isn't actually doing the work, so what exactly is being multiprocessed/multithreaded?

It would be on the "job" task processor to send a POST request back to the Flask API, letting it know that it was done processing a job. After accepting this, other things might occur: email notification, kicking off new jobs, etc. That does require an API route being defined; that should be considered a follow-up addition/feature. Tagging @btylerburton for awareness or to weigh in...

btylerburton commented 5 months ago

Routes and order subject to change, but it's defined in Steps 23 & 24 here


https://raw.githubusercontent.com/GSA/datagov-harvesting-logic/main/docs/diagrams/mermaid/dest/etl_pipeline-1.svg

rshewitt commented 5 months ago

job processing workflow in apscheduler

what we want to do

harvest_source = HarvestSource(data...)
harvest_source.get_records_changes()
harvest_source.synchronize_records()

harvest_source.submit_job_complete()  # as an example

# ...
def submit_job_complete(self, data...):
    requests.post(url, json=data)  # post to our flask app

# ...meanwhile, in our flask app
@mod.route('/harvest_jobs/result', methods=['POST'])
def harvest_job_result():
    # here's where the job processing workflow from before comes into play...
    # we need to ensure a JobExecutionEvent(EVENT_JOB_EXECUTED, ...) event is dispatched to the proper listener(s).
    # this is where I pause and things start to break down: we'd be calling the things apscheduler is supposed to handle itself

My conclusion: I want APScheduler to be responsible for its own things, and I don't want us calling things we're not supposed to call. I could be thinking of this wrong though! Any thoughts would be appreciated.

btylerburton commented 5 months ago

this is where I pause and things start to break down: we'd be calling the things apscheduler is supposed to handle itself

If I'm following your question, the flask app would call the DB here for info that the task has logged. Step 27 & 28 above.

rshewitt commented 5 months ago

APScheduler has a data store to keep track of tasks, jobs, and schedules. Would this not replace our job table in the harvest DB?

btylerburton commented 5 months ago

IMO the Job Table needs to exist as persistent storage for historical metrics.

Given the above questions, it feels like a good time to ask exactly what we're planning to use the scheduler for.

I have:

We can delegate simple queuing to a Redis instance, but it's worth listing all the things we want APScheduler to be responsible for first, before we write it off.

rshewitt commented 5 months ago

APScheduler supports SQLAlchemy for persistent storage, in Postgres for example. I'm not sure what level of persistence this supports though (e.g. does it retain historic job runs).

rshewitt commented 4 months ago

Draft PR. So far I haven't been able to get the scheduler to start on container start in a non-hacky way; at this point, I've only been able to get it to work as a health check.

btylerburton commented 4 months ago

We can park this work for the time being since we're not moving forward with APScheduler.