mage-ai / mage-ai

🧙 Build, run, and manage data pipelines for integrating and transforming data.
https://www.mage.ai
Apache License 2.0
8k stars 776 forks source link

[BUG] Multiple logging issues #5534

Open Rykarix opened 3 weeks ago

Rykarix commented 3 weeks ago

Mage version

Latest

Describe the bug

Issue 1

SERVER_LOGGING_FORMAT=json

does not save logs as json files. It saves them as log files.

Issue 2

The output of these "json logs" is broken. The following is not valid json format:

2024-11-01T14:28:53 {"cpu": 0.4287109375, "cpu_total": 16, "cpu_usage": 0.02679443359375, "memory": 4462.0, "memory_total": 19882.0, "memory_usage": 0.2244241022029977, "pipeline_run_id": 1, "pipeline_schedule_id": 3, "pipeline_uuid": "----", "hostname": "----", "level": "INFO", "message": "Pipeline ---- for run 1 in schedule 3 is alive.", "timestamp": 1730471333.578205, "uuid": "----"}

Issue 2.5

Weirdly enough, some logs shown in the docker-compose terminal show correctly formatted json output

Issue 3

There is no clear method of setting the server_logs folder apart from the pipeline_logs folder.

Issue 4

Errors are not being saved to logs properly.

For instance, I can see in the docker-compose up terminal that

{"timestamp": "2024-11-01 14:28:54,933", "level": "ERROR", "message": "Failed to start PipelineRun(id=1, pipeline_uuid=...

Yet this doesn't appear in logging.

Issue 5

There's no clear instructions, and probably even method, to extend your DX_LOGGER so that we can add our own streaming handlers on building a dockerfile.

To reproduce

Just set ENV SERVER_LOGGING_FORMAT=json and everything

In fact, here's my whole dockerfile:

FROM mageai/mageai:latest
ARG PIP=pip3
COPY ./projects /home/src/projects

RUN ${PIP} install --upgrade pip
RUN ${PIP} install -r /home/src/projects/requirements.txt

ENV DYNAMIC_BLOCKS_VERSION=2
ENV KERNEL_MANAGER=magic
ENV MEMORY_MANAGER_PANDAS_VERSION=2
ENV MEMORY_MANAGER_POLARS_VERSION=2
ENV MEMORY_MANAGER_VERSION=2
ENV VARIABLE_DATA_OUTPUT_META_CACHE=1
ENV REQUIRE_USER_AUTHENTICATION=1
ENV ACTIVE_DIRECTORY_ROLES_MAPPING='{"Mage.Viewer": "Viewer","Mage.Editor": "Editor","Mage.Admin": "Admin","Mage.Owner": "Owner"}'
ENV SERVER_LOGGING_FORMAT=json
ENV SERVER_VERBOSITY=WARNING

COPY ./docker-init.sh /etc/docker-init.sh
RUN chmod +x /etc/docker-init.sh

ENTRYPOINT ["/etc/docker-init.sh"]
#!/usr/bin/env bash
/bin/sh -c "/app/run_app.sh mage start /home/src/projects"

Expected behavior

Issue 1

Logs should be saved as .json not .log

Issue 2

Log outputs should have valid json formatting image

Issue 3

I expect non-pipeline related logging to be saved seperately to pipeline-related logging. For instance, if there's an error about user permissions or if there's a warning about something being insecure - that's related to mage as an application & its deployment. All of these logs should have a proper funnel

Issue 4

I expect all errors and warnings to be logged correctly based on ENV SERVER_VERBOSITY=WARNING

Issue 5

There should be a streamhandler that can connect to various services like elastic search, however since this is somewhat a feature request I'll add that into a seperate ticket

Screenshots

No response

Operating system

See "To reproduce"

Additional context

See "To reproduce"

Rykarix commented 2 weeks ago

Additional resources:

Lookup structlog if you want a library that can handle the json aspect of things for you.

Otherwise just write a file handler for it. I created one similar for another logging project:

# src/logs/formatters/json.py
"""Modified JSON Formatter inspired by Eric Moss.

github: https://github.com/erok77/esdemo
yt logging vid: https://youtu.be/_hhO56anumg

"""

import json
import logging
from datetime import UTC, datetime

def remove_none(d: dict) -> dict:
    """Removes None values from nested object."""
    try:
        if not isinstance(d, dict | list):
            return d
        if isinstance(d, list):
            return [v for v in (remove_none(v) for v in d) if v]
        return {k: v for k, v in ((k, remove_none(v)) for k, v in d.items()) if v}
    except Exception:
        logging.exception("Issue stripping None vals from nested object")

def format_traceback(record: logging.LogRecord) -> list:
    """Converts traceback into JSON-friendly format."""
    structured_traceback = []
    tb = record.exc_info[2]
    while tb:
        frame = tb.tb_frame
        structured_traceback.append(
            {
                "filename": frame.f_code.co_filename,
                "name": frame.f_code.co_name,
                "lineno": tb.tb_lineno,
                "locals": dict(frame.f_locals),
            },
        )
        tb = tb.tb_next
    return structured_traceback

class JsonFormatter(logging.Formatter):
    """JsonFormatter : extends logging.Formatter."""

    def get_exc_fields(self, record: logging.LogRecord) -> dict:
        """Returns execution/exception info for log record."""
        if record.exc_info:
            return {
                "exc_type": record.exc_info[0].__name__,
                "exc_message": str(record.exc_info[1]),
                "stack_trace": format_traceback(record),
            }
        return {"exc_info": record.exc_text}

    def format(
        self,
        record: logging.LogRecord,
        *args: str,
        **kw: dict,
    ) -> str:
        """Takes log record and returns JSON Structured Logging Format."""
        json_log_object = {
            "@timestamp": datetime.now(UTC).isoformat(),
            "error": {
                "stack_trace": self.get_exc_fields(record),
            },
            "event": {
                "created": datetime.fromtimestamp(record.created, tz=UTC).isoformat(),
                "module": record.module,
            },
            "log": {
                "level": record.levelname,
                "logger": record.name,
                "origin": {
                    "file": {
                        "line": record.lineno,
                        "name": record.filename,
                        "fullpath": record.pathname,
                    },
                    "function": record.funcName,
                },
            },
            "message": record.getMessage(),
            "process": {
                "pid": record.process,
                "name": record.name,
                "thread": {
                    "id": record.thread,
                    "name": record.threadName,
                },
            },
        }
        return json.dumps(remove_none(json_log_object), indent=2)

def test_trace_3() -> None:
    """Raise an exception to trace."""
    msg = "An error occurred in level 3"
    raise ValueError(msg)

def test_trace_2() -> None:
    """Raise an exception to trace."""
    try:
        test_trace_3()
    except ValueError as e:
        msg = "An error occurred in level 2"
        raise RuntimeError(msg) from e

def test_trace_1() -> None:
    """Raise an exception to trace."""
    try:
        test_trace_2()
    except RuntimeError:
        logger.exception("An error occurred in level 1")

if __name__ == "__main__":
    logger = logging.getLogger("TestLogger")
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.ERROR)

    test_trace_1()