childe / gohangout

A Logstash clone written in Go. It consumes data from Kafka, processes it, and writes the results to Elasticsearch, ClickHouse, and other stores.
MIT License

Does gohangout support writing data to ClickHouse? #5

Closed liulei18 closed 5 years ago

liulei18 commented 5 years ago
childe commented 5 years ago

Yes, it can. It's documented in the README.

That said, the implementation is still experimental: although we use it ourselves, it may not fit every scenario. For example, the handling of missing fields may be wrong, and there may even be bugs.

childe commented 5 years ago

I'll fix this issue as soon as I can. If you can guarantee that no fields will ever be missing, you can use it; otherwise, better hold off for now.. :((

liulei18 commented 5 years ago

OK, thanks. gohangout can write to ClickHouse without installing any plugin, right? The hangout docs say a plugin is required.

childe commented 5 years ago

Correct, no plugin is needed.
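
In other words, writing to ClickHouse is driven purely by configuration. A minimal sketch of the output section (the option names match the config that appears later in this thread; the table name, field list, and host are placeholders):

outputs:
    - Clickhouse:
        table: mydb.mytable
        fields: ['field_a', 'field_b']
        hosts:
            - tcp://clickhouse-host:9000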

liulei18 commented 5 years ago
childe commented 5 years ago

The example.yml I provided can't be used as-is~ you need to adapt the configuration to your own needs.

liulei18 commented 5 years ago
childe commented 5 years ago

No need to specify ZooKeeper; just point it at the Kafka broker addresses.
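
In configuration terms, the broker list goes into bootstrap.servers under the Kafka input's consumer_settings; there is no ZooKeeper setting. A sketch (topic name, group id, and broker addresses are placeholders):

inputs:
    - Kafka:
        topic:
            mytopic: 1
        codec: json
        consumer_settings:
            bootstrap.servers: "broker1:9092,broker2:9092"
            group.id: my.consumer.group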

liulei18 commented 5 years ago
childe commented 5 years ago

Run ./gohangout-linux-x64-656cd32 --config example.yml --logtostderr -v 10 and take a look at the logs.

liulei18 commented 5 years ago

I ran ./gohangout-linux-x64-656cd32 --config example.yml --logtostderr -v 10; here is the log output:

I1227 17:44:43.180495    3128 gohangout.go:75] map[inputs:[map[Kafka:map[consumer_settings:map[bootstrap.servers:192.168.252.206:9092,192.168.252.203:9092,192.168.50.252:9092 group.id:gohangout.goweblog] topic:map[goweblog:1] codec:json]]] filters:[map[Grok:map[src:message match:[^(?P<logtime>\S+) (?P<name>\w+) (?P<status>\d+)$] remove_fields:[message]]] map[Date:map[location:Asia/Shanghai src:logtime formats:[RFC3339] remove_fields:[logtime]]]] outputs:[map[Elasticsearch:map[hosts:[http://192.168.252.206:9200 http://192.168.252.203:9200 http://192.168.50.252:9200] index:web-%{+2006-01-02} index_type:logs bulk_actions:5000 bulk_size:20 flush_interval:60]]]]
I1227 17:44:43.180709    3128 output.go:21] output type: Elasticsearch
I1227 17:44:43.180727    3128 output.go:23] output config: map[bulk_size:20 flush_interval:60 hosts:[http://192.168.252.206:9200 http://192.168.252.203:9200 http://192.168.50.252:9200] index:web-%{+2006-01-02} index_type:logs bulk_actions:5000]
I1227 17:44:43.182625    3128 filter.go:34] filter type: Date
I1227 17:44:43.182705    3128 filter.go:36] filter config: map[location:Asia/Shanghai src:logtime formats:[RFC3339] remove_fields:[logtime]]
I1227 17:44:43.183719    3128 filter.go:47] filter type: Grok
I1227 17:44:43.183739    3128 filter.go:49] filter config: map[match:[^(?P<logtime>\S+) (?P<name>\w+) (?P<status>\d+)$] remove_fields:[message] src:message]
I1227 17:44:43.183785    3128 grok.go:58] patterns:map[]
I1227 17:44:43.183832    3128 grok.go:130] final pattern:^(?P<logtime>\S+) (?P<name>\w+) (?P<status>\d+)$
I1227 17:44:43.184289    3128 gohangout.go:44] input[1] map[Kafka:map[topic:map[goweblog:1] codec:json consumer_settings:map[bootstrap.servers:192.168.252.206:9092,192.168.252.203:9092,192.168.50.252:9092 group.id:gohangout.goweblog]]]
I1227 17:44:43.184967    3128 broker.go:106] 192.168.252.43:56073 -> 192.168.252.206:9092
I1227 17:44:43.185013    3128 broker.go:107] request length: 58. api: 3 CorrelationID: 1
I1227 17:44:43.187290    3128 broker.go:135] response length in header: 24
I1227 17:44:43.187320    3128 broker.go:158] response length: 24. CorrelationID: 1
I1227 17:44:43.187348    3128 brokers.go:50] got 0 brokers
E1227 17:44:43.187402    3128 group_consumer.go:254] could not find coordinator: could not find coordinator from all brokers
E1227 17:44:43.287678    3128 group_consumer.go:254] could not find coordinator: could not find coordinator from all brokers
E1227 17:44:43.387941    3128 group_consumer.go:254] could not find coordinator: could not find coordinator from all brokers
My example.yml:

inputs:
    - Kafka:
        topic:
            goweblog: 1
        codec: json
        consumer_settings:
            bootstrap.servers: "192.168.252.206:9092,192.168.252.203:9092,192.168.50.252:9092"
            group.id: gohangout.goweblog
filters:
    - Grok:
        src: message
        match:
            - '^(?P<logtime>\S+) (?P<name>\w+) (?P<status>\d+)$'
        remove_fields: ['message']
    - Date:
        location: 'Asia/Shanghai'
        src: logtime
        formats:
            - 'RFC3339'
        remove_fields: ["logtime"]
outputs:
    - Elasticsearch:
        hosts:
            - http://192.168.252.206:9200
            - http://192.168.252.203:9200
            - http://192.168.50.252:9200
        index: 'web-%{+2006-01-02}' # Go renders dates with the reference-time layout (2006-01-02), not YYMM-style patterns.
        index_type: "logs"
        bulk_actions: 5000
        bulk_size: 20
        flush_interval: 60
childe commented 5 years ago

Kafka cluster addresses should indeed be comma-separated, so the format is correct. What version of Kafka are you running? It's odd that it reports 0 brokers.

liulei18 commented 5 years ago
childe commented 5 years ago

It may be that your version is too old; I haven't tested anything before 0.9. On our side we run both 0.9.0.X and 2.0, as clusters.

liulei18 commented 5 years ago
I1228 18:09:55.000614   28336 gohangout.go:75] map[inputs:[map[Kafka:map[topic:map[testclickhouse:3] codec:json consumer_settings:map[bootstrap.servers:mini5:9092,mini6:9092,mini7:9092 group.id:gohangout.testclickhouse]]]] filters:[map[Grok:map[match:[^(?P<id>\S+) (?P<name>\w+)$] remove_fields:[message] src:message]]] outputs:[map[Clickhouse:map[fields:[id name] bulk_actions:1 flush_interval:1 hosts:[tcp://localhost:8123] table:cms.testclickhouse]]]]
I1228 18:09:55.000804   28336 output.go:21] output type: Clickhouse
I1228 18:09:55.000822   28336 output.go:23] output config: map[table:cms.testclickhouse fields:[id name] bulk_actions:1 flush_interval:1 hosts:[tcp://localhost:8123]]
I1228 18:09:55.000887   28336 clickhouse_output.go:86] query: INSERT INTO cms.testclickhouse ("id","name") VALUES (?,?)
childe commented 5 years ago

8123 is the HTTP port; for TCP it should be port 9000.
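
Applied to the config shown in the log above, the ClickHouse output would become (assuming the server listens on the default native-protocol port):

outputs:
    - Clickhouse:
        table: cms.testclickhouse
        fields: ['id', 'name']
        hosts:
            - tcp://localhost:9000
        bulk_actions: 1
        flush_interval: 1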

liulei18 commented 5 years ago
childe commented 5 years ago

Thanks~

As for the version issue, I need to dig into the cause further and set up a 0.8 cluster myself to test, because going by the Kafka docs alone, 0.8 should be supported.

childe commented 5 years ago

The previous build was for testing only; the latest code adds some features and fixes some issues:

  1. Username/password can be configured (see the sketch below).
  2. When multiple DB endpoints are configured, writes are distributed round-robin.
  3. If a field is missing, a default value can be filled in based on the table's DESC info. // still being refined.

This build is packaged and available at https://github.com/childe/gohangout/releases/tag/d1cd103
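
As a purely hypothetical illustration of items 1 and 2 above (this thread does not show the actual option names; check the README of the release for the real ones), an output with credentials and multiple endpoints might look like:

outputs:
    - Clickhouse:
        table: mydb.mytable
        fields: ['field_a', 'field_b']
        hosts:                        # multiple endpoints, written round-robin
            - tcp://ch-node1:9000
            - tcp://ch-node2:9000
        username: myuser              # hypothetical option name
        password: mysecret            # hypothetical option name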