childe / gohangout

使用 golang 模仿的 Logstash。用于消费 Kafka 数据,处理后写入 ES、Clickhouse 等。
MIT License
1.01k stars 234 forks source link

JSON数据字段提取 #210

Closed a33151 closed 1 year ago

a33151 commented 1 year ago

感谢抽空查看!

目前有一个json的数据,只想要contents,还有tags下的数据 尝试多个方式写均无效 能否指教一下 谢谢!

我的配置如下:

filters:
  - Json:
      field: contents
  - Add:
      overwrite: true
      fields:
        container_image_name: '[tags][container_image_name]'
        log_file_path:'[tags][log_file_path]'
        topic:'[tags][topic]'
        k8s_pod_uid:'[tags][k8s_pod_uid]'
        k8s_pod_name:'[tags][k8s_pod_name]'
        k8s_node_name:'[tags][k8s_node_name]'
        k8s_namespace_name:'[tags][k8s_namespace_name]'
        host_name:'[tags][host_name]'
        host.ip:'[tags][host.ip]'
        container_name:'[tags][container_name]'
        container_ip:'[tags][container_ip]'
        ...

就需要写很长的每个字段的读取

contents下是字段是动态的, 不想写死某些字段 请问有什么好办法吗?

image

a33151 commented 1 year ago

感谢大神如此快的回复。

不过我在最新的release的时候还是有遇到问题

配置

filters:
  - Json:
      field: contents
      exclude: {"contents","tags"}

报错如下,是我这边的使用问题么? 感谢。。。

panic: interface conversion: interface {} is map[interface {}]interface {}, not []string

goroutine 59 [running]:
github.com/childe/gohangout/filter.newJSONFilter(0xafef00?)
        /home/runner/work/gohangout/gohangout/filter/json.go:49 +0x329
github.com/childe/gohangout/filter.BuildFilter({0xc00003702c, 0x4}, 0xbcbf75?)
        /home/runner/work/gohangout/gohangout/filter/filter.go:27 +0x19b
github.com/childe/gohangout/topology.BuildFilterBoxes(0x0?, 0xc12120)
        /home/runner/work/gohangout/gohangout/topology/filter.go:129 +0x403
github.com/childe/gohangout/input.(*InputBox).buildTopology(0xc00031b0a0, 0x6)
        /home/runner/work/gohangout/gohangout/input/input_box.go:108 +0xd7
github.com/childe/gohangout/input.(*InputBox).beat(0xc00031b0a0, 0x0?)
        /home/runner/work/gohangout/gohangout/input/input_box.go:66 +0x33
created by github.com/childe/gohangout/input.(*InputBox).Beat
        /home/runner/work/gohangout/gohangout/input/input_box.go:135 +0x6d
a33151 commented 1 year ago

@childe

childe commented 1 year ago

感谢大神如此快的回复。

不过我在最新的release的时候还是有遇到问题

配置

filters:
  - Json:
      field: contents
      exclude: {"contents","tags"}

报错如下,是我这边的使用问题么? 感谢。。。

panic: interface conversion: interface {} is map[interface {}]interface {}, not []string

goroutine 59 [running]:
github.com/childe/gohangout/filter.newJSONFilter(0xafef00?)
        /home/runner/work/gohangout/gohangout/filter/json.go:49 +0x329
github.com/childe/gohangout/filter.BuildFilter({0xc00003702c, 0x4}, 0xbcbf75?)
        /home/runner/work/gohangout/gohangout/filter/filter.go:27 +0x19b
github.com/childe/gohangout/topology.BuildFilterBoxes(0x0?, 0xc12120)
        /home/runner/work/gohangout/gohangout/topology/filter.go:129 +0x403
github.com/childe/gohangout/input.(*InputBox).buildTopology(0xc00031b0a0, 0x6)
        /home/runner/work/gohangout/gohangout/input/input_box.go:108 +0xd7
github.com/childe/gohangout/input.(*InputBox).beat(0xc00031b0a0, 0x0?)
        /home/runner/work/gohangout/gohangout/input/input_box.go:66 +0x33
created by github.com/childe/gohangout/input.(*InputBox).Beat
        /home/runner/work/gohangout/gohangout/input/input_box.go:135 +0x6d

语法如下,但看你之前的意思,应该是 include,不是 exclude

exclude: ["contents","tags"]
a33151 commented 1 year ago
已修改成如下: 
filters:
  - Json:
      field: contents
      include: ["contents","tags"]

启动时候还是有个报错

panic: interface conversion: interface {} is []interface {}, not []string

goroutine 83 [running]:
github.com/childe/gohangout/filter.newJSONFilter(0xafef00?)
        /home/runner/work/gohangout/gohangout/filter/json.go:46 +0x335
github.com/childe/gohangout/filter.BuildFilter({0xc00022af2c, 0x4}, 0xbcbf75?)
        /home/runner/work/gohangout/gohangout/filter/filter.go:27 +0x19b
github.com/childe/gohangout/topology.BuildFilterBoxes(0x0?, 0xc12120)
        /home/runner/work/gohangout/gohangout/topology/filter.go:129 +0x403
github.com/childe/gohangout/input.(*InputBox).buildTopology(0xc0004240e0, 0x4)
        /home/runner/work/gohangout/gohangout/input/input_box.go:108 +0xd7
github.com/childe/gohangout/input.(*InputBox).beat(0xc0004240e0, 0x0?)
        /home/runner/work/gohangout/gohangout/input/input_box.go:66 +0x33
created by github.com/childe/gohangout/input.(*InputBox).Beat
        /home/runner/work/gohangout/gohangout/input/input_box.go:135 +0x6d

@childe

childe commented 1 year ago

嗯啊,写的不对,369f538 已经修复。

a33151 commented 1 year ago

感谢 我再测试一把

a33151 commented 1 year ago

这次我这边只是有一条报错信息,日志无法采集上去

E0412 21:12:29.670319 80231 group_consumer.go:465] failed to send heartbeat, restart: The group is rebalancing, so a rejoin is needed.

我日志等级已经调整到V20了 但是还是没其他日志输出

配置与上述的是保持一致的

filters:
  - Json:
      field: contents
      include: ["contents","tags"]
childe commented 1 year ago

正常现象,join 好了就能消费了。

--logtostderr 会打印到屏幕

a33151 commented 1 year ago

大佬, 我使用的最新的版本...

目前运行都正常 但是发现 contents . tags 还是没法去掉

以下是我debug出来的日志

{"@timestamp":"2023-04-13T14:06:30.693160207+08:00","contents":{"@timestamp":"2023-04-13T03:31:38.870Z","app":"xxxxxxxxx","class":"o.a.k.c.c.internals.ConsumerCoordinator","ip":"172.100.24.243","level":"DEBUG","line":"726","message":" [Consumer clientId=consumer-9, groupId=xxxxxxxxx] Sending asynchronous auto-commit of offsets {}","parent":"","span":"","stack_trace":"","thread":"org.springframework.kafka.KafkaListenerEndpointContainer#1-1-C-1","trace":"","traceid":"TID:N/A"},"tags":{"container_image_name":"harbor.xxxx.com:18080/java/xxxxxxxxx:8ba5984d68fd74557f335ed129d37362210d868d-20230408121338","container_ip":"x.x.x.x","container_name":"master","host.ip":"10.104.63.109","host_name":"nodexxxxxxx","k8s_namespace_name":"xxxxxxx","k8s_node_ip":"x.x.x.x","k8s_node_name":"nodexxxxxxx","k8s_pod_name":"xxxxxxxxx-5745cdcbd5-mcnsf","k8s_pod_uid":"1cd8b17a-2418-4460-87ad-8ff90d148389","log_file_path":"/data/logs/xxxxxxxxx/xxxxxxxxx.json","topic":"xxxxxxx-elklog"},"time":1681365443}

最终收集到ES上也没去掉这个contents tags, include 与exclude我都试过了. 我把我配置贴下

inputs:
  - Kafka:
      topic:
        logstash-logtail: 5
      codec: json
      encoding: UTF8
      consumer_settings:
        bootstrap.servers: xx:9092
        group.id: logstash-logtail
        max.partition.fetch.bytes: "10485760"
        auto.commit.interval.ms: "10000"
        from.beginning: "false"

filters:
  - Json:
      field: contents
      include: ["contents","tags"]
  - Date:
      src: 'contents.@timestamp'
      target: '@timestamp'
      location: Asia/Shanghai
      add_year: false
      overwrite: true
      formats:
        - '2006-01-02 15:04:05'
  - Convert:
      fields:
        responsetime:
          to: float
outputs:
  - Elasticsearch:
      hosts:
        - "http://xxx:9200"
      index: '%{k8s_namespace_name}-%{app}-%{+2006.01.02}'
      index_time_location: "Local"
      bulk_actions: 36000
      bulk_size: 30
      flush_interval: 10
      concurrent: 1
      es_version: 7
      codec: json
      compress: false
      retry_response_code: [401, 502]

@childe

childe commented 1 year ago

https://github.com/childe/gohangout/issues/new?assignees=childe&labels=cookbook&template=%E6%95%B0%E6%8D%AE%E5%A4%84%E7%90%86%E9%97%AE%E9%A2%98-%E4%B8%8E%E9%A2%84%E6%9C%9F%E4%B8%8D%E7%AC%A6.md&title=

按这个测一下