To support large numbers of active dependencies, don't poll for conditions.
Add a mechanism to block until a run transitions, for a certain subset of runs. For the existing set of condition types, limiting this to runs of a single job ID is sufficient. We can mostly re-use the existing RunStore.query_live queuing mechanism, with the addition of further filters.
Split the Condition base into two subtypes: RunCondition and PolledCondition. The former will use query_live.
with run_store.query_live(job_id=job_id, args=args, state=state) as queue:
run = await queue.get()
log.info(f"dependency satisfied: {run}")
To support large numbers of active dependencies, don't poll for conditions.
RunStore.query_live
queuing mechanism, with the addition of further filters.Condition
base into two subtypes:RunCondition
andPolledCondition
. The former will usequery_live
.