Netflix / dispatch

All of the ad-hoc things you're doing to manage incidents today, done for you, and much more!
Apache License 2.0
4.95k stars 488 forks source link

refactor(plugins/aws): error handling, logging, types, and early returns #5117

Closed wssheldon closed 3 weeks ago

kevgliss commented 3 weeks ago

I'm not convinced we are handling issues at the right level. Instead, I think we should let the caller handle the thread's failures/exceptions more gracefully, for example in the `consume` CLI function:

import time
from threading import Thread, Event
import logging
import signal

from dispatch.common.utils.cli import install_plugins
from dispatch.project import service as project_service
from dispatch.plugin import service as plugin_service
from dispatch.organization.service import get_all as get_all_organizations
from dispatch.database.core import get_session, get_organization_session

# Plugin installation runs first, ahead of any database access below.
install_plugins()

# Fetch the organizations once; the session is only needed for this query.
with get_session() as db_session:
    organizations = get_all_organizations(db_session=db_session)

log = logging.getLogger(__name__)

# Shared run flag: it stays set while workers should keep consuming and is
# cleared to ask every worker loop (and the main loop) to stop.
running = Event()
running.set()

# Handles of every started worker thread, kept so they can be joined later.
workers = []

def _run_consume_with_exception_handling(plugin_slug, organization_slug, project_id, running):
    """Keep calling ``_run_consume`` for as long as the ``running`` event is set.

    Any ``Exception`` raised by ``_run_consume`` is logged with its traceback,
    after which the loop pauses briefly and tries again. If ``_run_consume``
    returns without raising, it is simply invoked again while the event
    remains set.

    Args:
        plugin_slug: Forwarded to ``_run_consume``; also used in the log message.
        organization_slug: Forwarded to ``_run_consume``.
        project_id: Forwarded to ``_run_consume``.
        running: Event checked before every attempt; once cleared the loop ends.
    """
    retry_delay_seconds = 1  # small pause so a failing consumer does not spin
    while running.is_set():
        try:
            _run_consume(plugin_slug, organization_slug, project_id, running)
        except Exception as exc:
            # log.exception logs at ERROR level and attaches the active traceback.
            log.exception(f"Exception in thread for plugin {plugin_slug}: {exc}")
            time.sleep(retry_delay_seconds)

def start_worker(plugin_slug, organization_slug, project_id, running):
    """Start one daemon thread running the exception-handling consume loop.

    Args:
        plugin_slug: Passed through to the thread target.
        organization_slug: Passed through to the thread target.
        project_id: Passed through to the thread target.
        running: Event passed through to the thread target.

    Returns:
        The already-started ``Thread`` object.
    """
    target_args = (plugin_slug, organization_slug, project_id, running)
    # Daemon threads do not keep the interpreter alive on their own.
    worker_thread = Thread(
        target=_run_consume_with_exception_handling,
        args=target_args,
        daemon=True,
    )
    worker_thread.start()
    return worker_thread

# For every organization and project, look up the active signal-consumer
# plugin instances and start a fixed number of worker threads per instance.
for organization in organizations:
    with get_organization_session(organization.slug) as org_session:
        for project in project_service.get_all(db_session=org_session):
            plugins = plugin_service.get_active_instances(
                db_session=org_session,
                plugin_type="signal-consumer",
                project_id=project.id,
            )

            # Nothing to consume for this project: warn, then fall through to
            # the (empty) plugin loop below.
            if not plugins:
                log.warning(
                    f"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}"
                )

            for plugin in plugins:
                log.debug(f"Consuming signals for plugin: {plugin.plugin.slug}")
                # TODO add plugin.instance.concurrency
                workers.extend(
                    start_worker(plugin.plugin.slug, organization.slug, project.id, running)
                    for _ in range(5)
                )

def terminate_processes(signum, frame):
    """Signal handler: request shutdown and wait for the worker threads.

    Args:
        signum: Signal number supplied by the ``signal`` module (unused).
        frame: Current stack frame supplied by the ``signal`` module (unused).
    """
    print("Terminating main process...")
    # Clearing the shared event asks every loop that checks it to stop.
    running.clear()
    # Block until each started worker thread has finished.
    for worker_thread in workers:
        worker_thread.join()

# Route both interrupt and terminate signals to the same shutdown handler.
for handled_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(handled_signal, terminate_processes)

# Park the main thread, polling the run flag once per second, until the
# flag is cleared.
while running.is_set():
    time.sleep(1)
print("Main process terminating.")
wssheldon commented 3 weeks ago

@kevgliss Agreed, will add these improvements as well.

wssheldon commented 3 weeks ago

@kevgliss https://github.com/Netflix/dispatch/pull/5118