cloudflare / goflow

The high-scalability sFlow/NetFlow/IPFIX collector used internally at Cloudflare.
BSD 3-Clause "New" or "Revised" License

Is the current flow.proto correct? #44

Closed natemccallum closed 4 years ago

natemccallum commented 4 years ago

Great project, thank you! I am trying to use the Protobuf output format with Kafka and then load it into Clickhouse. I keep getting this message from CH, which leads me to think something may not be right with the flow.proto file:

Code: 444. DB::Exception: Received from localhost:9000. DB::Exception: Protobuf messages are corrupted or don't match the provided schema. Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint.

If flow.proto is confirmed correct, I'll take this to the CH forum.
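For readers unfamiliar with the framing that the error message describes, here is a minimal Python sketch of a length-delimited stream: each message is preceded by its byte length encoded as a base-128 varint. The helper names (`encode_varint`, `decode_varint`, `frame`, `split_frames`) are made up for illustration and are not part of goflow or Clickhouse. Whether goflow's Kafka output adds such a prefix is not established in this thread.

```python
from typing import List, Tuple


def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a Protobuf-style base-128 varint."""
    if n < 0:
        raise ValueError("varint length must be non-negative")
    out = bytearray()
    while True:
        byte = n & 0x7F          # low 7 bits of the remaining value
        n >>= 7
        if n:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)         # last byte: continuation bit clear
            return bytes(out)


def decode_varint(buf: bytes, pos: int = 0) -> Tuple[int, int]:
    """Decode a varint starting at pos; return (value, next position)."""
    result = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result, pos
        shift += 7


def frame(payload: bytes) -> bytes:
    """Prefix an already-serialized message with its length as a varint."""
    return encode_varint(len(payload)) + payload


def split_frames(stream: bytes) -> List[bytes]:
    """Split a length-delimited stream back into individual payloads."""
    pos = 0
    messages = []
    while pos < len(stream):
        length, pos = decode_varint(stream, pos)
        end = pos + length
        if end > len(stream):
            raise ValueError("truncated message")
        messages.append(stream[pos:end])
        pos = end
    return messages
```

A reader that expects this prefix will treat the first bytes of an unprefixed message as a length, which is one way a stream can look corrupted even when the schema itself is fine.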

lspgn commented 4 years ago

Hi @natemccallum, I use an inserter for Clickhouse made in Go since I need to do lookups before inserting. What's the Clickhouse Schema?

natemccallum commented 4 years ago

Thank you for the reply, @lspgn. I am trying to use the table below. I went back and forth between String and Array(String) for the addresses that are typed as bytes in flow.proto.

create table flow (
    Type Enum8('FLOWUNKNOWN' = 0, 'SFLOW_5' = 1, 'NETFLOW_V5' = 2, 'NETFLOW_V9' = 3, 'IPFIX' = 4),
    TimeReceived UInt64,
    SequenceNum UInt32,
    SamplingRate UInt64,
    FlowDirection UInt32,
    SamplerAddress Array(String),
    TimeFlowStart UInt64,
    TimeFlowEnd UInt64,
    Bytes UInt64,
    Packets UInt64,
    SrcAddr Array(String),
    DstAddr Array(String),
    Etype UInt32,
    Proto UInt32,
    SrcPort UInt32,
    DstPort UInt32,
    SrcIf UInt32,
    DstIf UInt32,
    SrcMac UInt64,
    DstMac UInt64,
    SrcVlan UInt32,
    DstVlan UInt32,
    VlanId UInt32,
    IngressVrfID UInt32,
    EgressVrfID UInt32,
    IPTos UInt32,
    ForwardingStatus UInt32,
    IPTTL UInt32,
    TCPFlags UInt32,
    IcmpType UInt32,
    IcmpCode UInt32,
    IPv6FlowLabel UInt32,
    FragmentId UInt32,
    FragmentOffset UInt32,
    BiFlowDirection UInt32,
    SrcAS UInt32,
    DstAS UInt32,
    NextHop Array(String),
    NextHopAS UInt32,
    SrcNet UInt32,
    DstNet UInt32
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'flow-messages',
    kafka_group_name = 'mygroup',
    kafka_format = 'Protobuf',
    kafka_schema = 'flow:FlowMessage',
    kafka_skip_broken_messages = 1;
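Since the address fields are bytes in flow.proto, a consumer that decodes the messages itself has to turn those raw bytes into text before storing them in a String column. A minimal Python sketch of that step follows, assuming the field holds a packed 4-byte IPv4 or 16-byte IPv6 address (an assumption, not something confirmed in this thread); `format_addr` is a hypothetical helper name.

```python
import ipaddress


def format_addr(raw: bytes) -> str:
    """Render a packed IP address (4 or 16 raw bytes) as text.

    Assumption: the bytes field carries the address in network byte order.
    Any other length is rejected rather than guessed at.
    """
    if len(raw) not in (4, 16):
        raise ValueError("expected 4 or 16 bytes, got %d" % len(raw))
    # ipaddress.ip_address accepts packed bytes of length 4 or 16
    return str(ipaddress.ip_address(raw))
```

For example, `format_addr(bytes([192, 168, 0, 1]))` yields the dotted-quad form of that IPv4 address.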

lspgn commented 4 years ago

Hm, that goes beyond my knowledge. Is any message inserted at all? What if you remove the unsized fields (IP addresses)? You may have more luck with the Clickhouse forums :( sorry.

natemccallum commented 4 years ago

Thanks, will try over there.