zking2000 / NotePad

1 stars 0 forks source link

log #84

Closed zking2000 closed 1 month ago

zking2000 commented 1 month ago
# Route GKE container logs into a Pub/Sub topic via a Cloud Logging sink.
# Replace YOUR_CLUSTER_NAME, YOUR_ZONE and YOUR_PROJECT_ID before running.
gcloud container clusters describe YOUR_CLUSTER_NAME --zone YOUR_ZONE
# Create the Pub/Sub topic
gcloud pubsub topics create loki-logs

# Create the log sink
gcloud logging sinks create loki-sink \
    pubsub.googleapis.com/projects/YOUR_PROJECT_ID/topics/loki-logs \
    --log-filter="resource.type=k8s_container"

# Get the sink's service account
SINK_SERVICE_ACCOUNT=$(gcloud logging sinks describe loki-sink --format="value(writerIdentity)")

# Grant publish permission to the sink's service account
gcloud pubsub topics add-iam-policy-binding loki-logs \
    --member="$SINK_SERVICE_ACCOUNT" \
    --role="roles/pubsub.publisher"
# Deployment running the Pub/Sub -> Loki log forwarder.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: log-forwarder
spec:
  replicas: 1
  selector:
    matchLabels:
      app: log-forwarder
  template:
    metadata:
      labels:
        app: log-forwarder
    spec:
      containers:
      - name: log-forwarder
        image: gcr.io/YOUR_PROJECT_ID/log-forwarder:v1
        env:
        # The forwarder builds its subscription path from GOOGLE_CLOUD_PROJECT;
        # without it the path is built from None and the subscribe call fails.
        - name: GOOGLE_CLOUD_PROJECT
          value: "YOUR_PROJECT_ID"
        - name: PUBSUB_SUBSCRIPTION
          value: "loki-logs-sub"
        - name: LOKI_URL
          value: "http://loki:3100/loki/api/v1/push"
import os
from google.cloud import pubsub_v1
import requests
import json

# Runtime settings come from the environment (each is None when unset).
project_id = os.environ.get('GOOGLE_CLOUD_PROJECT')
subscription_name = os.environ.get('PUBSUB_SUBSCRIPTION')
loki_url = os.environ.get('LOKI_URL')

# Pub/Sub client plus the fully-qualified path of the subscription to read.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id,
    subscription_name,
)

def _loki_timestamp_ns(timestamp):
    """Return ``timestamp`` as the integer-nanosecond epoch string Loki expects.

    Accepts either an RFC 3339 string (e.g. "2024-01-01T00:00:00.5Z") or a
    mapping with 'seconds' and optional 'nanos'.

    Raises:
        ValueError: the string is not a recognised RFC 3339 timestamp.
        KeyError/TypeError: the value has an unexpected shape.
    """
    import re
    from datetime import datetime, timezone

    if isinstance(timestamp, dict):
        # Integer arithmetic only: multiplying by the float 1e9 would render
        # the value as e.g. '1.7e+18', which is not a nanosecond count.
        return str(int(timestamp['seconds']) * 1_000_000_000
                   + int(timestamp.get('nanos', 0)))

    match = re.match(
        r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?(Z|[+-]\d{2}:\d{2})$',
        timestamp,
    )
    if match is None:
        raise ValueError(f"Unsupported timestamp format: {timestamp!r}")
    base, fraction, offset = match.groups()
    if offset == 'Z':
        offset = '+00:00'
    moment = datetime.strptime(base + offset, '%Y-%m-%dT%H:%M:%S%z')
    delta = moment - datetime(1970, 1, 1, tzinfo=timezone.utc)
    seconds = delta.days * 86400 + delta.seconds
    # Pad/truncate the fractional part to exactly nine digits (nanoseconds).
    nanos = int((fraction or '0')[:9].ljust(9, '0'))
    return str(seconds * 1_000_000_000 + nanos)


def _log_line(log_entry):
    """Return the text to store in Loki for ``log_entry``.

    Uses 'textPayload' when present; otherwise serialises 'jsonPayload' or
    'protoPayload' as JSON. Raises KeyError when none of them exists.
    """
    if 'textPayload' in log_entry:
        return log_entry['textPayload']
    for key in ('jsonPayload', 'protoPayload'):
        if key in log_entry:
            return json.dumps(log_entry[key], ensure_ascii=False)
    raise KeyError('textPayload')


def callback(message):
    """Forward one Pub/Sub message (a JSON log entry) to Loki.

    Args:
        message: received Pub/Sub message; ``message.data`` holds the JSON
            payload, and ``ack()`` / ``nack()`` settle the delivery.

    The message is acked only when Loki answers 204. In every other case
    (unparseable entry, network error, non-204 status) it is nacked so that
    it is redelivered instead of being lost.
    """
    try:
        log_entry = json.loads(message.data)
        loki_payload = {
            "streams": [
                {
                    "stream": {
                        "job": "gke_logs",
                    },
                    "values": [
                        [_loki_timestamp_ns(log_entry['timestamp']), _log_line(log_entry)]
                    ]
                }
            ]
        }
    except (ValueError, KeyError, TypeError) as exc:
        print(f"Cannot convert log entry for Loki: {exc!r}")
        message.nack()
        return

    try:
        # A timeout keeps a stalled Loki from blocking the callback forever.
        response = requests.post(loki_url, json=loki_payload, timeout=10)
    except requests.RequestException as exc:
        print(f"Failed to send log to Loki: {exc!r}")
        message.nack()
        return

    if response.status_code == 204:
        message.ack()
    else:
        print(f"Failed to send log to Loki: {response.status_code}, {response.text}")
        message.nack()

# Register `callback` for the subscription; the returned future is used below
# both to wait on the stream and to cancel it.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}")

try:
    # Wait on the streaming pull; an error raised here is handled below.
    streaming_pull_future.result()
except Exception as e:
    # Stop the stream before reporting the failure.
    streaming_pull_future.cancel()
    print(f"Listening for messages on {subscription_path} threw an exception: {e}")
zking2000 commented 1 month ago
# Use the official Python runtime as the parent image
FROM python:3.9-slim

# Emit print() output immediately; with buffered stdout the forwarder's
# status and error messages reach the container log late or not at all.
ENV PYTHONUNBUFFERED=1

# Set the working directory
WORKDIR /app

# Copy the contents of the current directory into /app in the container
COPY . /app

# Install the required packages
RUN pip install --no-cache-dir google-cloud-pubsub requests

# Set environment variables
# NOTE(review): together with `COPY . /app` this bakes service-account-key.json
# into the image, where anyone who can pull the image can read it. Prefer
# mounting the key as a secret or using Workload Identity — confirm before
# shipping.
ENV GOOGLE_APPLICATION_CREDENTIALS=/app/service-account-key.json

# Run app.py
CMD ["python", "app.py"]
zking2000 commented 1 month ago
# Step 1: Get the full resource name of the previously created KMS key
KMS_KEY_NAME=$(gcloud kms keys describe loki-logs-key \
    --location global \
    --keyring pubsub-keyring \
    --format="value(name)")

# Step 2: Apply CMEK to the topic, then create the subscription.
# Pub/Sub configures customer-managed keys on the topic; the subscription
# create command has no KMS flag, so passing one makes the command fail.
# NOTE(review): the Pub/Sub service agent needs the Cloud KMS
# CryptoKey Encrypter/Decrypter role on this key — confirm it is granted.
gcloud pubsub topics update loki-logs \
    --topic-encryption-key="$KMS_KEY_NAME"

gcloud pubsub subscriptions create loki-logs-sub \
    --topic loki-logs \
    --ack-deadline=60 \
    --message-retention-duration=1h \
    --expiration-period=never

# Step 3: Verify the subscription and the topic's encryption settings
gcloud pubsub subscriptions describe loki-logs-sub
gcloud pubsub topics describe loki-logs

# Step 4: (Optional) Configure a dead-letter policy.
# The dead-letter topic must exist before the subscription can reference it.
gcloud pubsub topics create loki-logs-dlq

gcloud pubsub subscriptions update loki-logs-sub \
    --dead-letter-topic=projects/YOUR_PROJECT_ID/topics/loki-logs-dlq \
    --max-delivery-attempts=5
zking2000 commented 1 month ago

Detailed Flow

Log Generation and Collection

Applications running in the GKE cluster generate logs. The default GKE logging agent (Fluent Bit on current GKE versions, fluentd on older ones) collects these logs. Collected logs are automatically sent to GCP Cloud Logging.

Cloud Logging Processing

Cloud Logging receives and stores the logs. Logs are available for viewing and analysis in the GCP Console.

Log Export to Pub/Sub

A Cloud Logging export (Log Sink) is configured to send logs to a Pub/Sub topic. The Pub/Sub topic is encrypted using Customer-Managed Encryption Keys (CMEK) for enhanced security. Google Cloud's internal log export service automatically publishes matching log entries to the specified Pub/Sub topic.

Log Forwarder Processing

A custom log forwarder is deployed as a Deployment in the GKE cluster. The log forwarder reads log messages from a subscription attached to the Pub/Sub topic. It processes each log message, transforming it into a format accepted by Loki. Processed logs are then sent to Loki via HTTP requests.

zking2000 commented 1 month ago

# Send a test request carrying the same trace/span IDs in two header styles:
# a `traceparent` header and the X-B3-* headers.
# Replace the URL placeholder with the real server address, port and endpoint.
curl -v -H "traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01" \
        -H "X-B3-TraceId: 0af7651916cd43dd8448eb211c80319c" \
        -H "X-B3-SpanId: b7ad6b7169203331" \
        -H "X-B3-Sampled: 1" \
        http://your-server-address:port/your-endpoint
zking2000 commented 1 month ago

# Collector receivers: syslog, OTLP, nginx access-log tailing, and Prometheus scraping.
receivers:
  # NOTE(review): the syslog receiver appears to take a single `protocol:` key
  # (rfc3164 or rfc5424) rather than a `protocols:` map — confirm, and split
  # into two receivers if both formats are needed.
  syslog:
    udp:
      listen_address: "localhost:5140"
    protocols:
      rfc3164:
        location: UTC
      rfc5424:
        location: UTC
  otlp:
    protocols:
      grpc:
        endpoint: "localhost:8017"
      http:
        endpoint: "localhost:8018"
  filelog:
    include: [ /appvol/nginx/logs/access.log ]
    start_at: beginning
    operators:
      - type: json_parser
        timestamp:
          # Parsed JSON keys are referenced with the attributes. prefix.
          parse_from: attributes.time_local
          # The layout below is a Go reference-time layout, so it must be
          # declared as such; the default layout type is strptime.
          layout_type: gotime
          layout: '02/Jan/2006:15:04:05 -0700'
      - type: trace_parser
        trace_id:
          parse_from: attributes.opentelemetry_trace_id
        span_id:
          parse_from: attributes.opentelemetry_span_id
      # NOTE(review): confirm `attributes` is a supported operator type; the
      # mapping below may need to be expressed as copy/move operators.
      - type: attributes
        attributes:
          http.method: request
          http.target: request
          http.host: http_host
          http.scheme: scheme
          http.status_code: status
          http.response.body.size: body_bytes_sent
          user_agent.original: http_user_agent
          client.ip: remote_addr
          url.full: request
  # prometheus is a receiver and therefore belongs under `receivers`;
  # as a top-level key the configuration is rejected.
  prometheus:
    config:
      scrape_configs:
        - job_name: 'gce-otel-collector'
          scrape_interval: 5s
          static_configs:
            # ... (keep the existing prometheus configuration unchanged)
zking2000 commented 1 month ago

# Collector pipeline: tail the nginx access log, parse it, and export over OTLP.
receivers:
  filelog:
    include: [ /path/to/your/nginx/access.log ]
    start_at: beginning
    operators:
      - type: json_parser
        timestamp:
          # Parsed JSON keys are referenced with the attributes. prefix.
          parse_from: attributes.time_local
          # Go reference-time layout; the default layout type is strptime.
          layout_type: gotime
          layout: '02/Jan/2006:15:04:05 -0700'
      - type: trace_parser
        trace_id:
          parse_from: attributes.opentelemetry_trace_id
        span_id:
          parse_from: attributes.opentelemetry_span_id
      # NOTE(review): confirm `attributes` is a supported operator type and
      # that it accepts the from/pattern form used below.
      - type: attributes
        attributes:
          http.method:
            from: request
            pattern: '^(\S+)'
          http.target:
            from: request
            pattern: '^(?:\S+\s+)(\S+)'
          http.host: http_host
          http.scheme:
            from: request
            pattern: '^(\S+)'
          http.status_code: status
          http.response.body.size: body_bytes_sent
          user_agent.original: http_user_agent
          client.ip: remote_addr
          url.full: request
          opentelemetry.trace_id: opentelemetry_trace_id
          opentelemetry.span_id: opentelemetry_span_id
          opentelemetry.parent_span_id: opentelemetry_context_traceparent
          ssl.protocol: ssl_protocol
          request.time: request_time
          upstream.response.time: upstream_response_time
          upstream.addr: upstream_addr
          upstream.status: upstream_status
          bytes.sent: bytes_sent
          request.length: request_length

processors:
  # Add any processors you need, e.g. batching or filtering.
  # `batch` must be defined because the logs pipeline below references it.
  batch:

exporters:
  # Configure your exporter, e.g. the OTLP exporter.
  # `otlp` must be defined because the logs pipeline below references it.
  otlp:
    endpoint: "your-otlp-endpoint:4317"
    tls:
      insecure: true

service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [batch]
      exporters: [otlp]
zking2000 commented 1 month ago

# Collector pipeline: tail the nginx access log, parse each line as JSON,
# batch the records, and export them over OTLP.
receivers:
  filelog:
    include: [ /path/to/your/nginx/access.log ]
    start_at: beginning
    operators:
      - type: json_parser
      # NOTE(review): confirm `add_attributes` is a supported operator type and
      # that the "{{ .LogFileName }}" template is expanded — TODO verify.
      - type: add_attributes
        attributes:
          log.file.name: "{{ .LogFileName }}"
      # NOTE(review): confirm `metadata_attributes` and the "{{ host.name }}"
      # template are supported — TODO verify.
      - type: metadata_attributes
        attributes:
          host.name: "{{ host.name }}"
      # NOTE(review): `to: message` has no body/attributes/resource prefix —
      # confirm the intended target field.
      - type: move
        from: body
        to: message
      - type: trace_parser
        trace_id:
          parse_from: attributes.opentelemetry_trace_id
        span_id:
          parse_from: attributes.opentelemetry_span_id
      # NOTE(review): confirm `resource` is a supported operator type.
      - type: resource
        attributes:
          service.name: "nginx"

processors:
  batch:

exporters:
  otlp:
    endpoint: "your-otlp-endpoint:4317"
    tls:
      # Plain-text connection to the OTLP endpoint (no TLS).
      insecure: true

service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [batch]
      exporters: [otlp]
zking2000 commented 1 month ago

# Collector pipeline: tail the nginx access log, parse each line as JSON,
# attach trace context, batch, and export over OTLP.
receivers:
  filelog:
    include: [ /path/to/your/nginx/access.log ]
    start_at: beginning
    operators:
      - type: json_parser
      - type: trace_parser
        # Parsed JSON keys are referenced with the attributes. prefix.
        trace_id:
          parse_from: attributes.opentelemetry_trace_id
        span_id:
          parse_from: attributes.opentelemetry_span_id
        # NOTE(review): confirm trace_parser accepts `parent_span_id`; the
        # source field looks like a full traceparent header, not a span ID.
        parent_span_id:
          parse_from: attributes.opentelemetry_context_traceparent

processors:
  batch:

exporters:
  otlp:
    endpoint: "your-otlp-endpoint:4317"
    tls:
      # Plain-text connection to the OTLP endpoint (no TLS).
      insecure: true

service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [batch]
      exporters: [otlp]
zking2000 commented 1 month ago
-- WARNING-or-higher entries from GCE instances or GKE containers that carry
-- a trace or a resource-name label. Field names containing dots or slashes
-- must be quoted, otherwise each dot is read as a path separator.
(resource.type="gce_instance" OR resource.type="k8s_container")
AND severity >= WARNING
AND (
  jsonPayload."logging.googleapis.com/trace" != ""
  OR labels."compute.googleapis.com/resource_name" != ""
)
zking2000 commented 1 month ago
-- Same selection as the WARNING filter, minus the four Cloud Audit log
-- streams. Field names containing dots or slashes must be quoted, otherwise
-- each dot is read as a path separator.
(resource.type="gce_instance" OR resource.type="k8s_container")
AND severity >= WARNING
AND (
  jsonPayload."logging.googleapis.com/trace" != ""
  OR labels."compute.googleapis.com/resource_name" != ""
)
AND NOT (
  logName:"/logs/cloudaudit.googleapis.com%2Factivity" OR
  logName:"/logs/cloudaudit.googleapis.com%2Fdata_access" OR
  logName:"/logs/cloudaudit.googleapis.com%2Fsystem_event" OR
  logName:"/logs/cloudaudit.googleapis.com%2Fpolicy"
)
zking2000 commented 1 month ago

您说得对,我们确实需要一个更精确的过滤条件,以避免误过滤有用的日志。根据图片中显示的日志内容,我们可以创建一个更具体的过滤条件。以下是一个更精确的过滤条件建议:

logName="projects/hsbc-10614851-apigwppt-dev/logs/cloudaudit.googleapis.com%2Fdata_access" AND protoPayload.methodName="google.container.v1beta1.ClusterManager.ListClusters" AND protoPayload.authenticationInfo.principalEmail="forseti-scanner@hsbc-6320774-fdw-dev.iam.gserviceaccount.com" AND resource.type="gke_cluster" AND severity="INFO"


这个过滤条件的特点:

1. 精确匹配 `logName`,包含特定的项目ID和日志类型。
2. 匹配特定的 `methodName`,即 ClusterManager.ListClusters 方法。
3. 匹配来自特定服务账号的日志。
4. 匹配资源类型为 "gke_cluster" 的日志。
5. 匹配 severity 为 "INFO" 的日志。

使用这个过滤条件,您应该能够准确地识别并排除图片中所示的特定类型的日志,同时不会影响其他可能重要的日志。

要使用这个过滤条件,您需要在您的日志管理系统中配置一个排除规则。具体的配置方法可能会因您使用的系统而异。如果您使用的是 Google Cloud Logging,您可以在创建或编辑 sink 时使用这个条件的否定形式:

NOT(logName="projects/hsbc-10614851-apigwppt-dev/logs/cloudaudit.googleapis.com%2Fdata_access" AND protoPayload.methodName="google.container.v1beta1.ClusterManager.ListClusters" AND protoPayload.authenticationInfo.principalEmail="forseti-scanner@hsbc-6320774-fdw-dev.iam.gserviceaccount.com" AND resource.type="gke_cluster" AND severity="INFO")


这样配置后,只有匹配这个条件的日志会被排除,其他日志都会被保留。如果您需要针对特定的日志管理系统进行更详细的配置指导,请告诉我您使用的是哪个系统,我会提供更具体的步骤。
zking2000 commented 1 month ago

您提出了一个很好的问题。为了使Nginx的日志格式更好地适配syslog和OpenTelemetry Collector,我们需要对log_format进行一些调整。以下是一个优化后的配置建议:

  1. 修改Nginx的log_format
# Access-log format "json_combined": one JSON object per request, with
# variable values JSON-escaped (escape=json). Every value is emitted as a
# quoted string.
log_format json_combined escape=json '{'
    '"@timestamp":"$time_iso8601",'
    '"remote_addr":"$remote_addr",'
    '"remote_user":"$remote_user",'
    '"request":"$request",'
    '"status": "$status",'
    '"body_bytes_sent":"$body_bytes_sent",'
    '"http_referer":"$http_referer",'
    '"http_user_agent":"$http_user_agent",'
    '"http_x_forwarded_for":"$http_x_forwarded_for",'
    '"request_id":"$request_id",'
    '"request_time":"$request_time",'
    '"upstream_response_time":"$upstream_response_time",'
    '"upstream_addr":"$upstream_addr",'
    '"upstream_status":"$upstream_status",'
    '"bytes_sent":"$bytes_sent",'
    '"request_length":"$request_length"'
'}';
  2. 配置Nginx将日志发送到syslog:
# Send the access log to the syslog server at localhost:514 (facility local7,
# tag "nginx", severity info), formatted with json_combined.
# NOTE(review): nginx's syslog target is understood to use UDP — confirm the
# receiving side listens on UDP.
access_log syslog:server=localhost:514,facility=local7,tag=nginx,severity=info json_combined;

这个配置做了以下改进:

  1. 使用$time_iso8601替代$time_local,提供了一个标准的ISO 8601格式的时间戳。

  2. 包含了图片中显示的所有字段。

  3. 移除了一些OpenTelemetry特定的字段(如opentelemetry_trace_id),因为这些通常不是由Nginx直接生成的。

  4. 保持了JSON格式,使得解析更加直接。

  5. 相应的OpenTelemetry Collector配置:

# Collector pipeline: receive nginx access logs via syslog, parse the JSON
# message, and map selected fields to OpenTelemetry attribute names.
receivers:
  syslog:
    # nginx's `access_log syslog:` target sends UDP datagrams in RFC 3164
    # form, so a TCP / RFC 5424 listener would never receive or parse them.
    udp:
      listen_address: "0.0.0.0:514"
    protocol: rfc3164
    operators:
      - type: json_parser
        # The syslog receiver stores the message text in attributes.message;
        # the body still holds the whole syslog line, which is not JSON.
        parse_from: attributes.message
        timestamp:
          parse_from: 'attributes["@timestamp"]'
          # $time_iso8601 carries a numeric UTC offset (e.g. +08:00), so the
          # Go layout needs Z07:00 and must be declared as gotime.
          layout_type: gotime
          layout: '2006-01-02T15:04:05Z07:00'

processors:
  # Processors can be added to rename fields or apply other transformations
  attributes:
    actions:
      - key: http.request.id
        from_attribute: request_id
        action: insert
      - key: http.response.body.size
        from_attribute: body_bytes_sent
        action: insert
      # More field mappings can be added here

exporters:
  # Configure your exporters

service:
  pipelines:
    logs:
      receivers: [syslog]
      processors: [attributes]
      exporters: [your_exporters]

这个配置:

  1. 正确解析JSON格式的syslog消息。
  2. 使用@timestamp字段作为日志时间戳。
  3. 通过属性处理器将一些字段映射到OpenTelemetry的标准命名约定。

实施这些更改后,您的Nginx日志将以结构化的JSON格式通过syslog发送,并且可以被OpenTelemetry Collector正确解析和处理。这种方法保持了日志的丰富信息,同时提高了与OpenTelemetry生态系统的兼容性。

如果您在实施过程中遇到任何问题,或者需要针对特定需求进行进一步的调整,请随时告诉我。

zking2000 commented 1 month ago
# Load the UDP input module
module(load="imudp")

# Listen for UDP messages from Nginx
input(type="imudp" port="1514")

# Forward Nginx logs to the OpenTelemetry Collector
if $programname == 'nginx' then {
    # Relay over TCP to port 1514 using the RSYSLOG_SyslogProtocol23Format template.
    action(type="omfwd" 
           Target="otel-collector-host" 
           Port="1514" 
           Protocol="tcp"
           Template="RSYSLOG_SyslogProtocol23Format")
    # Do not process these messages with any later rules.
    stop
}
# Collector pipeline: receive RFC 5424 syslog over TCP (forwarded by rsyslog),
# parse the JSON message, batch, and export to the console and OTLP.
receivers:
  syslog:
    tcp:
      listen_address: "0.0.0.0:1514"
    protocol: rfc5424
    operators:
      # The syslog receiver already parses the RFC 5424 header (timestamp,
      # hostname, appname, message, ...) into attributes. The former regex
      # re-parse was redundant, and its strptime layout ('...%S.%LZ') could
      # not match rsyslog's RFC 3339 timestamps, which carry microseconds and
      # a numeric offset.
      - type: json_parser
        parse_from: attributes.message

processors:
  batch:

exporters:
  # NOTE(review): newer collector releases replace the `logging` exporter
  # with `debug` — confirm against the collector version in use.
  logging:
    verbosity: detailed
  otlp:
    endpoint: "your-otlp-endpoint:4317"
    tls:
      # Plain-text connection to the OTLP endpoint (no TLS).
      insecure: true

service:
  pipelines:
    logs:
      receivers: [syslog]
      processors: [batch]
      exporters: [logging, otlp]