cloudflare / goflow

The high-scalability sFlow/NetFlow/IPFIX collector used internally at Cloudflare.
BSD 3-Clause "New" or "Revised" License

Protobuf messages are corrupted or don't match the provided schema #105

freedomwarrior closed this issue 2 years ago

freedomwarrior commented 3 years ago
2021.08.18 12:55:49.898125 [ 53445 ] {} <Error> void DB::StorageKafka::threadFunc(size_t): Code: 444, e.displayText() = DB::Exception: Protobuf messages are corrupted or don't match the provided schema. Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint.: while parsing Kafka message (topic: flows, partition: 0, offset: 0)', Stack trace (when copying this message, always include the lines below):

0. DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0x8f9b87a in /usr/bin/clickhouse
1. DB::ProtobufReader::throwUnknownFormat() const @ 0x11155007 in /usr/bin/clickhouse
2. ? @ 0x11155588 in /usr/bin/clickhouse
3. ? @ 0x11150ed9 in /usr/bin/clickhouse
4. DB::ProtobufRowInputFormat::readRow(std::__1::vector<COW<DB::IColumn>::mutable_ptr<DB::IColumn>, std::__1::allocator<COW<DB::IColumn>::mutable_ptr<DB::IColumn> > >&, DB::RowReadExtension&) @ 0x110f6582 in /usr/bin/clickhouse
5. DB::IRowInputFormat::generate() @ 0x11087828 in /usr/bin/clickhouse
6. DB::ISource::tryGenerate() @ 0x110127b5 in /usr/bin/clickhouse
7. DB::ISource::work() @ 0x1101239a in /usr/bin/clickhouse
8. DB::KafkaBlockInputStream::readImpl() @ 0x10b7772e in /usr/bin/clickhouse
9. DB::IBlockInputStream::read() @ 0xfd8b364 in /usr/bin/clickhouse
10. DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::function<void (DB::Block const&)> const&, std::__1::atomic<bool>*) @ 0xfdaf795 in /usr/bin/clickhouse
11. DB::StorageKafka::streamToViews() @ 0x10b6e4d9 in /usr/bin/clickhouse
12. DB::StorageKafka::threadFunc(unsigned long) @ 0x10b6d0b8 in /usr/bin/clickhouse
13. DB::BackgroundSchedulePoolTaskInfo::execute() @ 0xffef8a0 in /usr/bin/clickhouse
14. DB::BackgroundSchedulePool::threadFunction() @ 0xfff1917 in /usr/bin/clickhouse
15. ? @ 0xfff2694 in /usr/bin/clickhouse
16. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x8fdc4ff in /usr/bin/clickhouse
17. ? @ 0x8fdfde3 in /usr/bin/clickhouse
18. start_thread @ 0x9609 in /usr/lib/x86_64-linux-gnu/libpthread-2.31.so
19. clone @ 0x122293 in /usr/lib/x86_64-linux-gnu/libc-2.31.so
 (version 21.8.3.44 (official build))

Hello. I got this error when trying to load data from Kafka into ClickHouse. ClickHouse server version: 21.8.3.44 (official build).
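
For readers unfamiliar with the framing the error message refers to: ClickHouse's Protobuf input format expects every message to be preceded by its serialized size, encoded as a varint. A minimal Go sketch of that framing, with timestamppb as a stand-in message (illustrative only, not goflow's actual producer code):

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"

    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/timestamppb"
)

// appendDelimited marshals msg into buf, prefixed with its serialized
// length as a varint: the framing ClickHouse's Protobuf parser expects.
func appendDelimited(buf *bytes.Buffer, msg proto.Message) error {
    body, err := proto.Marshal(msg)
    if err != nil {
        return err
    }
    var prefix [binary.MaxVarintLen64]byte
    n := binary.PutUvarint(prefix[:], uint64(len(body)))
    buf.Write(prefix[:n])
    buf.Write(body)
    return nil
}

func main() {
    var buf bytes.Buffer
    // timestamppb is only a stand-in; in goflow this would be a
    // *flowprotob.FlowMessage built from the schema below.
    if err := appendDelimited(&buf, timestamppb.Now()); err != nil {
        panic(err)
    }
    fmt.Printf("varint prefix + body: % x\n", buf.Bytes())
}

If a producer writes bare proto.Marshal output with no such prefix, ClickHouse misinterprets the first bytes of the payload as a length and fails with exactly the "corrupted or don't match the provided schema" error above.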

The schema I use:

syntax = "proto3";
package flowprotob;

option java_package = "com.cloudflare.net.flowagg";
option java_outer_classname = "FlowMessagePb";

message FlowMessage {

  enum FlowType {
    FLOWUNKNOWN = 0;
    SFLOW_5 = 1;
    NETFLOW_V5 = 2;
    NETFLOW_V9 = 3;
    IPFIX = 4;
  }
  FlowType Type = 1;

  uint64 TimeReceived = 2;
  uint32 SequenceNum = 4;
  uint64 SamplingRate = 3;

  uint32 FlowDirection = 42;

  // Sampler information
  bytes SamplerAddress = 11;

  // Found inside packet
  uint64 TimeFlowStart = 38;
  uint64 TimeFlowEnd = 5;

  // Size of the sampled packet
  uint64 Bytes = 9;
  uint64 Packets = 10;

  // Source/destination addresses
  bytes SrcAddr = 6;
  bytes DstAddr = 7;

  // Layer 3 protocol (IPv4/IPv6/ARP/MPLS...)
  uint32 Etype = 30;

  // Layer 4 protocol
  uint32 Proto = 20;

  // Ports for UDP and TCP
  uint32 SrcPort = 21;
  uint32 DstPort = 22;

  // Interfaces
  uint32 InIf = 18;
  uint32 OutIf = 19;

  // Ethernet information
  uint64 SrcMac = 27;
  uint64 DstMac = 28;

  // Vlan
  uint32 SrcVlan = 33;
  uint32 DstVlan = 34;
  // 802.1q VLAN in sampled packet
  uint32 VlanId = 29;

  // VRF
  uint32 IngressVrfID = 39;
  uint32 EgressVrfID = 40;

  // IP and TCP special flags
  uint32 IPTos = 23;
  uint32 ForwardingStatus = 24;
  uint32 IPTTL = 25;
  uint32 TCPFlags = 26;
  uint32 IcmpType = 31;
  uint32 IcmpCode = 32;
  uint32 IPv6FlowLabel = 37;
  // Fragments (IPv4/IPv6)
  uint32 FragmentId = 35;
  uint32 FragmentOffset = 36;
  uint32 BiFlowDirection = 41;

  // Autonomous system information
  uint32 SrcAS = 14;
  uint32 DstAS = 15;

  bytes NextHop = 12;
  uint32 NextHopAS = 13;

  // Prefix size
  uint32 SrcNet = 16;
  uint32 DstNet = 17;

  // IP encapsulation information
  bool HasEncap = 43;
  bytes SrcAddrEncap = 44;
  bytes DstAddrEncap = 45;
  uint32 ProtoEncap = 46;
  uint32 EtypeEncap = 47;

  uint32 IPTosEncap = 48;
  uint32 IPTTLEncap = 49;
  uint32 IPv6FlowLabelEncap = 50;
  uint32 FragmentIdEncap = 51;
  uint32 FragmentOffsetEncap = 52;

  // MPLS information
  bool HasMPLS = 53;
  uint32 MPLSCount = 54;
  uint32 MPLS1TTL = 55; // First TTL
  uint32 MPLS1Label = 56; // First Label
  uint32 MPLS2TTL = 57; // Second TTL
  uint32 MPLS2Label = 58; // Second Label
  uint32 MPLS3TTL = 59; // Third TTL
  uint32 MPLS3Label = 60; // Third Label
  uint32 MPLSLastTTL = 61; // Last TTL
  uint32 MPLSLastLabel = 62; // Last Label

  // PPP information
  bool HasPPP = 63;
  uint32 PPPAddressControl = 64;

  // Custom fields: start after ID 1000:
  // uint32 MyCustomField = 1000;

}

And I use create.sh from the repo for ClickHouse:

#!/bin/bash
set -e

clickhouse client --password -n <<-EOSQL

    CREATE DATABASE IF NOT EXISTS dictionaries;

    CREATE DICTIONARY IF NOT EXISTS dictionaries.protocols (
        proto UInt8,
        name String,
        description String
    )
    PRIMARY KEY proto
    LAYOUT(FLAT())
    SOURCE (FILE(path '/var/lib/clickhouse/user_files/protocols.csv' format 'CSVWithNames'))
    LIFETIME(3600);

    CREATE TABLE IF NOT EXISTS flows
    (
        TimeReceived UInt64,
        TimeFlowStart UInt64,

        SequenceNum UInt32,
        SamplingRate UInt64,
        SamplerAddress FixedString(16),

        SrcAddr FixedString(16),
        DstAddr FixedString(16),

        SrcAS UInt32,
        DstAS UInt32,

        EType UInt32,
        Proto UInt32,

        SrcPort UInt32,
        DstPort UInt32,

        Bytes UInt64,
        Packets UInt64
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'localhost:9092',
        kafka_topic_list = 'flows',
        kafka_group_name = 'clickhouse',
        kafka_format = 'Protobuf',
        kafka_schema = 'flow.proto:FlowMessage';

    CREATE TABLE IF NOT EXISTS flows_raw
    (
        Date Date,
        TimeReceived DateTime,
        TimeFlowStart DateTime,

        SequenceNum UInt32,
        SamplingRate UInt64,
        SamplerAddress FixedString(16),

        SrcAddr FixedString(16),
        DstAddr FixedString(16),

        SrcAS UInt32,
        DstAS UInt32,

        EType UInt32,
        Proto UInt32,

        SrcPort UInt32,
        DstPort UInt32,

        Bytes UInt64,
        Packets UInt64
    ) ENGINE = MergeTree()
    PARTITION BY Date
    ORDER BY TimeReceived;

    CREATE MATERIALIZED VIEW IF NOT EXISTS flows_raw_view TO flows_raw 
    AS SELECT
        toDate(TimeReceived) AS Date,
        *
       FROM flows;

    CREATE TABLE IF NOT EXISTS flows_5m
    (
        Date Date,
        Timeslot DateTime,

        SrcAS UInt32,
        DstAS UInt32,

        ETypeMap Nested (
            EType UInt32,
            Bytes UInt64,
            Packets UInt64,
            Count UInt64
        ),

        Bytes UInt64,
        Packets UInt64,
        Count UInt64
    ) ENGINE = SummingMergeTree()
    PARTITION BY Date
    ORDER BY (Date, Timeslot, SrcAS, DstAS, \`ETypeMap.EType\`);

    CREATE MATERIALIZED VIEW IF NOT EXISTS flows_5m_view TO flows_5m 
    AS
        SELECT
            Date,
            toStartOfFiveMinute(TimeReceived) AS Timeslot,
            SrcAS,
            DstAS,

            [EType] AS \`ETypeMap.EType\`,
            [Bytes] AS \`ETypeMap.Bytes\`,
            [Packets] AS \`ETypeMap.Packets\`,
            [Count] AS \`ETypeMap.Count\`,

            sum(Bytes) AS Bytes,
            sum(Packets) AS Packets,
            count() AS Count

        FROM flows_raw
        GROUP BY Date, Timeslot, SrcAS, DstAS, \`ETypeMap.EType\`;

EOSQL
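
For anyone debugging this: kafka_format = 'Protobuf' makes ClickHouse parse each Kafka message as varint length-delimited protobuf, so one way to narrow the problem down is to pull a message off the topic and check the framing yourself before ClickHouse sees it. A minimal diagnostic sketch, assuming the third-party github.com/segmentio/kafka-go client and illustrative broker/topic values:

package main

import (
    "bufio"
    "bytes"
    "context"
    "encoding/binary"
    "fmt"
    "io"
    "log"

    kafka "github.com/segmentio/kafka-go"
)

// checkDelimited verifies that value is a sequence of varint
// length-prefixed records, which is how ClickHouse will read it.
func checkDelimited(value []byte) error {
    r := bufio.NewReader(bytes.NewReader(value))
    for {
        length, err := binary.ReadUvarint(r)
        if err == io.EOF {
            return nil // whole payload consumed cleanly
        }
        if err != nil {
            return err
        }
        if _, err := io.CopyN(io.Discard, r, int64(length)); err != nil {
            return fmt.Errorf("truncated record: %w", err)
        }
    }
}

func main() {
    // Broker and topic are illustrative; match them to your setup.
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "flows",
    })
    defer r.Close()

    msg, err := r.ReadMessage(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    if err := checkDelimited(msg.Value); err != nil {
        log.Fatalf("offset %d: not length-delimited protobuf: %v", msg.Offset, err)
    }
    fmt.Printf("offset %d: framing looks OK\n", msg.Offset)
}

This is the mirror image of the writer sketch above: read a varint length, skip that many bytes, and repeat until the payload is exhausted.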
lspgn commented 3 years ago

(FYI: please use goflow2; I sometimes check issues on this repo, but I am not maintaining it here.) Which version of GoFlow are you using? Are you doing any enrichment? Could you try with GoFlow2 as well?

lspgn commented 2 years ago

Are you setting -proto.fixedlen=true when you're running GoFlow?
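
For reference, that flag is passed on the GoFlow command line alongside the usual Kafka options, e.g. ./goflow -kafka.brokers=localhost:9092 -kafka.topic=flows -proto.fixedlen=true (broker and topic values are illustrative). It makes GoFlow write each message with the varint length prefix that the ClickHouse error above asks for.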

freedomwarrior commented 2 years ago

> Are you setting -proto.fixedlen=true when you're running GoFlow?

Yeah, it works, thank you! One more thing: is it better to use goflow2 than goflow?

lspgn commented 2 years ago

I made a few updates to GoFlow2, and I do not have access to push Dockerfiles for GoFlow, so I would advise GoFlow2.