LSSTDESC / gen3_workflow

Development code for a Gen3-based DRP pipeline implementation
BSD 3-Clause "New" or "Revised" License

ParslJob not always updated #59

Closed dladams closed 7 months ago

dladams commented 7 months ago

I recently modified the g3wfpipe app in desc-gen3-prod to process tasks with the following code:

if doProc1:
    logmsg()
    monexpUpdate = False
    statlogmsg('Fetching workflow QG.')
    get_pg()
    if not haveQG:
        statlogmsg("ERROR: Quantum graph not found.")
        sys.exit(1)
    statlogmsg('Starting new workflow')
    tasks = pg.values()
    ntask = len(tasks)
    endpoints = [task for task in tasks if not task.dependencies]
    for task in endpoints:
        task.get_future()
    ndone = 0
    nfail = 0
    remtasks = tasks
    while True:
        newrems = []
        for task in remtasks:
            tstat = task.status
            if tstat in ('succeeded', 'failed'):
                if tstat == 'failed': nfail += 1
                ndone += 1
            else:
                if tstat not in ('pending', 'scheduled', 'running'):
                    logmsg(f"WARNING: Unexpected task status: {tstat}")
                newrems += [task]
        remtasks = newrems
        msg = f"Finished {ndone} of {ntask} tasks."
        if nfail:
            msg += f" {nfail} failed."
        statlogmsg(msg)
        update_monexp()
        if len(remtasks) == 0: break
        time.sleep(10)

I create a list of ParslJob objects, remove them from the list when they reach the 'succeeded' or 'failed' state, and exit when the list is empty. This usually works well but, when the node is very busy, I find that a few of my ParslJob objects are not updated and remain in one of ('pending', 'scheduled', 'running') even though the tasks have completed. I can see this by looking at the parsl monitoring DB or by looking at ParslGraph.df in a new job in the same directory.

Is this a defect in gen3_workflow or is it a feature and I should not use ParslJob objects like this?

Should I instead maintain a list of task instance (job) names and use each name with ParslGraph to retrieve its status?

Thank you.

jchiang87 commented 7 months ago

That ParslJob.status property is really just meant as a fallback if the info from the parsl monitoring database (monitoring.db) isn't available. It has only been used in anger for small workloads, since it reads the log files to determine the status. For large workflows this isn't likely to scale, and the intent is that the monitoring DB be used for individual job status. In fact, I had been thinking of removing the log-file route for job status because of this scaling issue.

dladams commented 7 months ago

So maybe my problem occurs because the log files don't have the expected format? This might be because I started adding monitoring data to the log files.

But, in any case, you are advising I get the status values from the monitoring DB using ParslGraph rather than ParslJob.

For example, to get the status for the task instance with name tnam:

pg._update_status()
tstats = pg.df.set_index('job_name').status.to_dict()    # or dict(zip(pg.df.job_name, pg.df.status))
tstat = tstats[tnam]

Right?

jchiang87 commented 7 months ago

Yes, something like that would be better than using the ParslJob.status attribute.

dladams commented 7 months ago

I am having problems with the first line in the above status retrieval:

2023-12-06 12:35:05: Starting new workflow
Traceback (most recent call last):
  File "/pscratch/sd/d/dladams/descprod-out/jobs/job001364/./local/desc-gen3-prod/bin/g3wfpipe-run.py", line 264, in <module>
    pg._update_status()
  File "/pscratch/sd/d/dladams/descprod-out/jobs/job001364/gen3_workflow/python/desc/gen3_workflow/parsl_service.py", line 433, in _update_status
    df = query_workflow(self.config['outputRun'],
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/pscratch/sd/d/dladams/descprod-out/jobs/job001364/gen3_workflow/python/desc/gen3_workflow/query_workflow.py", line 60, in query_workflow
    raise FileNotFoundError(f'workflow {workflow_name}'
FileNotFoundError: workflow u/dladams/ccd/20231206T203431Znot in {db_file}

I am chasing that down but I see that line 61 in query_workflow.py:

                                    'not in {db_file}')

should be replaced with

                                    f' not in {db_file}')

to obtain the intended error message.
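The cause is general Python behavior: when adjacent string literals are concatenated, the `f` prefix applies only to the literal it is attached to. A minimal sketch with made-up values (`workflow_name` and `db_file` here are placeholders, not the values from the job above):

```python
# Hypothetical values, for illustration only.
workflow_name = "u/example/run"
db_file = "monitoring.db"

# Only the first literal is an f-string, so {db_file} is emitted verbatim,
# and there is no space between the workflow name and "not".
broken = (f'workflow {workflow_name}'
          'not in {db_file}')

# Prefix the second literal as well and add the leading space.
fixed = (f'workflow {workflow_name}'
         f' not in {db_file}')

print(broken)
print(fixed)
```

The first message reproduces the run-together text and the literal `{db_file}` seen in the traceback; the second interpolates both values.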

dladams commented 7 months ago

I checked and the value for db_file is fine. The indicated workflow has not yet been entered in the workflow table. I suppose I need to wait some time after launching tasks (making ParslJob.get_future() calls) for the workflow to appear in the table. I put the call to pg._update_status() inside a try block; if that method raises an exception, I wait 10 seconds and try again.

The next time I tried, I did not have the problem. The exception was not even raised in my try block.
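A minimal sketch of that try-and-wait pattern, assuming the only error worth retrying is the FileNotFoundError shown in the traceback. The helper name `call_with_retry` and its parameters are hypothetical and not part of gen3_workflow:

```python
import time


def call_with_retry(func, retries=5, delay=10.0, sleep=time.sleep):
    """Call func(); on FileNotFoundError, wait and retry up to `retries` more times.

    `sleep` is injectable so the helper can be exercised without real waiting.
    """
    for attempt in range(retries + 1):
        try:
            return func()
        except FileNotFoundError:
            if attempt == retries:
                raise  # give up and surface the original error
            sleep(delay)
```

In the script above this would presumably be used as `call_with_retry(pg._update_status)`. Capping the number of retries avoids looping forever if the workflow never appears in the table.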

dladams commented 7 months ago

All working now. I ended up looping over names and checking status with ParslGraph (monitoring DB). When a task reaches exec_done, I then use ParslJob to retrieve succeeded/failed (from the log file) but allow that to fail. The second check is triggered by a flag that I may make a run-time option.
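The final approach could look roughly like the sketch below. It is not the actual g3wfpipe code: `wait_for_tasks`, `get_statuses`, and `check_log` are hypothetical names, and the set of terminal states is an assumption based on parsl's task state names.

```python
import time


def wait_for_tasks(names, get_statuses, check_log=None,
                   terminal=("exec_done", "failed", "dep_fail"),
                   poll_seconds=10.0, sleep=time.sleep):
    """Poll until every named task reaches a terminal state.

    get_statuses() must return a {job_name: status} dict, e.g. one built
    from ParslGraph.df after refreshing it from the monitoring DB.
    check_log(name) is the optional second check (e.g. reading the log via
    ParslJob); it may raise, in which case the monitoring status is kept.
    Returns {job_name: final_status}.
    """
    remaining = list(names)
    results = {}
    while remaining:
        statuses = get_statuses()
        still_waiting = []
        for name in remaining:
            status = statuses.get(name)
            if status not in terminal:
                still_waiting.append(name)
                continue
            if status == "exec_done" and check_log is not None:
                try:
                    status = check_log(name)  # e.g. 'succeeded' or 'failed'
                except Exception:
                    pass  # the log-file check is allowed to fail
            results[name] = status
        remaining = still_waiting
        if remaining:
            sleep(poll_seconds)
    return results
```

Passing `check_log=None` corresponds to leaving the flag for the second check switched off. Keeping only names in the loop, rather than ParslJob objects, means every status comes from one fresh query per polling cycle.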