netsampler / goflow2

High performance sFlow/IPFIX/NetFlow Collector
BSD 3-Clause "New" or "Revised" License

Unable to decode -format=bin files using provided flow.proto schema. #350

Closed jwahsnakupaku closed 1 month ago

jwahsnakupaku commented 1 month ago

**Describe the bug**
Unable to decode protobuf files using the provided flow.proto schema with v2.1.5 and v2.2.1.

**To Reproduce**
Steps to reproduce the behavior (v2.2.1):

  1. Run GoFlow2 with the following arguments:

    ./goflow2-2.2.1 -format=bin -listen=netflow://:5000/ -transport.file=./output2-v2.2.1.bin
    ./goflow2-2.2.1 -format=bin -listen=netflow://:5000/ -transport.file=./output2-v2.2.1-nosep.bin -transport.file.sep=
  2. Receive samples, in this case IPFIX.

  3. See the error:

    cat output2-v2.2.1.bin | protoc --decode flowpb.FlowMessage goflow2-v2.2.1.proto
    Failed to parse input.

    cat output2-v2.2.1-nosep.bin | protoc --decode flowpb.FlowMessage goflow2-v2.2.1.proto
    Failed to parse input.


The same happens with v2.1.5.

When using v1.3.7, it appears to decode the fields properly using the following commands:

    ./goflow2-1.3.7 -format=pb -listen=netflow://:5000/ -transport.file=./output2-v1.3.7-nosep.bin -transport.file.sep=
    cat output2-v1.3.7-nosep.bin | protoc --decode flowpb.FlowMessage goflow2-v1.3.7.proto


**Expected behavior**
The .bin file is parsed correctly by protoc.

**Captures**
N/A

**Sampler device:**
A myriad of network devices, softflowd, etc.

**GoFlow2:**
 - Version: v2.2.1, v2.1.5
 - Environment: Compiled from the source releases using:
 `go version go1.21.11 (Red Hat 1.21.11-1.module_el8.10.0+3863+bb82df69) linux/amd64`
 Build command:
 `go build -o goflow2 cmd/goflow2/main.go`

`protoc --version`: libprotoc 3.5.0


 - OS: AlmaLinux 8

**Additional context**
JSON decoding appears to work fine and is currently sending to Kafka without issue.
The reason I want to change to protobuf is that I'm seeing packet drops above 20 kp/s (probably around 150k actual flows) on a VM with ~20 cores and 32 GB of RAM running v2.2.1.
The goal is to send to Kafka and process with NiFi; I'm just testing manually with a file to rule out the issues I'm seeing in NiFi when attempting to decode.
I'll try to generate some test packets I can submit as an example, though I doubt the source devices are the cause.
lspgn commented 1 month ago

@jwahsnakupaku thanks for raising the issue. It is somewhat expected, since the messages are prefixed by their length as a varint. I don't have a flag to disable it, but maybe in a future version.

Tools like ClickHouse require the message to be prefixed by this length, but not every tool supports it.

However, without a separator or length prefix, a series of protobufs cannot be split (a \n separator could also occur inside a legitimate record, with a value of 0x0a). Kafka provides this delimitation.
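For reference, the Base128 varint framing described above can be sketched in Python like this (the helper name `write_varint` is my own, not part of GoFlow2; it is the write side of the length prefix):

```python
import io

def write_varint(out, value: int) -> None:
    """Write an unsigned integer as a Base128 varint (protobuf wire format):
    7 payload bits per byte, high bit set on every byte except the last."""
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.write(bytes([byte | 0x80]))  # continuation bit: more bytes follow
        else:
            out.write(bytes([byte]))
            return

# A length of 300 encodes as two bytes:
buf = io.BytesIO()
write_varint(buf, 300)
print(buf.getvalue())  # b'\xac\x02'
```

Each serialized message is then written as `write_varint(out, len(msg))` followed by the message bytes, which is the framing `protoc --decode` does not expect on stdin.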

I'm not sure how NiFi works, but there's a guide for Java: https://cwiki.apache.org/confluence/display/GEODE/Delimiting+Protobuf+Messages

GoFlow2 code supports disabling the prefix: https://github.com/netsampler/goflow2/blob/026a16f92e771aaf017c46da0221d907957464b7/producer/proto/messages.go#L30

jwahsnakupaku commented 1 month ago

Thanks for your help! Flipping that boolean made it all work, and those links helped a fair bit in understanding what I was doing wrong.

It might just be because I'm very new to protobuf, but I found these useful in getting everything working, so I'm leaving them here in case someone else finds them useful. This code splits up the .bin file so I could get at each record individually to use with some tools. Just some garbage I got ChatGPT to spit out.

import os
import sys

def read_varint(f):
    """Read a Base128 Varint from the file and return it as an integer."""
    shift = 0
    result = 0
    while True:
        byte = f.read(1)
        if len(byte) == 0:
            raise EOFError("Unexpected end of file while reading varint.")

        byte_value = ord(byte)
        result |= (byte_value & 0x7F) << shift
        shift += 7
        if not (byte_value & 0x80):
            break
    return result

def split_protobuf_file(filename, output_folder):
    """Splits a Protobuf file with length-prefixed messages into individual files."""
    # Create output directory if it doesn't exist
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    with open(filename, 'rb') as f:
        part = 1
        while True:
            try:
                # Read the length prefix using varint decoding
                size = read_varint(f)
            except EOFError:
                break  # End of file, stop processing

            # Read the Protobuf message using the size from the length prefix
            protobuf_message = f.read(size)

            if len(protobuf_message) < size:
                print(f"Incomplete Protobuf message for part {part}.")
                break  # End of file or truncated file

            # Write the protobuf message to a new file
            output_filename = os.path.join(output_folder, f"{os.path.basename(filename)}[{part}].pb")
            with open(output_filename, 'wb') as output_file:
                output_file.write(protobuf_message)

            print(f"Written part {part} to {output_filename}")
            part += 1

if __name__ == "__main__":
    # Check if filename argument is passed
    if len(sys.argv) < 3:
        print("Usage: python split_protobuf.py <input_filename> <output_folder>")
        sys.exit(1)

    # Get the input filename and output folder from command line arguments
    input_filename = sys.argv[1]
    output_folder = sys.argv[2]

    # Check if the input file exists
    if os.path.exists(input_filename):
        # split_protobuf_file creates the output folder itself
        split_protobuf_file(input_filename, output_folder)
    else:
        print(f"File {input_filename} not found!")
        sys.exit(1)

Then I used this to just get something printing: https://github.com/nccgroup/blackboxprotobuf

Reckon it'd be feasible to write the IP addresses out as IPs instead of bytes in protobuf? Using a different render or something in the mapping file? Pretty certain I can do it in NiFi, just saves me a transform step there if I can do it in goflow2. Thanks for all your work, I've been migrating from https://github.com/Edgio/vflow as the format it outputs to is pretty horrendous to flatten at scale for the tools I've got.

lspgn commented 1 month ago

> Reckon it'd be feasible to write the IP addresses out as IPs instead of bytes in protobuf? Using a different render or something in the mapping file?

Unfortunately, the rendering is only applied to JSON/text outputs, from the bytes in the protobuf. For performance reasons, the Protobuf Producer copies the bytes straight from the decoded sample, since I'm also counting on the backend being able to read it. I think it would be too much of a refactor to allow swapping protobuf field types (e.g. src_addr would go from bytes to a string, growing from 4/16 bytes to up to 39 characters; the proto integer becomes a string, etc.). In your case, a consumer (either a pipe or via Kafka/files) is the most straightforward way to achieve this.
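A minimal consumer-side sketch of that transform in Python, assuming the address fields arrive as packed 4-byte (IPv4) or 16-byte (IPv6) values as in the proto schema (the helper name `render_addr` is hypothetical):

```python
import ipaddress

def render_addr(raw: bytes) -> str:
    """Render a packed 4-byte (IPv4) or 16-byte (IPv6) address field as text."""
    # ipaddress.ip_address accepts packed bytes of length 4 or 16 directly
    return str(ipaddress.ip_address(raw))

print(render_addr(bytes([192, 0, 2, 1])))   # 192.0.2.1
print(render_addr(bytes(15) + b"\x01"))     # ::1
```

The same conversion could run inside a NiFi transform or any Kafka consumer before the records are flattened.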

Anecdotally, regarding vflow: they decode and render the sample in JSON without extracting (the role of the "Protobuf Producer" in this repo). This also exists here with the -producer=raw setting, which only does JSON. Thank you for the feedback; I appreciate seeing how people use GoFlow2.

lspgn commented 1 month ago

Will close as answered. Feel free to comment if you have more questions.