mara / mara-pipelines

A lightweight opinionated ETL framework, halfway between plain scripts and Apache Airflow
MIT License

Multiple root pipelines or pipelines collections #57

Open ghost opened 3 years ago

ghost commented 3 years ago

By design, there is one root pipeline to which all other pipelines are added. This might make sense when you have just one pipeline that does one task, but I have several pipelines which I don't want to execute together, e.g. a pipeline for a daily refresh with several incremental refreshes, a pipeline to execute a complete full load, a pipeline that runs at a specific time to refresh specific data areas on demand, etc.

I came up with the following ideas for how this could be solved:

Does anyone have other ideas? Is there maybe a common way to solve this that I am not aware of?

jankatins commented 3 years ago

I've added an extra runner which looks at certain labels in the pipeline when executing:

        # It's after 04:00 to give the load jobs in DV a bit of time to download everything...
        labels={'run_mode': 'once_a_day', # other values: always, manual
                'run_after': '04:00:01',
                'run-nightly-stg': True}
    )
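
To illustrate how such labels can drive the selection, here is a minimal sketch that uses a stand-in class instead of the real mara_pipelines classes. `StubPipeline` and `select_by_run_mode` are hypothetical names made up for this example, and the time check for `once_a_day` is deliberately left out:

```python
from dataclasses import dataclass, field


@dataclass
class StubPipeline:
    """Hypothetical stand-in for a pipeline node; only id and labels matter here."""
    id: str
    labels: dict = field(default_factory=dict)


def select_by_run_mode(nodes, include_once_a_day=False, include_manual=False):
    """Return the ids of the nodes to run, based on their 'run_mode' label.

    Nodes without a 'run_mode' label are treated as 'always'.
    Any value other than 'always' or 'once_a_day' is treated as manual.
    """
    selected = []
    for node in nodes:
        run_mode = node.labels.get('run_mode', 'always')
        if run_mode == 'always':
            selected.append(node.id)
        elif run_mode == 'once_a_day':
            if include_once_a_day:
                selected.append(node.id)
        elif include_manual:
            selected.append(node.id)
    return selected


nodes = [
    StubPipeline('incremental_refresh'),
    StubPipeline('nightly_staging', {'run_mode': 'once_a_day', 'run_after': '04:00:01'}),
    StubPipeline('full_load', {'run_mode': 'manual'}),
]

print(select_by_run_mode(nodes))
print(select_by_run_mode(nodes, include_manual=True))
```

With no flags set, only the unlabeled pipeline is selected; the manual one is added only when explicitly requested.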

This is the runner:

import sys
import os

import click
import datetime

# data_integration_run -> the "head" pipeline which is started, e.g. this runner always starts the root pipeline
# data_integration_node_run -> individual pipelines/nodes which are run
# node_path -> array with path elements from the root pipeline. E.g. "[]" is the root pipeline,
#              "['operational_system']" for OS

# %s is a query parameter filled in by cursor.execute(); no f-string needed
__last_runtime_query = '''
SELECT
  max(end_time)
FROM data_integration_node_run
WHERE succeeded
  AND node_path = ARRAY [%s];
'''

__already_running_processes = '''
SELECT run_id, start_time, node_path
FROM data_integration_run
WHERE end_time IS NULL
  -- "self healing" in case a job is not closed. But long enough to not start
  -- if a legitimate run is still running
  AND start_time > now() - INTERVAL '12 hours'
ORDER BY start_time DESC
LIMIT 1
'''

def __debug(msg: str):
    if False or os.environ.get('DEBUG'):
        print(msg)

def _should_run_once_a_day(pipeline_id: str, run_after: str, start_ts: datetime.datetime):
    from mara_db.postgresql import postgres_cursor_context
    # We only look at today: if a pipeline did not run successfully yesterday,
    # that is ignored after midnight UTC
    run_after_time = datetime.time.fromisoformat(run_after).replace(tzinfo=datetime.timezone.utc)
    run_after_ts = datetime.datetime.combine(start_ts.date(), run_after_time)
    # No need to check anything if we are not yet past the run_after time
    __debug(f'  start_ts:     {start_ts}')
    __debug(f'  run_after_ts: {run_after_ts}')
    if start_ts < run_after_ts:
        __debug(f'  not yet time -> should NOT run')
        return False

    # We might run, but should check if we already ran
    with postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
        cursor.execute(__last_runtime_query, (pipeline_id,))
        last_run_ts = cursor.fetchone()[0]
        __debug(f'  last_run_ts:  {last_run_ts}')
        if last_run_ts is None:
            # we've never run the pipeline
            __debug(f'  never run -> should run')
            return True
        if last_run_ts > run_after_ts:
            # we already did a successful run
            __debug(f'  already ran today -> should NOT run')
            return False
    # no successful run after the run_after time -> do a run
    __debug(f'  did not run yet -> should run')
    return True

def _ensure_no_other_running_etl():
    from mara_db.postgresql import postgres_cursor_context

    # Check whether another ETL run is still in progress
    with postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
        cursor.execute(__already_running_processes)
        row = cursor.fetchone()
        if row is None:
            # no other ETL running
            return True
        else:
            import app.slack
            run_id, start_ts, node_path = row
            info = f'run_id: {run_id}, start_ts: {start_ts}, node_path: {node_path}'
            msg = f"Found already running ETL, aborting: ({info})"
            print(msg)
            app.slack.notify_slack_info(msg)
            # exit with a success number, one slack message is enough
            sys.exit(0)

@click.command()
@click.option('--overwrite-once-a-day', default=False, is_flag=True,
              help='Include all "once_a_day" pipelines in the run')
@click.option('--overwrite-manual', default=False, is_flag=True,
              help='Include all "manual" pipelines in the run')
@click.option('--overwrite-already-running-etl', default=False, is_flag=True,
              help="Don't check and abort, if an ETL is already running")
def run_root_pipeline(overwrite_once_a_day: bool, overwrite_manual: bool, overwrite_already_running_etl: bool):
    """Runs configured nodes in the root pipeline"""
    from mara_pipelines.ui.cli import run_pipeline
    from mara_pipelines import config, pipelines

    __debug(f'overwrite_once_a_day:          {overwrite_once_a_day}')
    __debug(f'overwrite_manual:              {overwrite_manual}')
    __debug(f'overwrite_already_running_etl: {overwrite_already_running_etl}')

    # we always take the root pipeline here but remove nodes which only need to run once a day after
    # a configured time in UTC

    # A pipeline can have certain labels:
    # 'run_mode': 'once_a_day', or 'manual', or 'always' (= default)
    # 'run_after': '01:00:00' -> if 'run_mode' == 'once_a_day', then run it once after this time
    #                            (time is in UTC)

    # We use the start of the overall run, not the start of the individual pipeline
    start_ts = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0)
    os.environ['PGAPPNAME'] = f'mara_etl_framework__{start_ts.isoformat()}'

    if not overwrite_already_running_etl:
        _ensure_no_other_running_etl()

    root = config.root_pipeline()

    # a list of nodes (= pipelines) to run selectively in the pipeline
    _nodes = set()

    # Check which nodes should be run
    for node in root.nodes.values():
        __debug(f'Node: {node.id}')
        if not isinstance(node, pipelines.Pipeline):
            # for now we would just run it...
            __debug(f'  Not pipeline -> running')
            _nodes.add(node)
            continue
        run_mode = node.labels.get('run_mode', 'always')
        if run_mode == 'always':
            __debug(f'  mode: always -> running')
            _nodes.add(node)
            continue
        elif run_mode == 'once_a_day':
            if overwrite_once_a_day:
                __debug(f'  mode: once_a_day + overwrite -> running')
                _nodes.add(node)
                continue
            run_after = node.labels.get('run_after', '00:00:00')
            if _should_run_once_a_day(str(node.id), run_after, start_ts):
                __debug(f'  mode: once_a_day + time -> running')
                _nodes.add(node)
            else:
                __debug(f'  mode: once_a_day + not due yet or already ran today -> NOT running')
                print(f"NOT running pipeline {node.id} (not due yet or already ran today).")
            continue
        else:
            # this assumes that anything else is manual mode
            if overwrite_manual:
                __debug(f'  mode: manual + overwrite -> running')
                _nodes.add(node)
            else:
                __debug(f'  mode: manual -> NOT running')
                print(f"NOT running pipeline {node.id} (set to manual).")
            continue
    __debug(str([n.id for n in _nodes]))
    if not run_pipeline(root, _nodes, False):
        sys.exit(-1)

@click.command()
@click.option('--path', default='',
              help='The parent ids of the pipeline to run, separated by comma. Example: "pipeline-id,sub-pipeline-id".')
@click.option('--nodes',
              help='IDs of sub-nodes of the pipeline to run, separated by comma. When provided, then only these nodes are run. Example: "do-this, do-that".')
@click.option('--with-upstreams', default=False, is_flag=True,
              help='Also run all upstreams of --nodes within the pipeline.')
@click.option('--only-with-label', default='',
              help='Only execute if a label of that name is present in the pipeline labels and its value is truthy.')
@click.option('--overwrite-already-running-etl', default=False, is_flag=True,
              help="Don't check and abort, if an ETL is already running")
def run(path, nodes, with_upstreams: bool, only_with_label: str, overwrite_already_running_etl: bool):
    """Runs a pipeline or a sub-set of its nodes"""
    # copied from mara_pipelines/ui/cli.py with the addition of checking for already running ETLs and the
    # possibility to overwrite
    from mara_pipelines.ui.cli import run_pipeline
    from mara_pipelines import pipelines

    # --- ADDED: START ---- # compared to the upstream run command
    # Make sure we can identify this
    start_ts = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0)
    os.environ['PGAPPNAME'] = f'mara_etl_framework__{start_ts.isoformat()}'

    if not overwrite_already_running_etl:
        _ensure_no_other_running_etl()
    # --- ADDED: END ----

    # the pipeline to run
    path = path.split(',')
    pipeline, found = pipelines.find_node(path)
    if not found:
        print(f'Pipeline {path} not found', file=sys.stderr)
        sys.exit(-1)
    if not isinstance(pipeline, pipelines.Pipeline):
        print(f'Node {path} is not a pipeline, but a {pipeline.__class__.__name__}', file=sys.stderr)
        sys.exit(-1)

    # a list of nodes to run selectively in the pipeline
    _nodes = set()
    for id in (nodes.split(',') if nodes else []):
        node = pipeline.nodes.get(id)
        if not node:
            print(f'Node "{id}" not found in pipeline {path}', file=sys.stderr)
            sys.exit(-1)
        else:
            _nodes.add(node)

    # --- ADDED: START ----
    if only_with_label:
        if with_upstreams:
            print(f'Cannot handle --only-with-label together with --with-upstreams.', file=sys.stderr)
            sys.exit(-1)

        potential_nodes = _nodes if _nodes else pipeline.nodes.values()
        _nodes = set()
        for n in potential_nodes:
            if isinstance(n, pipelines.Pipeline):
                if n.labels.get(only_with_label, False):
                    _nodes.add(n)
            else:
                _nodes.add(n)
        if not _nodes:
            print(f'No nodes found with label "{only_with_label}".')
            sys.exit(-1)
    __debug(f'pipeline: {pipeline.id}')
    __debug(f'nodes: {str([n.id for n in _nodes])}')
    # --- ADDED: END ----

    if not run_pipeline(pipeline, _nodes, with_upstreams, interactively_started=False):
        sys.exit(-1)

# [Some CLI commands omitted, like a command to ensure that the ELT db exists or something to print an env file from the current local config so one can test docker stuff]

One can probably merge run and run_root_pipeline to get all the command-line flags... If there is interest in getting this functionality merged here, I can submit a PR.
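
For reference, the once-a-day decision in the runner boils down to comparing three timestamps. The following is a minimal sketch of that logic as a pure function without database access, assuming the end time of the last successful run is passed in by the caller. The name `should_run_once_a_day` and its parameters are made up for this example and are not part of mara_pipelines:

```python
import datetime
from typing import Optional


def should_run_once_a_day(run_after: str,
                          start_ts: datetime.datetime,
                          last_success_ts: Optional[datetime.datetime]) -> bool:
    """Decide whether a once-a-day pipeline is due.

    run_after:       UTC time of day as 'HH:MM:SS'
    start_ts:        timezone-aware start of the overall run
    last_success_ts: timezone-aware end of the last successful run, or None
    """
    run_after_time = datetime.time.fromisoformat(run_after).replace(
        tzinfo=datetime.timezone.utc)
    run_after_ts = datetime.datetime.combine(start_ts.date(), run_after_time)
    if start_ts < run_after_ts:
        return False  # too early today
    if last_success_ts is None:
        return True  # never ran successfully
    # due only if there was no successful run after today's run_after time
    return last_success_ts <= run_after_ts


utc = datetime.timezone.utc
now = datetime.datetime(2021, 3, 1, 5, 0, tzinfo=utc)
early = datetime.datetime(2021, 3, 1, 3, 0, tzinfo=utc)
ran_today = datetime.datetime(2021, 3, 1, 4, 30, tzinfo=utc)
ran_yesterday = datetime.datetime(2021, 2, 28, 4, 30, tzinfo=utc)

print(should_run_once_a_day('04:00:01', now, None))
print(should_run_once_a_day('04:00:01', early, None))
print(should_run_once_a_day('04:00:01', now, ran_today))
print(should_run_once_a_day('04:00:01', now, ran_yesterday))
```

Keeping the database lookup outside the decision function makes it possible to test the time logic on its own.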

ghost commented 3 years ago

I think we should work together to make a PR for this. I have already built something similar: https://github.com/mara/mara-pipelines/compare/master...hz-lschick:add-run-label-filter

Instead of calling the main run_pipeline method with a custom _nodes list, I made adjustments directly in the mara_pipelines module. The core differences between our code are:

I certainly agree that my solution (adjusting the mara_pipelines module) has its downsides, and I would love to work together to bring a label-filter execution option into the master branch. I would like to hear what @martin-loetzsch thinks about this. Maybe we should create a separate issue for that.

But back to the topic: my main reason for opening this issue was that I would like a safer way in mara to manage multiple pipelines, without, e.g., the risk that someone accidentally executes a full load and an incremental load together by running the root pipeline.


What I am missing here is an alternative to using one big root pipeline.