aiidateam / aiida-workgraph

Efficiently design and manage flexible workflows with AiiDA, featuring an interactive GUI, checkpoints, provenance tracking, and remote execution capabilities.
https://aiida-workgraph.readthedocs.io/en/latest/
MIT License
9 stars 5 forks

Add `monitor` task decorator to create tasks that poll some state #235

Closed superstar54 closed 3 weeks ago

superstar54 commented 4 weeks ago

This PR introduces a `monitor` task decorator, which is particularly useful for tasks that need to poll some state (e.g., the presence of a file, or the state of another WorkGraph) at given intervals until a success criterion is met.

Possible use cases:

Note: while polling the state, the task sleeps for a given interval (1.0 second by default; the user can change it) and relinquishes control to the WorkGraph engine, so that the engine can run other tasks.
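
The polling behaviour described in the note can be illustrated with plain asyncio. This is only a minimal sketch, not the WorkGraph engine's code: `poll_until`, `other_task`, `make_ready`, and the `events` list are hypothetical names introduced for illustration.

```python
import asyncio


async def poll_until(check, interval=1.0, events=None):
    """Call check() repeatedly, sleeping between attempts, until it returns True."""
    attempts = 0
    while True:
        attempts += 1
        if check():
            return attempts
        if events is not None:
            events.append("monitor sleeps")
        # Sleeping hands control back to the event loop so other tasks can run.
        await asyncio.sleep(interval)


async def other_task(events):
    """Stand-in for any other task that can run while the monitor sleeps."""
    events.append("other task runs")


async def main():
    events = []
    state = {"ready": False}

    async def make_ready():
        # Hypothetical external event: the polled state becomes true a bit later.
        await asyncio.sleep(0.05)
        state["ready"] = True

    results = await asyncio.gather(
        poll_until(lambda: state["ready"], interval=0.01, events=events),
        other_task(events),
        make_ready(),
    )
    return results[0], events


attempts, events = asyncio.run(main())
```

Here the polling coroutine needs several attempts before the state flips, and the other coroutine gets to run during the first sleep instead of waiting for the monitor to finish.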

Example Usage:

Time monitor

A task that waits until a given time.

import datetime

@task.monitor()
def time_monitor(time):
    """Return True if the current time is later than the given time."""
    return datetime.datetime.now() > time

# Assumes `wg` is an existing WorkGraph and `add` an existing task.
# The task will wait until 2024-08-16, 10:54
monitor1 = wg.add_task(time_monitor, time=datetime.datetime(2024, 8, 16, 10, 54, 0))
add1 = wg.add_task(add, "add1", x=1, y=2)
# add1 starts only after monitor1 has finished
add1.waiting_on.add(monitor1)

File monitor

Start a task once a file exists.

@task.monitor()
def file_monitor(filepath):
    """Return True if the file exists."""
    import os
    return os.path.exists(filepath)

# Assumes `wg` is an existing WorkGraph and `add` an existing task.
# The task will wait until the file exists, checking every 60 seconds.
monitor1 = wg.add_task(file_monitor, filepath="/tmp/test.txt", interval=60.0)
add1 = wg.add_task(add, "add1", x=1, y=2)
# add1 starts only after monitor1 has finished
add1.waiting_on.add(monitor1)

Built-in tasks

I created some built-in tasks: TaskMonitor, TimeMonitor, and FileMonitor.

task monitor

# Wait for task "add2" from the WorkGraph "test_task_monitor2". For a running WorkGraph, you can also use workgraph_pk.
wg1.add_task("workgraph.task_monitor", workgraph_name="test_task_monitor2", task_name="add2")

time monitor

wg.add_task("workgraph.time_monitor", datetime=datetime.datetime.now() + datetime.timedelta(seconds=10))

file monitor

wg.add_task("workgraph.file_monitor", filepath="/tmp/test.txt")

General awaitable task

I also created an awaitable decorator to allow the user to take full control of the asyncio function.

import asyncio
from aiida_workgraph import WorkGraph, task
from aiida import load_profile

# Load the AiiDA profile
load_profile()

# Define an awaitable task using the new decorator
@task.awaitable()
async def awaitable_func(x, y):
    await asyncio.sleep(0.5)  # Simulate a delay for polling
    return x + y

# Create a WorkGraph and add the awaitable task to it
wg = WorkGraph(name="test_awaitable")
awaitable1 = wg.add_task(awaitable_func, "awaitable_func1", x=1, y=2)

# Run the WorkGraph
wg.run()

About asyncio

The awaitable task puts the WorkGraph into the Waiting state. The task relinquishes control to the asyncio event loop, so the WorkGraph can run other tasks. However, if a long-running calcfunction is executing, the awaitable task has to wait for the calcfunction to finish before it regains control and can run its next step.
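
The caveat about long-running synchronous functions applies to asyncio in general, and can be reproduced without AiiDA. In the sketch below, `blocking_step` is a hypothetical stand-in for a calcfunction (here just a `time.sleep`), and `awaitable_task` is a hypothetical coroutine that would like to resume after 0.01 s.

```python
import asyncio
import time


async def awaitable_task(log):
    # Wants to resume after 0.01 s, but can only do so when the loop is free.
    start = time.monotonic()
    await asyncio.sleep(0.01)
    log["resumed_after"] = time.monotonic() - start


async def blocking_step():
    # Stand-in for a long-running synchronous function such as a calcfunction.
    time.sleep(0.2)  # blocks the whole event loop


async def main():
    log = {}
    task = asyncio.create_task(awaitable_task(log))
    await asyncio.sleep(0)  # let the awaitable task start and begin sleeping
    await blocking_step()   # nothing else can run during this call
    await task
    return log["resumed_after"]


resumed_after = asyncio.run(main())
```

Although the coroutine asked to sleep for only 0.01 s, it resumes roughly 0.2 s later, because the event loop cannot hand control back until the blocking call returns.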

TODO

codecov-commenter commented 4 weeks ago

Codecov Report

Attention: Patch coverage is 83.73016% with 41 lines in your changes missing coverage. Please review.

Project coverage is 79.59%. Comparing base (5937b88) to head (b9a77b4). Report is 36 commits behind head on main.

| Files | Patch % | Lines |
|---|---|---|
| aiida_workgraph/engine/workgraph.py | 65.88% | 29 Missing :warning: |
| aiida_workgraph/decorator.py | 65.00% | 7 Missing :warning: |
| aiida_workgraph/executors/monitors.py | 84.37% | 5 Missing :warning: |
Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main     #235      +/-   ##
==========================================
+ Coverage   75.75%   79.59%   +3.83%
==========================================
  Files          70       65       -5
  Lines        4615     4802     +187
==========================================
+ Hits         3496     3822     +326
+ Misses       1119      980     -139
```

| [Flag](https://app.codecov.io/gh/aiidateam/aiida-workgraph/pull/235/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=aiidateam) | Coverage Δ | |
|---|---|---|
| [python-3.11](https://app.codecov.io/gh/aiidateam/aiida-workgraph/pull/235/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=aiidateam) | `79.46% <83.73%> (+3.80%)` | :arrow_up: |
| [python-3.12](https://app.codecov.io/gh/aiidateam/aiida-workgraph/pull/235/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=aiidateam) | `79.48% <83.73%> (?)` | |
| [python-3.9](https://app.codecov.io/gh/aiidateam/aiida-workgraph/pull/235/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=aiidateam) | `79.52% <83.66%> (+3.78%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=aiidateam#carryforward-flags-in-the-pull-request-comment) to find out more.


superstar54 commented 3 weeks ago

@edan-bainglass, thanks for the comment!

The CalcJob monitor and the WorkGraph monitor differ in the scope of what they monitor.

The Monitor in AiiDA currently works on the CalcJob. It is called within the CalcJob's process, so it can modify its output. This is intra-process monitoring.

The Monitor in WorkGraph is a task. The monitor task can monitor any event (e.g., time, file changes, etc.) as long as there is an API for it. Of course, it can also monitor AiiDA processes using the AiiDA API. It can fetch the state of other AiiDA processes, and it can kill/pause/play a process, but it cannot modify the data (outputs) of the process. This is inter-process monitoring.
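
As a hedged illustration of inter-process monitoring, a check function could load another process node by its pk and read its state. The helper name `process_is_terminated` and its `loader` argument are hypothetical; `load_node` and the `is_terminated` property of process nodes come from the AiiDA ORM. The function only reads the state and does not touch the process outputs.

```python
def process_is_terminated(pk, loader=None):
    """Return True once the AiiDA process with the given pk has terminated."""
    if loader is None:
        # Imported lazily so the function can be exercised with a fake loader
        # when no AiiDA profile is available.
        from aiida.orm import load_node

        loader = load_node
    node = loader(pk)
    # Read-only access: the monitored process and its outputs are not modified.
    return bool(node.is_terminated)
```

Assuming the `@task.monitor()` decorator accepts such a function in the same way as `file_monitor` in the PR description, the resulting task would keep polling until the other process has finished, been killed, or excepted.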

One interesting future (current?) task for a monitor would be to trigger other jobs if some condition is met.

This is exactly what the monitor task does.

The trick is how we should keep track of these actions in the provenance.

The provenance is not tracked because the monitor task is not an AiiDA process. It cannot be a process, otherwise it would store too much useless data. I believe we need to create a special data type for the monitor task to store the provenance, because AiiDA provenance requires AiiDA data.

The monitor task is actually implemented using the awaitable feature, similar to the awaitable decorator, but we hide the asyncio from the user. The user only needs to write the monitor task as a normal function, and the WorkGraph will take care of the rest.
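
The idea of hiding asyncio behind a decorator can be sketched as follows. This is not the aiida-workgraph implementation: `monitor_sketch` is a hypothetical decorator that wraps a plain boolean function in a coroutine, which polls the function at a fixed interval.

```python
import asyncio
import functools


def monitor_sketch(interval=1.0):
    """Turn a plain boolean function into a coroutine that polls it."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Keep calling the user's function until it reports success.
            while not func(*args, **kwargs):
                # Yield to the event loop between checks.
                await asyncio.sleep(interval)
            return True

        return wrapper

    return decorator


calls = {"n": 0}


@monitor_sketch(interval=0.01)
def ready_after_three_calls():
    """A normal function: no async/await is visible to the author."""
    calls["n"] += 1
    return calls["n"] >= 3


result = asyncio.run(ready_after_three_calls())
```

The author of `ready_after_three_calls` writes an ordinary function; the sleeping and re-checking live entirely in the wrapper.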

edan-bainglass commented 3 weeks ago

@superstar54 thanks for clarifying the various parts. Let's discuss next week the submission of other jobs via a monitor. This is quite interesting.

As for the PR, good to merge when you're ready.