grafana / loki

Like Prometheus, but for logs.
https://grafana.com/loki
GNU Affero General Public License v3.0

Log Dropping When Tailing Logs with Loki #14550

Open derhnyel opened 1 week ago

derhnyel commented 1 week ago

Describe the bug

Logs ingested through Fluent Bit are not tailed correctly in Loki 3.x. Entries are dropped or missing when tailing with the gRPC client or logcli. This does not occur in Loki 2.9.10, where all logs are tailed correctly without any missing entries (even with the --delay-for argument set).

To Reproduce

Steps to reproduce the behavior:

  1. Start Loki 3.x (where the issue is present).
  2. Run Fluent Bit (v3.1.1) so that it reads Docker logs and sends them to Loki.
  3. Tail logs using one of the following methods:
     • gRPC client (custom implementation).
     • Loki CLI (logcli) with the following query:
    logcli query --tail '{job="logs", appName="mock-service"}' --addr=http://localhost:3200
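For anyone reproducing this without logcli, the tail can also be inspected over HTTP. The sketch below assumes Loki's WebSocket tail endpoint (`/loki/api/v1/tail`, with `query`, `delay_for`, and `limit` parameters) and its documented message shape with `streams` and `dropped_entries`. The helper names `build_tail_url` and `summarize_tail_message` are hypothetical and not part of this report. Opening the WebSocket itself needs a client library and is left out.

```python
import json
from urllib.parse import urlencode


def build_tail_url(addr: str, query: str, delay_for: int = 0, limit: int = 100) -> str:
    """Build the WebSocket URL for Loki's tail endpoint (hypothetical helper)."""
    # The tail endpoint is a WebSocket, so map http(s) to ws(s).
    if addr.startswith("https://"):
        base = "wss://" + addr[len("https://"):]
    elif addr.startswith("http://"):
        base = "ws://" + addr[len("http://"):]
    else:
        base = addr
    params = urlencode({"query": query, "delay_for": delay_for, "limit": limit})
    return f"{base.rstrip('/')}/loki/api/v1/tail?{params}"


def summarize_tail_message(raw: str) -> dict:
    """Collect log lines and count dropped entries in one tail message."""
    msg = json.loads(raw)
    lines = [
        line
        for stream in msg.get("streams", [])
        for _timestamp, line in stream.get("values", [])
    ]
    # Loki may send null instead of a list when nothing was dropped.
    dropped = msg.get("dropped_entries") or []
    return {"lines": lines, "dropped": len(dropped)}


if __name__ == "__main__":
    print(build_tail_url("http://localhost:3200", '{job="logs", appName="mock-service"}'))
```

A non-zero `dropped` count would mean Loki itself reported dropping entries for the tail client, which is worth separating from entries that never show up at all.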

Expected behavior

All logs ingested into Loki should appear in the tail stream, in order and without dropped or missing entries, even during high log throughput.
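One way to make "in order and without dropped entries" checkable is to compare the lines that were sent with the lines the tail returned. This is a minimal sketch, assuming every log line is unique (for example because it carries a UUID, as the `random` level messages in the mock service do). The function name `compare_tail` is hypothetical.

```python
def compare_tail(expected: list[str], received: list[str]) -> dict:
    """Compare sent lines with tailed lines; assumes all lines are unique."""
    received_set = set(received)
    # Lines that were sent but never appeared in the tail stream.
    missing = [line for line in expected if line not in received_set]

    # Map each expected line to its send position, then check that the
    # received lines appear in strictly increasing send order.
    position = {line: index for index, line in enumerate(expected)}
    seen = [position[line] for line in received if line in position]
    in_order = all(a < b for a, b in zip(seen, seen[1:]))

    return {"missing": missing, "in_order": in_order}


if __name__ == "__main__":
    sent = ["m1", "m2", "m3", "m4"]
    tailed = ["m1", "m3", "m4"]
    print(compare_tail(sent, tailed))
```

Running the same comparison against Loki 2.9.10 and 3.x with identical input would show whether the difference is missing entries, reordering, or both.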

Environment:

• Loki version: 3.x (issue present); 2.9.10 (working as expected)
• Fluent Bit version: v3.1.1
• Infrastructure: Docker
• Deployment tool: Docker containers (both Fluent Bit and Loki deployed as containers)
• Log source: Docker logs from a custom mock-service Python application
• Fluent Bit configuration (fluent-bit-config.yaml):

service:
  flush: 1
  log_level: info
  storage.max_chunks_up: 128
  parsers_file:  /fluent-bit/etc/parsers.conf
  storage.sync: normal
  storage.backlog.mem_limit: 5M

pipeline:
  inputs:
    - name: tail
      tag: docker_tail.<container_id>
      tag_regex: (?<container_id>[^/]+)-json\.log$
      path: /var/lib/docker/containers/*/*-json.log
      parser: docker
      refresh_interval: 10

  filters:
    - name: lua
      match: docker_tail.*
      script: /fluent-bit/scripts/docker_filter.lua
      call: Enrich

    - name: rewrite_tag
      match: docker_tail.*
      rule: $_metadata_docker_container_name ^(.*)$ docker.$_metadata_docker_container_name.$_metadata_docker_container_id false
      emitter_name: re_emitted_docker

    - name: modify
      match: '*'
      copy:
        - app appName
        - app_name appName
        - _metadata_docker_label_appName appName
        - _metadata_docker_container_name appName
        - _metadata_docker_container_id appName
        - _metadata_file_appName appName

  outputs:
    - name: loki
      match: '*'
      host: loki
      port: 3200
      labels: job=logs, appName=$appName, level=$level
      retry_limit: 20
      storage.total_limit_size: 1000M

• Fluent Bit Dockerfile:

# Fluent Bit Dockerfile (fake.fluentbit.dockerfile)

FROM debian:bookworm-slim AS builder

RUN apt-get update && apt-get install -y \
    lua5.4 \
    liblua5.4-dev \
    luarocks \
    build-essential
RUN luarocks install lua-cjson 2.1.0
RUN luarocks install date 2.2.1

FROM fluent/fluent-bit:3.1.1

COPY fluent-bit-config.yaml /fluent-bit/etc/fluent-bit.yaml

COPY parsers.conf /fluent-bit/etc/parsers.conf

COPY docker_filter.lua /fluent-bit/scripts/docker_filter.lua

COPY --from=builder /usr/local/lib/lua /usr/local/lib/lua
COPY --from=builder /usr/local/share/lua /usr/local/share/lua

EXPOSE 2020

CMD ["/fluent-bit/bin/fluent-bit", "-c", "/fluent-bit/etc/fluent-bit.yaml"]

• Fluent Bit parsers (parsers.conf):

[PARSER]
    Name        json
    Format      json

[PARSER]
    Name         docker
    Format       json
    Time_Key     time
    Time_Format  %Y-%m-%dT%H:%M:%S.%L%z
    Time_Keep    On

• Fluent Bit build and run commands:

docker network create loki-network
docker build -t fluent-bit-custom:latest -f fake.fluentbit.dockerfile .
docker run -d --name fluent-bit-custom --network loki-network -v /var/lib/docker/containers:/var/lib/docker/containers fluent-bit-custom:latest

• Fluent Bit (docker_filter.lua):

CJSON = require("cjson")
CACHE = {}

local function get_container_name(container_id)
    local metadata = CACHE[container_id]
    if metadata then
        return metadata.docker_container_name
    end

    -- Read config file
    local config_file_path = "/var/lib/docker/containers/" .. container_id .. "/config.v2.json"
    local config_file = io.open(config_file_path, "rb")
    if not config_file then
        return nil
    end
    local config_json = config_file:read("*a")
    config_file:close()

    -- Map json config
    local config = CJSON.decode(config_json)

    -- Get container name and cache it
    local container_name = config.Name:gsub("^/", "")
    CACHE[container_id] = { docker_container_name = container_name }

    return container_name
end

function Enrich(tag, timestamp, record)
    -- Get container id from tag
    local container_id = tag:match("docker_tail%.(.+)")
    if not container_id then
        return 0, timestamp, record
    end

    -- Get container name and enrich record
    local container_name = get_container_name(container_id)
    if container_name then
        record["_metadata_docker_container_name"] = container_name
    end

    return 1, timestamp, record
end

• Loki configuration (config/loki-config.yaml):

auth_enabled: false

server:
  http_listen_address: 0.0.0.0
  http_listen_port: 3200
  grpc_listen_address: 0.0.0.0
  grpc_listen_port: 9095
  log_format: json
  log_level: error
  grpc_server_max_recv_msg_size: 16777216
  grpc_server_max_send_msg_size: 16777216

common:
  instance_addr: 127.0.0.1
  path_prefix: /loki
  storage:
    filesystem:
      chunks_directory: /loki/chunks
      rules_directory: /loki/rules
  replication_factor: 1
  ring:
    kvstore:
      store: inmemory

query_range:
  results_cache:
    cache:
      embedded_cache:
        enabled: true
        max_size_mb: 100

schema_config:
  configs:
    - from: 2020-10-24
      store: tsdb
      object_store: filesystem
      schema: v13
      index:
        prefix: index_
        period: 24h

limits_config:
  max_query_length: 365d
  ingestion_rate_mb: 256

ruler:
  alertmanager_url: http://localhost:9093

analytics:
  reporting_enabled: false

• Loki run command:

docker run --name loki --network loki-network -d -v $(pwd)/config:/mnt/config -p 3200:3200 grafana/loki:3.0.0 -config.file=/mnt/config/loki-config.yaml

Mock Service Dockerfile (fake.dockerfile):

FROM python:3.9-slim

WORKDIR /app

COPY main.py /app

RUN pip install fastapi structlog uvicorn

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "9012", "--reload", "--log-level", "debug"]
EXPOSE 9012

Mock Service Code (main.py):

from typing import Any
import datetime
import uuid

from fastapi import FastAPI
import structlog
import uvicorn

APP_NAME = "mock-service"

logger: Any = structlog.get_logger()
app: FastAPI = FastAPI(title="Mock Service", description="Mock Service", version="0.1.0")

class CustomPrintLogger:
    def msg(self, message: Any) -> None:
        print(message)

logger = structlog.wrap_logger(
    CustomPrintLogger(),
    wrapper_class=structlog.BoundLogger,
    processors=[structlog.processors.JSONRenderer()],
)

@app.get("/levels")
def levels() -> dict[str, Any]:
    logger.msg("mock-event", time=str(datetime.datetime.now(datetime.timezone.utc)),
               level="fatal", appName=APP_NAME, message="mock message", host="mock-host",
               compName="mock-component")
    logger.msg("mock-event", time=str(datetime.datetime.now(datetime.timezone.utc)),
               level="critical", appName=APP_NAME, message="mock message", host="mock-host",
               compName="mock-component")
    logger.msg("mock-event", time=str(datetime.datetime.now(datetime.timezone.utc)),
               level="error", appName=APP_NAME, message="mock message", host="mock-host",
               compName="mock-component")
    logger.msg("mock-event", time=str(datetime.datetime.now(datetime.timezone.utc)),
               level="warn", appName=APP_NAME, message="mock message", host="mock-host",
               compName="mock-component")
    logger.msg("mock-event", time=str(datetime.datetime.now(datetime.timezone.utc)),
               level="warning", appName=APP_NAME, message="mock message", host="mock-host",
               compName="mock-component")
    logger.msg("mock-event", time=str(datetime.datetime.now(datetime.timezone.utc)),
               level="info", appName=APP_NAME, message="mock message", host="mock-host",
               compName="mock-component")
    logger.msg("mock-event", time=str(datetime.datetime.now(datetime.timezone.utc)),
               level="debug", appName=APP_NAME, message="mock message", host="mock-host",
               compName="mock-component")
    logger.msg("mock-event", time=str(datetime.datetime.now(datetime.timezone.utc)),
               level="trace", appName=APP_NAME, message="mock message", host="mock-host",
               compName="mock-component")
    logger.msg("mock-event", time=str(datetime.datetime.now(datetime.timezone.utc)),
               level="unknown", appName=APP_NAME, message="mock message", host="mock-host",
               compName="mock-component")
    logger.msg("mock-event", time=str(datetime.datetime.now(datetime.timezone.utc)),
               level="random", appName=APP_NAME, message=f"mock message {uuid.uuid4()}",
               host="mock-host", compName="mock-component")

    print("FATAL: This is an example of unstructured log line")
    print("CRITICAL: This is an example of unstructured log line")
    print("ERROR: This is an example of unstructured log line")
    print("WARN: This is an example of unstructured log line")
    print("WARNING: This is an example of unstructured log line")
    print("INFO: This is an example of unstructured log line")
    print("DEBUG: This is an example of unstructured log line")
    print("TRACE: This is an example of unstructured log line")
    print("UNKNOWN: This is an example of unstructured log line")
    print(f"RANDOM: This is an example of unstructured log line {uuid.uuid4()}")

    return {}

if __name__ == "__main__":
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.processors.TimeStamper(fmt="iso", key="time"),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
    )

    uvicorn.run("main:app", host="0.0.0.0", port=9012, reload=True, log_level="critical")

Mock Service build and run commands:

docker build -t mock-custom:latest -f fake.dockerfile .
docker run --name mock-service -p 9012:9012 mock-custom:latest
curl 127.0.0.1:9012/levels # to generate logs
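Since the expected behavior mentions high log throughput, a single curl call may not be enough to trigger the drop. The sketch below calls the `/levels` endpoint repeatedly. Counting from main.py above, each call emits 10 structured and 10 unstructured lines, with uvicorn's own log lines on top. The names `http_get` and `generate_logs` are hypothetical; the `fetch` parameter exists only so the loop can be exercised without a running service.

```python
import urllib.request
from typing import Callable


def http_get(url: str) -> int:
    """Send a GET request and return the HTTP status code."""
    with urllib.request.urlopen(url, timeout=5) as response:
        return response.status


def generate_logs(base_url: str, requests: int,
                  fetch: Callable[[str], int] = http_get) -> int:
    """Call /levels `requests` times; return how many calls returned 200."""
    url = f"{base_url.rstrip('/')}/levels"
    succeeded = 0
    for _ in range(requests):
        if fetch(url) == 200:
            succeeded += 1
    return succeeded


if __name__ == "__main__":
    # Assumes the mock service from this report is listening on port 9012.
    calls = 50
    ok = generate_logs("http://127.0.0.1:9012", calls)
    print(f"{ok}/{calls} calls succeeded, at least {ok * 20} log lines emitted")
```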

Screenshots, Promtail config, or terminal output

Current result: [image]

Expected result: [image]

SmotrovaLilit commented 1 week ago

I think the issue might be related to https://github.com/grafana/loki/issues/14576