materialsproject / jobflow

jobflow is a library for writing computational workflows.
https://materialsproject.github.io/jobflow

Fix pass manager config for dynamic jobs #502

Closed FabiPi3 closed 6 months ago

FabiPi3 commented 10 months ago

Description & Motivation

I am writing dynamic workflows using jobflow's replace feature. I have a single starting job that keeps adding a new job as long as some criterion is not yet fulfilled (in principle following the Fibonacci example at https://github.com/materialsproject/jobflow/blob/main/examples/fibonacci.py). I want to execute the workflow with a manager (specifically jobflow-remote), which requires specifying a manager_config. Every step in my workflow should use a different configuration, so I need a way to specify them per step.

I know there are ways to do this, for example setting pass_manager_config=True when specifying the config, or using response_manager_config to specify a new config for replace jobs. But I don't see how to do it for, say, 5 steps in the workflow where every step uses a different config. I also find it implicit and roundabout to specify all of this beforehand (and sometimes I don't even know the values before execution).

What I would like to do is set this in the job itself. I am not sure whether this breaks one of jobflow's basic design rules. Here is a toy example to illustrate:

from jobflow import JobConfig, Response, job


@job
def replace_example(a, threshold):
    if a >= threshold:
        return "Finished!"

    # pass threshold along so the replacement job can check the stop criterion
    new_job = replace_example(a + 1, threshold)
    new_job.name = f"replace_job_{a + 1}"
    # the new config depends on the input value 'a'
    new_config = JobConfig(manager_config={"resources": {"abc": a + 1}})
    new_job.update_config(config=new_config)
    return Response("Keep going...", replace=new_job)

But this does not work. As far as I can tell, the reason is the last function in src/jobflow/core/job.py, pass_manager_config(), which ends with the following lines:

# update manager config
for ajob in all_jobs:
    ajob.config.manager_config = deepcopy(manager_config)

This overwrites the manager_config of every new job with the old manager_config. My suggestion is to change it to:

# update manager config
for ajob in all_jobs:
    ajob.config.manager_config = manager_config | ajob.config.manager_config

This merges the two configs instead, with the new job's own entries taking precedence over the inherited ones. With this change, my example works.
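To make the difference between the two assignments concrete, here is a minimal sketch using plain dictionaries only (no jobflow objects). The keys and values, including the "queue" entry, are made up for illustration, and the `|` operator requires Python 3.9 or newer.

```python
from copy import deepcopy

# Hypothetical configs: the parent's manager_config and the one set on the new job.
parent_config = {"resources": {"abc": 1}, "queue": "default"}
child_config = {"resources": {"abc": 2}}

# Overwrite semantics: the child's own settings are discarded.
overwritten = deepcopy(parent_config)

# Merge semantics: on a key clash the right-hand operand (the child) wins,
# and keys only present in the parent are inherited.
merged = parent_config | child_config

# The merge is shallow: nested dicts are replaced wholesale and the result
# shares the nested object with child_config.
shares_nested = merged["resources"] is child_config["resources"]
```

Because the merge is shallow, wrapping the operands in deepcopy may be worth considering if the resulting config is mutated later.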

Tests

While running the jobflow tests against my changes, I found an issue in the tests. Consider the following minimal example:

from jobflow import Job, Response, JobConfig

def add(a, b=5):
    return a + b

def replace_flow():
    from jobflow import Flow

    job = Job(add, function_args=(1,))
    flow = Flow([job], output=job.output)
    return Response(replace=flow)

def test_1(memory_jobstore):
    manager_config = {"abc": 1}
    pass_config = JobConfig(manager_config=manager_config, pass_manager_config=True)

    test_job = Job(replace_flow, config=pass_config)
    response = test_job.run(memory_jobstore)
    for j in response.replace:
        assert j.config.manager_config == manager_config

def test2(memory_jobstore):
    manager_config = {"abc": 1}
    manager_config2 = {"abc": 2}
    response_config = JobConfig(
        manager_config=manager_config, response_manager_config=manager_config2
    )

    test_job = Job(replace_flow, config=response_config)
    response = test_job.run(memory_jobstore)
    for j in response.replace:
        assert j.config.manager_config == manager_config2

With the old overwriting behaviour, everything worked fine. With my changes, each of the two tests passes when run on its own, but if I run both in a single pytest invocation, the second test fails.

The cause seems to be the store_inputs job, which is a trick to make replacing with a flow work. For a reason I do not understand, this job has a non-empty manager_config in the second run (which was silently overwritten by the old code, but now leads to a failure).

In more detail: take the function prepare_replace in src/jobflow/core/job.py (line 1319) and add a print there:

store_output_job = store_inputs(replace.output)
print(store_output_job.config.manager_config)

This should always print an empty dict. But if I run pytest tests/core/test_job.py -k test_job_config, I see a non-empty dictionary:

{}
{}
{'abc': 1}

And I have no idea why. The fix I propose here, which worked locally, is to remove the @job decorator from the store_inputs function and convert it explicitly into a Job using the same JobConfig. See the changes for details. What do you think?
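One possible explanation, stated here as an unverified assumption and not checked against the jobflow source: if the decorator captures a single config object when store_inputs is defined, every job it creates would share that object, so assigning manager_config on one job would leak into the next. The sketch below reproduces that pattern in plain Python. `Config`, `Task`, `task_factory`, and `fresh_factory` are hypothetical stand-ins, not jobflow classes.

```python
from dataclasses import dataclass, field


@dataclass
class Config:
    """Hypothetical stand-in for a job config."""
    manager_config: dict = field(default_factory=dict)


class Task:
    """Hypothetical stand-in for a job object."""
    def __init__(self, config=None):
        self.config = config if config is not None else Config()


def task_factory(config=None):
    """Mimic a decorator that captures one config object at definition time."""
    def make():
        return Task(config=config)  # the same Config instance for every task
    return make


shared_factory = task_factory(config=Config())
first = shared_factory()
first.config.manager_config = {"abc": 1}  # mutates the shared Config
second = shared_factory()
leaked = second.config.manager_config     # state set via `first` is visible here


def fresh_factory():
    """Build a new Config per task, analogous to constructing the job explicitly."""
    return Task(config=Config())


third = fresh_factory()
third.config.manager_config = {"abc": 1}
fourth = fresh_factory()
clean = fourth.config.manager_config      # unaffected by `third`
```

If this is indeed what happens, it would be consistent with the observation that each test passes on its own and that constructing the job explicitly makes the stale value disappear.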

In any case please provide some feedback on my initial example, or suggest some improvement on how I could achieve my goal.


FabiPi3 commented 10 months ago

Maybe @utf or @gpetretto would be interested

codecov[bot] commented 10 months ago

Codecov Report

All modified and coverable lines are covered by tests :white_check_mark:

Project coverage is 99.86%. Comparing base (eda2a65) to head (393e71d). Report is 14 commits behind head on main.

:exclamation: Current head 393e71d differs from pull request most recent head 72081f9. Consider uploading reports for the commit 72081f9 to get more accurate results

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main     #502      +/-   ##
==========================================
+ Coverage   99.42%   99.86%   +0.44%
==========================================
  Files          21       20       -1
  Lines        1564     1511      -53
  Branches      425      414      -11
==========================================
- Hits         1555     1509      -46
+ Misses          9        2       -7
```

| [Files](https://app.codecov.io/gh/materialsproject/jobflow/pull/502?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=materialsproject) | Coverage Δ | |
|---|---|---|
| [src/jobflow/core/job.py](https://app.codecov.io/gh/materialsproject/jobflow/pull/502?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=materialsproject#diff-c3JjL2pvYmZsb3cvY29yZS9qb2IucHk=) | `100.00% <100.00%> (ø)` | |

... and [7 files with indirect coverage changes](https://app.codecov.io/gh/materialsproject/jobflow/pull/502/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=materialsproject)