loggie-io / loggie

A lightweight, cloud-native data transfer agent and aggregator
https://loggie-io.github.io/docs-en/
Apache License 2.0
1.26k stars 167 forks source link

Feat: add sink clickhouse #619

Open zhu733756 opened 1 year ago

zhu733756 commented 1 year ago

Proposed Changes:

Which issue(s) this PR fixes:

Fixes https://github.com/loggie-io/loggie/issues/494

Additional documentation:

How to run it with the kubeEvent source:

1. Execute the DDL on your ClickHouse server
---
-- Schema for the Kubernetes events written by the loggie clickhouse sink.
-- IF NOT EXISTS keeps the script safe to re-run on a server that already
-- has the database or the table.
CREATE DATABASE IF NOT EXISTS events;
CREATE TABLE IF NOT EXISTS events.events (
    namespace String,
    name String,
    selfLink String,
    uid String,
    resourceVersion String,
    creationTimestamp String,
    managedFields String,
    kind String,
    apiVersion String,
    fieldPath String,
    reason String,
    message String,
    host String,
    firstTimestamp String,
    lastTimestamp String,
    -- Float64 because the example pipeline applies strconv(count, float) to this field.
    count Float64,
    type String,
    eventTime String,
    reportingComponent String,
    reportingInstance String
) ENGINE = MergeTree() ORDER BY (namespace, name, creationTimestamp);
---
2. git clone https://github.com/zhu733756/loggie
3. Add a VS Code launch.json (cd loggie && mkdir -p .vscode)
---
{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "loggie-clickhouse-sink",
            "type": "go",
            "request": "launch",
            "mode": "auto",
            "program": "${workspaceFolder}/cmd/loggie/main.go",
            // Config files are resolved from the workspace's .vscode directory,
            // so the launch works wherever the repository is cloned.
            "args": [
                "--config.system", "${workspaceFolder}/.vscode/loggie.yml",
                "--config.pipeline", "${workspaceFolder}/.vscode/pipeline.yml",
                "--meta.nodeName", "nodeX",
                "--log.level", "debug"
            ]
        }
    ]
}
---
4. loggie.yml (default)
---
# System configuration for loggie (the file given to --config.system).
loggie:
  reload:
    period: 10s
    enabled: true

  monitor:
    logger:
      enabled: true
      period: 30s
    # Every listener is listed with a null value (no per-listener options set here).
    listeners:
      filesource: null
      filewatcher: null
      reload: null
      sink: null
      queue: null
      pipeline: null
      sys: null

  # Kubernetes discovery is switched off in this example; the settings below
  # are kept alongside the disabled flag.
  discovery:
    enabled: false
    kubernetes:
      kubeconfig: '/root/.kube/config'
      containerRuntime: containerd
      typePodFields:
        namespace: '${_k8s.pod.namespace}'
        podname: '${_k8s.pod.name}'
        containername: '${_k8s.pod.container.name}'
        nodename: '${_k8s.node.name}'
        logconfig: '${_k8s.logconfig}'

  defaults:
    sink:
      type: dev
    sources:
      - type: file
        bodyKey: 'message'
        timestampKey: '@timestamp'
        fieldsUnderRoot: true
        addonMeta: true
        addonMetaSchema:
          underRoot: true
          fields:
            filename: '${_meta.filename}'
            line: '${_meta.line}'
        watcher:
          maxOpenFds: 6000
  http:
    enabled: true
---
5. pipeline.yml
---
# Pipeline configuration (the file given to --config.pipeline):
# kubeEvent source -> channel queue -> interceptors -> clickhouse sink.
pipelines:
- name: clickhouse
  sources:
  - type: kubeEvent
    name: events
    kubeconfig: "/root/.kube/config"
  queue:
    type: channel
    batchSize: 2048000
    # 33554432000 bytes = 32 MiB x 1000 (~31.25 GiB).
    # NOTE(review): confirm that a batch limit this large is intended.
    batchBytes: 33554432000
    batchAggTimeout: 10s
  interceptors:
  # Decode the JSON body when one is present.
  - type: transformer
    name: jsonDecode
    actions:
    - if: exist(body)
      then:
      - action: jsonDecode(body)
  # Apply underRoot to the nested metadata, involvedObject and source objects
  # (presumably lifting their keys to the event root - verify against the
  # transformer documentation).
  - type: transformer
    name: underRoot
    actions:
    - if: exist(metadata)
      then:
      - action: underRoot(metadata)
    - if: exist(involvedObject)
      then:
      - action: underRoot(involvedObject)
    - if: exist(source)
      then:
      - action: underRoot(source)
  # managedFields is always overwritten with an empty string.
  - type: transformer
    name: setField
    actions:
    - action: set(managedFields, "")
  # Give fields that are missing from the event an empty/zero value.
  - type: transformer
    name: setEmpty
    actions:
    - if: NOT exist(host)
      then:
      - action: set(host, "")
    - if: NOT exist(fieldPath)
      then:
      - action: set(fieldPath, "")
    - if: NOT exist(apiVersion)
      then:
      - action: set(apiVersion, "")
    - if: NOT exist(count)
      then:
      - action: set(count, 0)
      - action: strconv(count, float)
      - action: return()
      else:
      - action: return()
  - type: maxbytes
    maxBytes: 1048576
  - type: rateLimit
    qps: 10000
  sink:
    type: clickhouse
    # Placeholder address and credentials; replace them with the values for
    # your ClickHouse server.
    addr: ["xx.xx.xx.xx:9000"]
    user: "default"
    password: "default"
    database: "events"
    table: "events"
    debug: true
---
6. Click the VS Code Debug button to run it.
zhu733756 commented 1 year ago

/cc @ethfoo