DataBiosphere / toil

A scalable, efficient, cross-platform (Linux/macOS) and easy-to-use workflow engine in pure Python.
http://toil.ucsc-cgl.org/.
Apache License 2.0
901 stars 240 forks source link

RuntimeError: This job was passed promise that wasn't yet resolved when it ran. #4587

Closed vaheg55 closed 1 year ago

vaheg55 commented 1 year ago

I'm getting a weird error in my workflow, where a follow-on job is being run before all of its required promises have been fulfilled. This is a minimal example that reproduces the error:

from toil.common import Toil
from toil.job import Job
from toil.fileStores import FileID
import os
os.environ['NUMEXPR_MAX_THREADS'] = '1'
import shutil
import logging
import pprint
import numpy as np
import pandas as pd

SMILES_NEEDED = 10
CONFIGURATIONS_TO_COLLECT = 10
CONFIGURATION_JOBS = 1
TARGET_KEYS = ["EGB"]
WORK_DIR = os.path.join(os.getcwd(), "temp")
SMILES_LIST = [ "FC1C2C3C4C1C4(F)CC23F",
                "CC1C2C3C4C1C4(C)NC23C",
                "CC12NC3(C)C4C(N)C1C2C34",
                "CC12NC3(C)C4C(O)C1C2C34",
                "CC12NC3(C)C4C(F)C1C2C34",
                "CC1C2C3C4C1C4(C)OC23C",
                "CC12OC3(C)C4C(N)C1C2C34",
                "CC12OC3(C)C4C(O)C1C2C34",
                "CC12OC3(C)C4C(F)C1C2C34",
                "CN1C2C3C4C1C4(C)CC23C",
                "CN1C2C3C4C1C4(N)CC23C",
                "CN1C2C3C4C1C4(O)CC23C",
                "CN1C2C3C4C1C4(F)CC23C",
                "CN1C2C3C4C1C4(N)CC23N",
                "CN1C2C3C4C1C4(O)CC23N",
                "CN1C2C3C4C1C4(F)CC23N",
                "CN1C2C3C4C1C4(O)CC23O",
                "CN1C2C3C4C1C4(F)CC23O",
                "CN1C2C3C4C1C4(F)CC23F",
                "CN1C2C3C4C1C4(C)NC23C",
                "CN1C2C3C4C1C4(C)OC23C",
                "CC1C2C3C4N1C4(C)CC23C",
                "CC1C2C3C4N1C4(C)CC23N",
                "CC1C2C3C4N1C4(C)CC23O",
                "CC1C2C3C4N1C4(C)CC23F",
                "CC1C2C3N4C1C4(C)CC23C" ]

def run(dir):

    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()
    options.logLevel = "INFO"
    options.clean = "always"
    options.cleanWorkDir = "always"
    options.logFile = os.path.join('toil.log')

    continueLoop = True
    loopCount = 1
    CSS = []

    with Toil(options) as toil:
        jstart = Job.wrapJobFn(start)
        j0 = jstart
        for loopCount in range(2):
            print(loopCount)
            smilesPass = np.random.choice(SMILES_LIST, SMILES_NEEDED)

            j1 = Job.wrapJobFn(stage_configuration_cycle, smilesPass, loopCount)
            j0.addFollowOn(j1)
            j0 = j1

        result = toil.start(jstart)
        print(result)

def start(job, *args, **kwargs):
    job.log('START HERE')
    return 'return start here'

def stage_configuration_cycle(
        job,
        smilesPass,
        loopCount,
):

    j2 = Job.wrapJobFn(stage_sample_new_configurations, smilesPass)

    j3 = Job.wrapJobFn(stage_ensemble_evaluate, j2.rv(), loopCount)

    job.addChild(j2)

    j2.addFollowOn(j3)

    return j3.rv()

def stage_sample_new_configurations(
        job,
        smiles_sampled,
):

    samples_per_job = len(smiles_sampled) // CONFIGURATION_JOBS

    jobs = []
    for i in range(CONFIGURATION_JOBS):
        from_ix = i * samples_per_job
        to_ix = min(from_ix + samples_per_job, len(smiles_sampled))
        j = Job.wrapJobFn(stage_sample_from_smiles_for_configuration_search, smiles_sampled[from_ix:to_ix])
        job.addChild(j)
        jobs += [j]

    j2 = Job.wrapJobFn(stage_merge_files, [j.rv() for j in jobs])
    job.addFollowOn(j2)
    return j2.rv()

def stage_ensemble_evaluate(
        job,
        fID,
        loopCount
):

    job.log(f"DONE {fID}")
    # job.fileStore.exportFile(fID, f'file://{WORK_DIR}/export_{loopCount}.out')

def stage_sample_from_smiles_for_configuration_search(
        job,
        smiles
):

    temp_output_dir = os.path.abspath(job.fileStore.getLocalTempDir())

    output_files = generate_configurations_from_smiles(
        smiles,
        TARGET_KEYS,
        output_dir=temp_output_dir)

    output = os.path.join(temp_output_dir, 'final.h5')

    with open(output, 'a') as output_fh:
        for sFilename in output_files:
            with open(sFilename, 'r') as fh:
                output_fh.write(fh.read())

    newDatasetID = job.fileStore.writeGlobalFile(output)
    return newDatasetID

def stage_merge_files(
        job,
        FileIDs
):

    localFileIDs = [job.fileStore.readGlobalFile(fID) for fID in FileIDs]

    temp_output_dir = os.path.abspath(job.fileStore.getLocalTempDir())

    output = os.path.join(temp_output_dir, 'merged.h5')

    with open(output, 'a') as output_fh:
        for sFilename in localFileIDs:
            with open(sFilename, 'r') as fh:
                output_fh.write(fh.read()+'\n')

    newDatasetID = job.fileStore.writeGlobalFile(output)
    return newDatasetID

def generate_configurations_from_smiles(
        smileStrings,
        target_keys,
        output_dir
):

    output_files = []

    for i, smileStr in enumerate(smileStrings):

        output_file = os.path.join(output_dir, f"{i}_{smileStr}.h5")
        with open(output_file, 'w') as fh:
            fh.write(f'{i}_{smileStr}')
        output_files += [output_file]

    return output_files

if __name__ == '__main__':

    run(WORK_DIR)

This is the error message:

RuntimeError: This job was passed promise files/no-job/file-03d0573598954fba893ed6c35d8cda9a/stream that wasn't yet resolved when it ran. The job 'stage_merge_files' kind-stage_merge_files/instance-ibxxxhwr v0 that fulfills this promise hasn't yet finished. This means that there aren't enough constraints to ensure the current job always runs after 'stage_merge_files' kind-stage_merge_files/instance-ibxxxhwr v0. Consider adding a follow-on indirection between this job and its parent, or adding this job as a child/follow-on of 'stage_merge_files' kind-stage_merge_files/instance-ibxxxhwr v0.

┆Issue is synchronized with this Jira Story ┆Issue Number: TOIL-1395

adamnovak commented 1 year ago

I think the problem is that stage_ensemble_evaluate gets added as a follow-on of stage_sample_new_configurations, and tries to use the return value of stage_sample_new_configurations. But stage_sample_new_configurations returns the return value of a different follow-on of itself, namely stage_merge_files. So once stage_sample_new_configurations runs, Toil sees stage_ensemble_evaluate and stage_merge_files as both equally ready to run, and will arbitrarily pick one to run first. But actually, to work properly, stage_merge_files has to run first.

I think the simplest way to fix this might be to change stage_sample_new_configurations so that stage_merge_files runs as part of its children, and not as a follow-on, since stage_merge_files is conceptually inside stage_sample_new_configurations. So change:

j2 = Job.wrapJobFn(stage_merge_files, [j.rv() for j in jobs])
job.addFollowOn(j2)
return j2.rv()

To something like:

j2 = Job.wrapJobFn(stage_merge_files, [j.rv() for j in jobs])
for j in jobs:
    j.addFollowOn(j2)
job.addChild(j2)
return j2.rv()

Then stage_merge_files is a child of stage_sample_new_configurations, but still constrained to happen after the jobs it is merging, and the return value of stage_sample_new_configurations will be available to its follow-ons.

Does that help?

vaheg55 commented 1 year ago

Yep, I think that fixed it. Thanks.