dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
11.24k stars 1.42k forks source link

Implement a default telemetry provider parallel to the default logger #11191

Open mikeatlas opened 1 year ago

mikeatlas commented 1 year ago

What's the use case?

I'd like to have easy access to a globally-defined telemetry (observability metrics) provider in the same way that OpExecutionContext provides loggers to every Op.

While Asset Observations are nice for the data space, they're not flexible enough nor exportable easily to providers such as New Relic, Prometheus, DataDog, and so on for more generic metrics not specifically related to assets.

Ideas of implementation

I'd like to see OTel https://opentelemetry.io/docs/instrumentation/python/ be implemented as a telemetry provider since it makes it easy to plug OTLP backends, for example DataDog: https://docs.datadoghq.com/opentelemetry/otel_tracing/

Additional information

The default metrics provider could be a NoopMetricsProvider such that it doesn't bloat the Context when not used.

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

mikeatlas commented 1 year ago

From Dagster Slack by @markfickett

10:12 AM I hooked up OpenTelemetry trace context propagation in Dagster so that we can get traces of a job's @ops across multiple threads / CPUs. I'm considering making the effort to open-source it, would folks be interested in that?

markfickett commented 1 year ago

Thanks Mike; Form Energy's library is now open sourced.

rob-apella commented 1 year ago

a generic suite of metrics collected and published automatically from the infra level to the op level, would go a really long way in making sure our Dagster deployments are up and running correctly / meeting SLAs. currently, (I am pretty sure) developers have to write application code to set up alerts, which is error prone and bloats the repo.

as a short list of metrics and even implementation, I would point to apache-airflow[statsd] - https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/metrics.html

these are generic enough to cover all job-specific failures or related execution metrics, and clues of infra failures like scheduler delay and heartbeats.

with statsd it's very easy to then publish metrics to Datadog (via DogStatsD) where teams can configure alerts + oncall pages

aaaaahaaaaa commented 8 months ago

Here is our take on this. We prefer the idea of monkeypatching Dagster to support any job and op out of the box.

We target 2 things:

Notes:

import logging
from collections.abc import Callable, Iterator
from functools import wraps
from typing import TYPE_CHECKING, Any, Union

from .utils.telemetry import get_tracer, increment

if TYPE_CHECKING:
    from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction
    from dagster._core.definitions.op_definition import OpDefinition
    from dagster._core.events import DagsterEvent
    from dagster._core.execution.context.system import PlanOrchestrationContext
    from dagster._core.execution.plan.plan import ExecutionPlan

logger = logging.getLogger(__name__)
tracer = get_tracer()

def _compute_fn_wrapper(self: "OpDefinition") -> Union[Callable, "DecoratedOpFunction"]:
    from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction

    if isinstance(self._compute_fn, DecoratedOpFunction):
        return DecoratedOpFunction(_instrument_op(self._compute_fn.decorated_fn, op=self))
    return _instrument_op(self._compute_fn, op=self)

def _instrument_op(compute_fn: Callable, op: "OpDefinition") -> Callable:
    @wraps(compute_fn)
    def instrumented_fn(*args: Any, **kwargs: Any) -> Any:
        tags = {"op_name": op.name}
        with tracer.start_as_current_span("compute_fn", attributes=tags):
            logger.debug("Instrumented compute_fn start")
            increment("dagster.compute_fn_start", attributes=tags)
            result = compute_fn(*args, **kwargs)
            increment("dagster.compute_fn_end", attributes=tags)
            logger.debug("Instrumented compute_fn end")
            return result

    return instrumented_fn

def _instrument_job_execution_iterator(wrapped_fn: Callable[..., Iterator["DagsterEvent"]]) -> Callable:
    @wraps(wrapped_fn)
    def instrumented_fn(
        job_context: "PlanOrchestrationContext",
        execution_plan: "ExecutionPlan",
    ) -> Iterator["DagsterEvent"]:
        tags = {
            **job_context.event_tags,
            **{k.replace("dagster/", "job_"): v for k, v in job_context.run_tags.items() if k.startswith("dagster/")},
            **{k: v for k, v in job_context.run_tags.items() if not re.match(r"^(dagster/|\.dagster/)", k)},
        }

        with tracer.start_as_current_span("job_execution_iterator", attributes=tags):
            logger.debug("Instrumented job_execution_iterator start")
            for event in wrapped_fn(job_context, execution_plan):
                tags = {
                    **tags,
                    "asset_key": event.asset_key.to_user_string() if event.asset_key else None,
                    "event_type": event.event_type_value,
                    "partition": event.partition,
                    "step_key": event.step_key,
                    "step_kind": event.step_kind_value,
                }
                increment(
                    f"dagster.{event.event_type.value.lower()}",
                    attributes={k: v for k, v in tags.items() if v is not None},
                )
                yield event
            logger.debug("Instrumented job_execution_iterator end")

    return instrumented_fn

def patch_compute_fn_property() -> None:
    from dagster._core.definitions.op_definition import OpDefinition

    logger.debug("Patching compute_fn property")
    setattr(OpDefinition, "compute_fn", property(_compute_fn_wrapper))

def patch_job_execution_iterator() -> None:
    from unittest.mock import patch

    from dagster._core.execution import api

    logger.debug("Patching job_execution_iterator")
    fn = getattr(api, "job_execution_iterator")
    instrumented_fn = _instrument_job_execution_iterator(fn)
    patch("dagster._core.execution.api.job_execution_iterator", instrumented_fn).start()
    patch("dagster._core.execution.execute_in_process.job_execution_iterator", instrumented_fn).start()

def patch() -> None:
    patch_job_execution_iterator()
    patch_compute_fn_property()

Simply call patch() as early as possible in your program.

Use at your own risk, no doubt this is highly discouraged by the Dagster team.