dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Utilities to load jobs, schedules, and sensors #12359

Open cjduffett opened 1 year ago

cjduffett commented 1 year ago

What's the use case?

When creating a @repository or code location Definitions, the load_assets_from_package_module() utility makes it easy to recursively search an entire Python package for all assets defined within:

from my_project import my_assets

my_defs = Definitions(
    assets=load_assets_from_package_module(my_assets),
)

However, no equivalent utilities exist for loading jobs, schedules, or sensors. Instead, we must rely on explicit imports like:

from my_project.my_jobs import a_job, b_job, c_job
from my_project.my_sensors import a_sensor, b_sensor, c_sensor
# etc.

my_defs = Definitions(
    jobs=[a_job, b_job, c_job, ...],
    sensors=[a_sensor, b_sensor, c_sensor],
    # etc.
)

Instead of relying on explicit imports, it would be really helpful to have utilities that can also recursively load jobs, schedules, and sensors from Python packages like:

from my_project import my_assets, my_jobs, my_schedules, my_sensors

my_defs = Definitions(
    assets=load_assets_from_package_module(my_assets),
    jobs=load_jobs_from_package_module(my_jobs),
    schedules=load_schedules_from_package_module(my_schedules),
    sensors=load_sensors_from_package_module(my_sensors),
)

This would allow my team to regularly create new jobs, schedules, and sensors in an existing package without having to:

  1. Update imports in __init__.py files
  2. Make changes to the module where the Definitions are defined

Ideas of implementation

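Add 3 new utility functions to the Dagster public API:

  - load_jobs_from_package_module()
  - load_schedules_from_package_module()
  - load_sensors_from_package_module()

Implement them similarly to load_assets_from_package_module(), but search for job/schedule/sensor definitions instead of asset definitions.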
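
To make the idea concrete, here is a minimal sketch of a generic package walker that uses only the standard library. It is not Dagster's implementation: the names iter_modules_in_package and load_instances_from_package_module are hypothetical, and the sketch assumes that definitions live in module-level variables and that every submodule can be imported without side effects.

```python
import importlib
import pkgutil
from types import ModuleType
from typing import Any, Iterator, List, Tuple, Type


def iter_modules_in_package(package: ModuleType) -> Iterator[ModuleType]:
    """Yield the package itself, then every submodule beneath it (hypothetical helper)."""
    yield package
    # A plain module has no __path__, so there is nothing below it to walk.
    package_path = getattr(package, "__path__", None)
    if package_path is None:
        return
    for info in pkgutil.walk_packages(package_path, prefix=package.__name__ + "."):
        yield importlib.import_module(info.name)


def load_instances_from_package_module(
    package: ModuleType, types: Tuple[Type[Any], ...]
) -> List[Any]:
    """Collect module-level objects that are instances of `types` (hypothetical helper)."""
    seen_ids = set()
    found: List[Any] = []
    for module in iter_modules_in_package(package):
        for value in vars(module).values():
            # Deduplicate by identity: a re-exported object shows up in several modules.
            if isinstance(value, types) and id(value) not in seen_ids:
                seen_ids.add(id(value))
                found.append(value)
    return found
```

With Dagster installed, a call might look like load_instances_from_package_module(my_jobs, (JobDefinition,)). The tuple has to name whichever classes your definitions actually are instances of.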

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

yuhan commented 1 year ago

That's a great idea! Will start an RFC PR soon 🫡

illyakaynov commented 1 year ago

Is there any update on this issue?

aaaaahaaaaa commented 1 year ago

Quick workaround for schedules in the meantime:

from types import ModuleType
from typing import Generator, List, Sequence, Set

from dagster import ScheduleDefinition
# Private Dagster helper; it may move or change between releases.
from dagster._core.definitions.load_assets_from_modules import _find_modules_in_package

def _find_schedules_in_module(module: ModuleType) -> Generator[ScheduleDefinition, None, None]:
    for attr in dir(module):
        value = getattr(module, attr)
        if isinstance(value, ScheduleDefinition):
            yield value
        # Also pick up module-level lists that contain only schedules.
        elif isinstance(value, list) and all(isinstance(el, ScheduleDefinition) for el in value):
            yield from value

def load_schedules_from_package_module(package_module: ModuleType) -> Sequence[ScheduleDefinition]:
    schedule_ids: Set[int] = set()
    schedules: List[ScheduleDefinition] = []
    for module in _find_modules_in_package(package_module):
        for schedule in _find_schedules_in_module(module):
            # Parentheses are required: without them the walrus binds the boolean result of `not in`.
            if (schedule_id := id(schedule)) not in schedule_ids:
                schedule_ids.add(schedule_id)
                schedules.append(schedule)
    return schedules

caelan-schneider commented 1 year ago

Here's a utility I wrote that can be extended to any type of Dagster object:

# utils/dagster_import_utils.py

import importlib
import os
import warnings
from typing import Dict, Sequence

from dagster import ExperimentalWarning

warnings.filterwarnings("ignore", category=ExperimentalWarning)

def _find_instance_in_module(file: str, types: Sequence[type]) -> Dict[str, object]:
    # Only Python source files can be imported as modules.
    if not file.endswith(".py"):
        return {}
    # "pipelines/a/b.py" -> "pipelines.a.b"; the path must be relative to an importable root.
    module_name = file[:-3].replace("/", ".")
    module = importlib.import_module(module_name)
    # Exact type match: instances of subclasses are not picked up.
    return {k: v for k, v in module.__dict__.items() if not k.startswith("__") and type(v) in types}

def recursively_find_instances(root: str, types: Sequence[type]) -> Sequence[object]:
    res: Dict[str, object] = {}

    def recurse(path: str) -> None:
        if os.path.isfile(path):
            # Keyed by attribute name, so a later module wins on a name clash.
            res.update(_find_instance_in_module(path, types))
            return
        for child in os.listdir(path):
            # Skip __init__.py, __pycache__, and similar entries.
            if not child.startswith("__"):
                recurse(f"{path}/{child}")

    recurse(root)
    return list(res.values())

And here's how I use it:

# definitions.py

from dagster import Definitions, AssetsDefinition, ScheduleDefinition, SensorDefinition, JobDefinition
from dagster._core.definitions.partitioned_schedule import UnresolvedPartitionedAssetScheduleDefinition
from dagster_docker import docker_executor
from utils.dagster_import_utils import recursively_find_instances

# pipelines is the base directory where all my assets, schedules, etc. are defined 

dagster_assets = recursively_find_instances(root="pipelines", types=(AssetsDefinition,))
dagster_schedules = recursively_find_instances(root="pipelines", types=(ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition))
dagster_sensors = recursively_find_instances(root="pipelines", types=(SensorDefinition,))
dagster_jobs = recursively_find_instances(root="pipelines", types=(JobDefinition,))

defs = Definitions(
    assets=dagster_assets,
    schedules=dagster_schedules,
    sensors=dagster_sensors,
    jobs=dagster_jobs,
    resources={...},
    executor=docker_executor
)
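
File-based discovery like this depends on turning a path such as pipelines/etl/jobs.py into the dotted module name pipelines.etl.jobs. A minimal sketch of that step using pathlib follows. The function name module_name_from_path is hypothetical, and the sketch assumes the path is relative to a directory that is on sys.path:

```python
from pathlib import PurePath


def module_name_from_path(file: str) -> str:
    """Convert a relative path to a .py file into a dotted module name (hypothetical helper)."""
    path = PurePath(file)
    if path.is_absolute() or path.suffix != ".py":
        raise ValueError(f"expected a relative path to a .py file, got {file!r}")
    # PurePath drops a leading "./" and redundant separators while parsing.
    parts = path.with_suffix("").parts
    if ".." in parts:
        raise ValueError(f"path must not leave the import root: {file!r}")
    return ".".join(parts)
```

Parsing with PurePath means a root passed as "./pipelines" yields the same module names as "pipelines", which plain string replacement would not.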