ihmeuw-scicomp / jobmon

Other
3 stars 6 forks source link

DAG Circularity Validations #107

Closed: k-simpson closed this issue 4 months ago.

k-simpson commented 5 months ago

Presently, there is no validation asserting that the tasks in a workflow do not form a circular dependency.

At workflow bind time or at launch time, Jobmon should validate that the dependency graph formed by the provided tasks is in fact acyclic.

Python version: 3.10.14

Jobmon versions:

# Name                    Version                   Build  Channel
jobmon                    3.2.4                    pypi_0    pypi
jobmon-core               3.2.5                    pypi_0    pypi
jobmon-installer-ihme     10.6.5                   pypi_0    pypi
jobmon-slurm              1.5.1                    pypi_0    pypi

Repro

import random

import pytest
from jobmon.client.task import Task
from jobmon.client.tool import Tool
from jobmon.client.workflow import Workflow

# Module-level Tool shared by every helper below: all task templates
# (create_task) and workflows (create_workflow) in these tests go through it.
TOOL = Tool("dag-cycle-test")

# Functions to create tasks and workflows for testing.

def create_task(task_number: int, compute_resources: dict) -> Task:
    """Build one ``sleep <task_number>`` task named ``task_<task_number>``.

    Args:
        task_number: Value substituted into the command template; it is also
            used to derive the task name.
        compute_resources: Resource request passed through unchanged to
            ``create_task``.

    Returns:
        The task created from the shared ``dummy_task`` template.
    """
    # Look up (or define) the template on the shared TOOL. Whether repeated
    # calls reuse one template object is up to Jobmon -- not verified here.
    template = TOOL.get_task_template(
        template_name="dummy_task",
        command_template="sleep {task_number}",
        node_args=["task_number"],
        op_args=[],
        task_args=[],
    )
    return template.create_task(
        name=f"task_{task_number}",
        compute_resources=compute_resources,
        task_number=task_number,
    )

def create_tasks(
    num_tasks: int, compute_resources: dict | None = None
) -> list[Task]:
    """Create ``num_tasks`` dummy tasks numbered ``0 .. num_tasks - 1``.

    Args:
        num_tasks: How many tasks to create; ``0`` yields an empty list.
        compute_resources: Resource request applied to every task. When
            omitted, a small default request on the ``all.q`` queue is used.
            Every task receives its own (shallow) copy of the dict.

    Returns:
        The created tasks, in task-number order.
    """
    if compute_resources is None:
        # Log locations are fixed literal paths; nothing cluster-specific is
        # looked up here.
        compute_resources = {
            "memory": "1G",
            "cores": 1,
            "runtime": "1m",
            "queue": "all.q",
            "project": "proj_fhseng",
            "stderr": "/tmp/errors",
            "stdout": "/tmp/output",
        }

    # Copy per task so tasks never share one mutable resources dict.
    return [
        create_task(task_number=i, compute_resources=dict(compute_resources))
        for i in range(num_tasks)
    ]

def create_workflow() -> Workflow:
    """Return a new ``test_workflow`` targeting the ``slurm`` cluster.

    ``workflow_args`` carries a random numeric suffix, so separate calls get
    distinct arguments with high probability. Presumably this keeps each test
    from matching a previously created workflow -- confirm against Jobmon's
    workflow_args semantics.
    """
    suffix = random.randint(0, 100_000_000)
    return TOOL.create_workflow(
        name="test_workflow",
        workflow_args=f"dummy_wf_{suffix}",
        default_cluster_name="slurm",
    )

# Test functions.

def set_and_verify_dependencies(t0: Task, t1: Task, t2: Task) -> None:
    """Wire the three tasks into a cycle and assert every edge landed.

    The resulting graph is::

        t0 -> t1 -> t2
         ^----------'

    The assertions expect ``add_downstream`` to record the reverse (upstream)
    edge on the target task as well.
    """
    # Link each task to its successor; the last edge closes the loop.
    for parent, child in ((t0, t1), (t1, t2), (t2, t0)):
        parent.add_downstream(child)

    # (task, expected upstream, expected downstream): one parent and one
    # child per task around the ring.
    ring = (
        (t0, t2, t1),
        (t1, t0, t2),
        (t2, t1, t0),
    )
    for task, upstream, downstream in ring:
        assert task.upstream_tasks == {upstream}
        assert task.downstream_tasks == {downstream}

@pytest.mark.unit
def test_create_cycle_before_adding_tasks_to_workflow_then_bind() -> None:
    """A cycle wired up *before* ``add_tasks`` must make ``bind()`` fail.

    NOTE(review): ``pytest.raises(Exception)`` accepts any exception, so an
    unrelated failure inside ``bind()`` would also satisfy this test. Narrow
    it to the cycle-detection error type once that type is known.
    """
    t0, t1, t2 = create_tasks(3)
    workflow = create_workflow()

    # Close the loop t0 -> t1 -> t2 -> t0 while the tasks are still standalone,
    # then hand them to the workflow.
    set_and_verify_dependencies(t0, t1, t2)
    workflow.add_tasks([t0, t1, t2])

    with pytest.raises(Exception):
        workflow.bind()

@pytest.mark.unit
def test_create_cycle_after_adding_tasks_to_workflow_then_bind() -> None:
    """A cycle wired up *after* ``add_tasks`` must still make ``bind()`` fail.

    The edges exist only after the tasks join the workflow, so this passes
    only if the cycle is detected at (or after) bind time.

    NOTE(review): ``pytest.raises(Exception)`` is broad; see the sibling tests.
    """
    t0, t1, t2 = create_tasks(3)
    workflow = create_workflow()

    # Register the still-independent tasks first ...
    workflow.add_tasks([t0, t1, t2])
    # ... and only then close the loop t0 -> t1 -> t2 -> t0.
    set_and_verify_dependencies(t0, t1, t2)

    with pytest.raises(Exception):
        workflow.bind()

@pytest.mark.unit
def test_create_cycle_before_adding_tasks_to_workflow_then_run() -> None:
    """A cycle wired up *before* ``add_tasks`` must make ``run()`` fail.

    No explicit ``bind()`` is issued, so whatever validation ``run()``
    performs is what's under test.

    NOTE(review): ``pytest.raises(Exception)`` accepts any exception; narrow
    it once the cycle-detection error type is known.
    """
    t0, t1, t2 = create_tasks(3)
    workflow = create_workflow()

    # Close the loop t0 -> t1 -> t2 -> t0, then hand the tasks over.
    set_and_verify_dependencies(t0, t1, t2)
    workflow.add_tasks([t0, t1, t2])

    with pytest.raises(Exception):
        workflow.run()

@pytest.mark.unit
def test_create_cycle_after_adding_tasks_to_workflow_then_run() -> None:
    """Exercise a cycle created in the DAG **after** adding the tasks to the workflow.

    Expect workflow.run() to raise an exception. No explicit bind() is called
    here; the cycle must be caught by whatever validation run() performs.

    NOTE(review): ``pytest.raises(Exception)`` accepts any exception, so an
    unrelated failure inside ``run()`` would also satisfy this test. Narrow it
    to the cycle-detection error type once that type is known.
    """
    # Create 3 tasks and the workflow
    t0, t1, t2 = create_tasks(3)
    wf = create_workflow()

    # Add the (still independent) tasks to the workflow; bind is NOT called
    wf.add_tasks([t0, t1, t2])

    # Set the dependencies between the tasks, closing the cycle after the add
    set_and_verify_dependencies(t0=t0, t1=t1, t2=t2)

    # Call run, expect it to raise an Exception
    with pytest.raises(Exception):
        wf.run()
k-simpson commented 5 months ago

Addressed by https://github.com/ihmeuw-scicomp/jobmon/pull/109