open-telemetry / opentelemetry-python

OpenTelemetry Python API and SDK
https://opentelemetry.io
Apache License 2.0

Clarification: Implementation of custom LogRecordProcessor's #3806

Open ddluke opened 5 months ago

ddluke commented 5 months ago

Hey folks :)

I've been eyeballing this project and the otel specification for months, since sooner or later we want to replace our own telemetry SDKs (built on tools like structlog and pydantic) in our distributed cloud MLOps platform with otel. I feel this is the right thing to do.

I know the logs SDK is still not stable, but I have started hands-on experiments nonetheless, by recreating the otel-provided docker compose stack with my own apps to get a feeling for how logs, metrics and traces are defined and utilised by otel. This helps me lay out the roadmap for migrating to otel and prepare all user stories for this epic.

One thing that we do not want to change when moving to otel is the way our structured logs are defined, as that has proven to be of great value across all components of our platform (restful services, lambda functions, containerised apps, batch processing and training jobs, scheduler environments, realtime inference endpoints, etc.).

We currently model our log messages (or rather the log bodies) using pydantic, and emit them by passing instantiated pydantic models to our logging sdk, somewhat similar to this:

from pydantic import BaseModel

class JobInitialized(BaseModel):
    configuration: dict[str, str]

logger.info(JobInitialized(configuration={"foo": "bar"}))

This way of logging is established across all platform components and has been adopted by everyone involved (Data Scientists, Data Engineers, etc.). It enables a very high degree of consistency and allows us to do advanced operations, like sending all of our logs to a data catalogue and querying them via SQL (to mention one use case).
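To make the "query logs via SQL" use case concrete, here is a toy, stdlib-only illustration (sqlite3 stands in for the actual data catalogue, and the sample records are invented): once log bodies are serialized as JSON, any SQL engine with JSON support can query into them.

```python
import json
import sqlite3

# Two invented structured log entries, shaped like the serialized records
# described above: a type tag plus a JSON-serializable body.
logs = [
    {"log.type": "JobInitialized", "body": {"configuration": {"job_name": "foo"}}},
    {"log.type": "JobFinished", "body": {"status": "ok"}},
]

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE logs (type TEXT, body TEXT)")
con.executemany(
    "INSERT INTO logs VALUES (?, ?)",
    [(entry["log.type"], json.dumps(entry["body"])) for entry in logs],
)

# SQLite's json_extract lets us query fields inside the serialized body.
row = con.execute(
    "SELECT json_extract(body, '$.configuration.job_name') "
    "FROM logs WHERE type = 'JobInitialized'"
).fetchone()
print(row[0])  # foo
```

In a real deployment the same JSON bodies would land in the catalogue via an exporter rather than direct inserts, but the querying story is the same.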

After some hours of reading the docs, reverse engineering the logs SDK and iterating over my own docker compose stack, I feel like the correct approach to add support for pydantic-based log messages is to use a LogRecordProcessor.

The following snippet roughly demonstrates what we might want to do:

import logging

from uuid import uuid4

from opentelemetry._logs import set_logger_provider
from opentelemetry.attributes import BoundedAttributes
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs import LoggingHandler
from opentelemetry.sdk._logs import LogRecordProcessor
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource
from pydantic import BaseModel

class PydanticLogProcessor(LogRecordProcessor):
    def emit(self, log_data: LogData) -> LogData:
        """open question: is the return type annotation correct? mypy at least does not complain about it"""
        body = log_data.log_record.body
        attributes = log_data.log_record.attributes
        if isinstance(body, BaseModel):
            # serialize the model to a json serializable dict
            log_data.log_record.body = body.model_dump(mode="json")

            if isinstance(attributes, BoundedAttributes):
                # attach additional derived attributes
                updated_attributes = attributes.copy()
                updated_attributes.update({"log.uid": str(uuid4()), "log.type": body.__class__.__name__})
                log_data.log_record.attributes = BoundedAttributes(attributes=updated_attributes)
        return log_data

    def shutdown(self) -> None:
        """open question: clarify usage of shutdown on processors which only emit to the logging pipeline w/o export"""
        pass

    def force_flush(self, timeout_millis: int = 30000) -> None:
        """open question: clarify usage of force_flush on processors which only emit to the logging pipeline w/o export"""
        pass

logger_provider = LoggerProvider(
    resource=Resource.create(attributes={"service.namespace": "foo", "service.name": "bar"})
)
set_logger_provider(logger_provider)
processor_chain = [
    PydanticLogProcessor(),
    BatchLogRecordProcessor(OTLPLogExporter(insecure=True)),
]
for processor in processor_chain:
    logger_provider.add_log_record_processor(processor)
handler = LoggingHandler(level=logging.DEBUG, logger_provider=logger_provider)

# Attach OTLP handler to root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(handler)

class JobInitialized(BaseModel):
    configuration: dict[str, str]

if __name__ == "__main__":
    logger = logging.getLogger("app")
    logger.info(JobInitialized(configuration={"job_name": "foo", "input": "foo/lorem", "output": "foo/ipsum"}))

If I spin up a local otel collector (similar to how it is configured by the logs sample in this repo), I do see the output I would expect:

Body: Map({"configuration":{"input":"foo/lorem","job_name":"foo","output":"foo/ipsum"}})
Attributes:
     -> code.filepath: Str(/foo/bar/simple.py)
     -> code.function: Str(<module>)
     -> code.lineno: Int(65)
     -> log.uid: Str(ce25f96f-325c-43b1-9f76-daea04fc54df)
     -> log.type: Str(JobInitialized)

This is great, but I do have some open questions:

- is a LogRecordProcessor the right weapon of choice to accomplish this? (as of now, I do not see that logging.Formatters are already supported, and honestly, using a LogRecordProcessor also feels a lot cleaner than messing with the untyped interfaces of formatters 🙄 😬 )
- is the return type annotation for the above processor defined correctly? (the base LogRecordProcessor does not have a return type annotation at all in its emit method)
- are the shutdown and force_flush methods set up correctly for a log processor whose only purpose is to do some modifications to the log data and emit it to the pipeline?

Cheers and kind regards πŸš€

lzchen commented 2 months ago

@ddluke

This looks very promising. Apologies for the late response.

is a LogRecordProcessor the right weapon of choice to accomplish this? (as of now, I do not see that logging.Formatters are already supported, and honestly, using a LogRecordProcessor also feels a lot cleaner than messing with the untyped interfaces of formatters 🙄 😬 )

This certainly looks like an acceptable way to achieve this behavior. Another way would be to natively support logs that are built on pydantic in the OTel LoggingHandler itself. This could either be added to the already existing LoggingHandler, or we could create a new handler for each logging format we want to support. The advantage of your proposal is that it is much more modular: the LogRecordProcessor can be shipped as a separate component. This can also be seen as a "disadvantage" if users do not want to add any more components to their code and just want structured logs to be supported out of the box.
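The modularity argument rests on processors being invoked in registration order, so a serializing processor registered before the exporting one rewrites the record first. A toy sketch of that fan-out, using stand-in classes (ToyProvider, SerializingProcessor, CollectingProcessor, and the plain JobInitialized class are all hypothetical names, not the real SDK or pydantic types):

```python
class JobInitialized:
    """Stand-in for a pydantic model; model_dump mimics BaseModel.model_dump."""
    def __init__(self, configuration):
        self.configuration = configuration

    def model_dump(self):
        return {"configuration": self.configuration}

class SerializingProcessor:
    """Rewrites the record in place, like the proposed PydanticLogProcessor."""
    def emit(self, record):
        body = record["body"]
        if hasattr(body, "model_dump"):
            record["attributes"]["log.type"] = type(body).__name__
            record["body"] = body.model_dump()

class CollectingProcessor:
    """Stands in for BatchLogRecordProcessor plus exporter."""
    def __init__(self):
        self.exported = []

    def emit(self, record):
        self.exported.append(record)

class ToyProvider:
    """Stand-in for the provider's fan-out to registered processors."""
    def __init__(self):
        self._processors = []

    def add_log_record_processor(self, processor):
        self._processors.append(processor)

    def emit(self, record):
        for processor in self._processors:  # called in registration order
            processor.emit(record)

provider = ToyProvider()
exporter = CollectingProcessor()
provider.add_log_record_processor(SerializingProcessor())
provider.add_log_record_processor(exporter)
provider.emit({"body": JobInitialized({"foo": "bar"}), "attributes": {}})
print(exporter.exported[0]["body"])  # {'configuration': {'foo': 'bar'}}
```

Because the serializer runs first, the exporting processor only ever sees plain, JSON-serializable bodies, which is exactly what makes the processor shippable as an independent component.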

is the return type annotation for the above processor defined correctly? (the base LogRecordProcessor does not have a return type annotation at all in its emit method)

We are working on adding typing to our SDK. The emit method does not return anything. See this as well for a simple example. Processors are usually used to perform some sort of pre/post operation on the telemetry data.
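Given that answer, a corrected sketch of the processor might look like the following; note that LogRecord and LogData here are simplified stand-ins for the SDK types (so the snippet runs without opentelemetry installed), not the real classes:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class LogRecord:
    """Stand-in for the SDK's LogRecord."""
    body: Any
    attributes: dict = field(default_factory=dict)

@dataclass
class LogData:
    """Stand-in for opentelemetry.sdk._logs.LogData."""
    log_record: LogRecord

class PydanticLogProcessor:
    def emit(self, log_data: LogData) -> None:
        # Mutate the record in place; emit() has no meaningful return
        # value, so the annotation should be None rather than LogData.
        body = log_data.log_record.body
        log_data.log_record.attributes["log.type"] = type(body).__name__

    def shutdown(self) -> None:
        # Nothing to release: this processor holds no resources.
        pass

    def force_flush(self, timeout_millis: int = 30000) -> None:
        # Nothing buffered: this processor emits synchronously.
        pass

data = LogData(LogRecord(body={"configuration": {"foo": "bar"}}))
returned = PydanticLogProcessor().emit(data)
print(returned)  # None
```

The mutation still reaches downstream processors because they all receive the same LogData object; only the return annotation changes.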

are the shutdown and force_flush methods set up correctly for a log processor whose only purpose is to do some modifications to the log data and emit it to the pipeline?

Yes, this looks right to me.

lzchen commented 2 months ago

Related feature request thread on adding structlog handler.