Open sempervictus opened 3 years ago
Thanks for filing @sempervictus, we'll be addressing shortly!
@zsherman: thanks a ton, utterly buried and can't get to it myself for a while.
@sempervictus @zsherman Was there any traction on this? I'm looking to use Vector to process sFlow data to replace our Filebeat collectors.
@gaby: I had started collecting NetFlow and sFlow processing libraries and stubbing out standalone code to test functionality. Unfortunately, I have relatively little time available for FOSS right now and had to divert effort. It seems pretty viable, but likely a week or two of work to get to production state by someone who knows their way around Rust and Vector (flow processors tend to have some churn as various vendor attributes are requested by users). @zsherman: any chance the development team might have cycles to tackle the use case? It seems I'm not the only one interested in using Vector for these data flows. Thanks
We agree this would be a nice use case, but it is not currently in our planned work.
@sempervictus What caught my attention is that Datadog already has a Netflow collector through their agent. https://docs.datadoghq.com/network_monitoring/devices/netflow/
I can send NetFlow/sFlow data to Vector through a socket sink, but there's no way to parse it. Maybe having a NetFlow/sFlow parser/decoder in the transform area would be a good start.
any news on this?
Have not had time to implement or do much prototyping, though at this point I'm thinking a deku slicer/extractor (or binrw), which allows for framing the protocol packets for extraction, would be the way to go.
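As a rough illustration of the framing idea (plain Rust, not tied to deku or binrw, and not Vector code): the NetFlow v5 header is a fixed 24-byte big-endian layout, so the fields a derive-based extractor would produce can be sliced out directly. Struct and helper names here are made up for the sketch; the offsets follow the v5 header layout.

```rust
// Sketch: extracting the NetFlow v5 header fields from raw datagram bytes,
// roughly what a deku/binrw-derived parser would generate.
// The v5 header is always 24 bytes, all fields big-endian.

fn be_u16(buf: &[u8], off: usize) -> u16 {
    u16::from_be_bytes([buf[off], buf[off + 1]])
}

fn be_u32(buf: &[u8], off: usize) -> u32 {
    u32::from_be_bytes([buf[off], buf[off + 1], buf[off + 2], buf[off + 3]])
}

#[derive(Debug, PartialEq)]
struct V5Header {
    version: u16,
    count: u16,
    sys_uptime: u32,
    unix_secs: u32,
    unix_nsecs: u32,
    flow_sequence: u32,
    engine_type: u8,
    engine_id: u8,
    sampling_interval: u16,
}

fn parse_v5_header(buf: &[u8]) -> Option<V5Header> {
    if buf.len() < 24 {
        return None; // truncated packet, header is always 24 bytes
    }
    Some(V5Header {
        version: be_u16(buf, 0),
        count: be_u16(buf, 2),
        sys_uptime: be_u32(buf, 4),
        unix_secs: be_u32(buf, 8),
        unix_nsecs: be_u32(buf, 12),
        flow_sequence: be_u32(buf, 16),
        engine_type: buf[20],
        engine_id: buf[21],
        sampling_interval: be_u16(buf, 22),
    })
}

fn main() {
    // A fabricated 24-byte header: version 5, 2 records.
    let mut pkt = vec![0u8; 24];
    pkt[..2].copy_from_slice(&5u16.to_be_bytes());
    pkt[2..4].copy_from_slice(&2u16.to_be_bytes());
    println!("{:?}", parse_v5_header(&pkt));
}
```

A real crate-based implementation would also have to dispatch on the version field before assuming this layout, since v9/IPFIX are template-driven.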
@sempervictus Is that something that could be done with VRL?
TCP/UDP/Socket Source Parse/Format into SFLOW/Netflow with VRL
It's a binary format, so field extractions specific to the protocols (and their versions) would be preferable IMO.
@sempervictus Haven't tested this, but I asked GPT-4o how I could parse/format NetFlow using VRL, and it came up with this for v5:
[transforms.parse_netflow]
type = "remap"
inputs = ["tcp"]
source = """
  .netflow_data = {
    "version": to_int!(substring(.message, 0, 2)),
    "count": to_int!(substring(.message, 2, 4)),
    "sys_uptime": to_int!(substring(.message, 4, 8)),
    "unix_secs": to_int!(substring(.message, 8, 12)),
    "unix_nsecs": to_int!(substring(.message, 12, 16)),
    "flow_sequence": to_int!(substring(.message, 16, 20)),
    "engine_type": to_int!(substring(.message, 20, 21)),
    "engine_id": to_int!(substring(.message, 21, 22)),
    "sampling_interval": to_int!(substring(.message, 22, 24)),
    "records": []
  }
  .record_offset = 24
  while .record_offset < byte_length(.message) {
    .record = {
      "srcaddr": ipv4_to_str!(substring(.message, .record_offset, .record_offset + 4)),
      "dstaddr": ipv4_to_str!(substring(.message, .record_offset + 4, .record_offset + 8)),
      "nexthop": ipv4_to_str!(substring(.message, .record_offset + 8, .record_offset + 12)),
      "input": to_int!(substring(.message, .record_offset + 12, .record_offset + 14)),
      "output": to_int!(substring(.message, .record_offset + 14, .record_offset + 16)),
      "dPkts": to_int!(substring(.message, .record_offset + 16, .record_offset + 20)),
      "dOctets": to_int!(substring(.message, .record_offset + 20, .record_offset + 24)),
      "first": to_int!(substring(.message, .record_offset + 24, .record_offset + 28)),
      "last": to_int!(substring(.message, .record_offset + 28, .record_offset + 32)),
      "srcport": to_int!(substring(.message, .record_offset + 32, .record_offset + 34)),
      "dstport": to_int!(substring(.message, .record_offset + 34, .record_offset + 36)),
      "pad1": to_int!(substring(.message, .record_offset + 36, .record_offset + 37)),
      "tcp_flags": to_int!(substring(.message, .record_offset + 37, .record_offset + 38)),
      "prot": to_int!(substring(.message, .record_offset + 38, .record_offset + 39)),
      "tos": to_int!(substring(.message, .record_offset + 39, .record_offset + 40)),
      "src_as": to_int!(substring(.message, .record_offset + 40, .record_offset + 42)),
      "dst_as": to_int!(substring(.message, .record_offset + 42, .record_offset + 44)),
      "src_mask": to_int!(substring(.message, .record_offset + 44, .record_offset + 45)),
      "dst_mask": to_int!(substring(.message, .record_offset + 45, .record_offset + 46)),
      "pad2": to_int!(substring(.message, .record_offset + 46, .record_offset + 48))
    }
    .netflow_data.records = push(.netflow_data.records, .record)
    .record_offset = .record_offset + 48
  }
"""
If this works, it might be a path to integrating it with VRL.
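For comparison, the record walk in the snippet above (fixed 48-byte v5 records following the 24-byte header) is straightforward when applied to the raw datagram bytes. An illustrative Rust sketch, not Vector code; struct and function names are made up, offsets follow the v5 record layout:

```rust
use std::net::Ipv4Addr;

// Sketch of the same v5 record loop in Rust: each record is a fixed
// 48 bytes, starting right after the 24-byte header. Only a few of the
// record fields are extracted here to keep the sketch short.
#[derive(Debug, PartialEq)]
struct V5Record {
    srcaddr: Ipv4Addr,
    dstaddr: Ipv4Addr,
    srcport: u16,
    dstport: u16,
    prot: u8,
}

fn parse_v5_records(packet: &[u8]) -> Vec<V5Record> {
    let mut records = Vec::new();
    let mut off = 24; // skip the 24-byte header
    while off + 48 <= packet.len() {
        let r = &packet[off..off + 48];
        records.push(V5Record {
            srcaddr: Ipv4Addr::new(r[0], r[1], r[2], r[3]),
            dstaddr: Ipv4Addr::new(r[4], r[5], r[6], r[7]),
            srcport: u16::from_be_bytes([r[32], r[33]]),
            dstport: u16::from_be_bytes([r[34], r[35]]),
            prot: r[38],
        });
        off += 48;
    }
    records
}

fn main() {
    // A fabricated packet: 24-byte header plus one zeroed record
    // with srcaddr 10.0.0.1 and srcport 443 patched in.
    let mut pkt = vec![0u8; 24 + 48];
    pkt[24..28].copy_from_slice(&[10, 0, 0, 1]);
    pkt[24 + 32..24 + 34].copy_from_slice(&443u16.to_be_bytes());
    println!("{:?}", parse_v5_records(&pkt));
}
```

The same loop in VRL is awkward precisely because the event payload arrives as a string rather than raw bytes, which is part of the argument for a native decoder.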
If we can do dependent offsets (if field A is greater than 5, then field F will exist; otherwise field E will extend into where we expect F to fall), then that might be an approach we can take. Definitely worth investigating.
This is much closer, still needs some work:

# Define the protocol map
.protocol_map = { "1": "ICMP", "6": "TCP", "17": "UDP" }

.prot_hex, prot_err = slice(.message, 69, 70)
if prot_err == null {
  .prot = parse_int(.prot_hex, base: 16) ?? 0
  .prot_str = to_string(.prot)
  # Fetch the protocol name from the protocol map
  .protocol_name = get(.protocol_map, [.prot_str]) ?? "Unknown"
} else {
  .protocol_name = "EXTRACTION_ERROR"
}

.tcp_flags_hex, extract_err = slice(.message, 122, 124)
if extract_err == null {
  .tcp_flags = parse_int(.tcp_flags_hex, base: 16) ?? 0
  .tcp_flags_parsed = []
  if .tcp_flags >= 32 { .tcp_flags_parsed = push(.tcp_flags_parsed, "URG"); .tcp_flags = .tcp_flags - 32 }
  if .tcp_flags >= 16 { .tcp_flags_parsed = push(.tcp_flags_parsed, "ACK"); .tcp_flags = .tcp_flags - 16 }
  if .tcp_flags >= 8 { .tcp_flags_parsed = push(.tcp_flags_parsed, "PSH"); .tcp_flags = .tcp_flags - 8 }
  if .tcp_flags >= 4 { .tcp_flags_parsed = push(.tcp_flags_parsed, "RST"); .tcp_flags = .tcp_flags - 4 }
  if .tcp_flags >= 2 { .tcp_flags_parsed = push(.tcp_flags_parsed, "SYN"); .tcp_flags = .tcp_flags - 2 }
  if .tcp_flags >= 1 { .tcp_flags_parsed = push(.tcp_flags_parsed, "FIN") }
  if length(.tcp_flags_parsed) == 0 {
    .tcp_flags_parsed = "NO_FLAGS"
  } else {
    .tcp_flags_parsed = join!(.tcp_flags_parsed, ",")
  }
} else {
  .tcp_flags_parsed = "EXTRACTION_ERROR"
}

.first_bytes, first_err = slice(.message, 48, 52)
.last_bytes, last_err = slice(.message, 52, 56)
.first = parse_int(.first_bytes, 16) ?? 0
.last = parse_int(.last_bytes, 16) ?? 0
.duration = .last - .first

.version_bytes, version_err = slice(.message, 0, 2)
.count_bytes, count_err = slice(.message, 2, 4)
.sys_uptime_bytes, sys_uptime_err = slice(.message, 4, 8)
.unix_secs_bytes, unix_secs_err = slice(.message, 8, 12)
.unix_nsecs_bytes, unix_nsecs_err = slice(.message, 12, 16)
.flow_sequence_bytes, flow_sequence_err = slice(.message, 16, 20)
.engine_type_bytes, engine_type_err = slice(.message, 20, 21)
.engine_id_bytes, engine_id_err = slice(.message, 21, 22)
.sampling_interval_bytes, sampling_interval_err = slice(.message, 22, 24)
.srcaddr_bytes, srcaddr_err = slice(.message, 24, 28)
.dstaddr_bytes, dstaddr_err = slice(.message, 28, 32)
.nexthop_bytes, nexthop_err = slice(.message, 32, 36)
.input_bytes, input_err = slice(.message, 36, 38)
.output_bytes, output_err = slice(.message, 38, 40)
.dPkts_bytes, dPkts_err = slice(.message, 40, 44)
.dOctets_bytes, dOctets_err = slice(.message, 44, 48)
.srcport_bytes, srcport_err = slice(.message, 56, 58)
.dstport_bytes, dstport_err = slice(.message, 58, 60)
.pad1_bytes, pad1_err = slice(.message, 60, 61)
.tcp_flags_bytes, tcp_flags_err = slice(.message, 61, 62)
.prot_bytes, prot_err = slice(.message, 62, 63)
.tos_bytes, tos_err = slice(.message, 63, 64)
.src_as_bytes, src_as_err = slice(.message, 64, 66)
.dst_as_bytes, dst_as_err = slice(.message, 66, 68)
.src_mask_bytes, src_mask_err = slice(.message, 68, 69)
.dst_mask_bytes, dst_mask_err = slice(.message, 69, 70)
.pad2_bytes, pad2_err = slice(.message, 70, 72)

.version = if version_err != null { -1 } else { parse_int(.version_bytes, 16) ?? -1 }
.count = if count_err != null { -1 } else { parse_int(.count_bytes, 16) ?? -1 }
.sys_uptime = if sys_uptime_err != null { -1 } else { parse_int(.sys_uptime_bytes, 16) ?? -1 }
.unix_secs = if unix_secs_err != null { -1 } else { parse_int(.unix_secs_bytes, 16) ?? -1 }
.unix_nsecs = if unix_nsecs_err != null { -1 } else { parse_int(.unix_nsecs_bytes, 16) ?? -1 }
.flow_sequence = if flow_sequence_err != null { -1 } else { parse_int(.flow_sequence_bytes, 16) ?? -1 }
.engine_type = if engine_type_err != null { -1 } else { parse_int(.engine_type_bytes, 16) ?? -1 }
.engine_id = if engine_id_err != null { -1 } else { parse_int(.engine_id_bytes, 16) ?? -1 }
.sampling_interval = if sampling_interval_err != null { -1 } else { parse_int(.sampling_interval_bytes, 16) ?? -1 }

.srcaddr = if srcaddr_err != null {
  "0.0.0.0"
} else {
  .part1_bytes, part1_err = slice(.srcaddr_bytes, 0, 1)
  .part2_bytes, part2_err = slice(.srcaddr_bytes, 1, 2)
  .part3_bytes, part3_err = slice(.srcaddr_bytes, 2, 3)
  .part4_bytes, part4_err = slice(.srcaddr_bytes, 3, 4)
  if part1_err != null || part2_err != null || part3_err != null || part4_err != null {
    "0.0.0.0"
  } else {
    .part1 = parse_int(.part1_bytes, 16) ?? 0
    .part2 = parse_int(.part2_bytes, 16) ?? 0
    .part3 = parse_int(.part3_bytes, 16) ?? 0
    .part4 = parse_int(.part4_bytes, 16) ?? 0
    to_string(.part1) + "." + to_string(.part2) + "." + to_string(.part3) + "." + to_string(.part4)
  }
}

.dstaddr = if dstaddr_err != null {
  "0.0.0.0"
} else {
  .part1_bytes, part1_err = slice(.dstaddr_bytes, 0, 1)
  .part2_bytes, part2_err = slice(.dstaddr_bytes, 1, 2)
  .part3_bytes, part3_err = slice(.dstaddr_bytes, 2, 3)
  .part4_bytes, part4_err = slice(.dstaddr_bytes, 3, 4)
  if part1_err != null || part2_err != null || part3_err != null || part4_err != null {
    "0.0.0.0"
  } else {
    .part1 = parse_int(.part1_bytes, 16) ?? 0
    .part2 = parse_int(.part2_bytes, 16) ?? 0
    .part3 = parse_int(.part3_bytes, 16) ?? 0
    .part4 = parse_int(.part4_bytes, 16) ?? 0
    to_string(.part1) + "." + to_string(.part2) + "." + to_string(.part3) + "." + to_string(.part4)
  }
}

.nexthop = if nexthop_err != null {
  "0.0.0.0"
} else {
  .part1_bytes, part1_err = slice(.nexthop_bytes, 0, 1)
  .part2_bytes, part2_err = slice(.nexthop_bytes, 1, 2)
  .part3_bytes, part3_err = slice(.nexthop_bytes, 2, 3)
  .part4_bytes, part4_err = slice(.nexthop_bytes, 3, 4)
  if part1_err != null || part2_err != null || part3_err != null || part4_err != null {
    "0.0.0.0"
  } else {
    .part1 = parse_int(.part1_bytes, 16) ?? 0
    .part2 = parse_int(.part2_bytes, 16) ?? 0
    .part3 = parse_int(.part3_bytes, 16) ?? 0
    .part4 = parse_int(.part4_bytes, 16) ?? 0
    to_string(.part1) + "." + to_string(.part2) + "." + to_string(.part3) + "." + to_string(.part4)
  }
}

.input = if input_err != null { -1 } else { parse_int(.input_bytes, 16) ?? -1 }
.output = if output_err != null { -1 } else { parse_int(.output_bytes, 16) ?? -1 }
.dPkts = if dPkts_err != null { -1 } else { parse_int(.dPkts_bytes, 16) ?? -1 }
.dOctets = if dOctets_err != null { -1 } else { parse_int(.dOctets_bytes, 16) ?? -1 }
.srcport = if srcport_err != null { -1 } else { parse_int(.srcport_bytes, 16) ?? -1 }
.dstport = if dstport_err != null { -1 } else { parse_int(.dstport_bytes, 16) ?? -1 }
.pad1 = if pad1_err != null { -1 } else { parse_int(.pad1_bytes, 16) ?? -1 }
.tcp_flags = if tcp_flags_err != null { -1 } else { parse_int(.tcp_flags_bytes, 16) ?? -1 }
.prot = if prot_err != null { -1 } else { parse_int(.prot_bytes, 16) ?? -1 }
.tos = if tos_err != null { -1 } else { parse_int(.tos_bytes, 16) ?? -1 }
.src_as = if src_as_err != null { -1 } else { parse_int(.src_as_bytes, 16) ?? -1 }
.dst_as = if dst_as_err != null { -1 } else { parse_int(.dst_as_bytes, 16) ?? -1 }
.src_mask = if src_mask_err != null { -1 } else { parse_int(.src_mask_bytes, 16) ?? -1 }
.dst_mask = if dst_mask_err != null { -1 } else { parse_int(.dst_mask_bytes, 16) ?? -1 }
.pad2 = if pad2_err != null { -1 } else { parse_int(.pad2_bytes, 16) ?? -1 }

.netflow_data = {
  "version": .version,
  "count": .count,
  "sys_uptime": .sys_uptime,
  "unix_secs": .unix_secs,
  "unix_nsecs": .unix_nsecs,
  "flow_sequence": .flow_sequence,
  "engine_type": .engine_type,
  "engine_id": .engine_id,
  "sampling_interval": .sampling_interval,
  "srcaddr": .srcaddr,
  "dstaddr": .dstaddr,
  "nexthop": .nexthop,
  "input": .input,
  "output": .output,
  "dPkts": .dPkts,
  "dOctets": .dOctets,
  "first": .first,
  "last": .last,
  "duration": .duration,
  "srcport": .srcport,
  "dstport": .dstport,
  "pad1": .pad1,
  "tcp_flags": .tcp_flags,
  "prot": .prot,
  "tos": .tos,
  "src_as": .src_as,
  "dst_as": .dst_as,
  "src_mask": .src_mask,
  "dst_mask": .dst_mask,
  "pad2": .pad2
}

{
  "message": .message,
  "prot": .prot,
  "prot_hex": .prot_hex,
  "prot_str": .prot_str,
  "protocol_map": .protocol_map,
  "protocol_name": .protocol_name,
  "tcp_flags": .tcp_flags,
  "tcp_flags_hex": .tcp_flags_hex,
  "tcp_flags_parsed": .tcp_flags_parsed,
  "duration": .duration,
  "netflow_data": .netflow_data
}
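The flag-subtraction chain above maps directly onto bit tests, since each TCP flag is one bit of the flags byte. A minimal Rust equivalent for comparison (illustrative, not part of any proposed Vector API):

```rust
// Bitmask version of the subtract chain: test each flag bit instead of
// repeatedly subtracting from the value. Bit values follow the TCP header:
// URG=0x20, ACK=0x10, PSH=0x08, RST=0x04, SYN=0x02, FIN=0x01.
fn decode_tcp_flags(flags: u8) -> String {
    const NAMES: [(u8, &str); 6] = [
        (0x20, "URG"),
        (0x10, "ACK"),
        (0x08, "PSH"),
        (0x04, "RST"),
        (0x02, "SYN"),
        (0x01, "FIN"),
    ];
    let mut set = Vec::new();
    for (bit, name) in NAMES {
        if flags & bit != 0 {
            set.push(name);
        }
    }
    if set.is_empty() {
        "NO_FLAGS".to_string()
    } else {
        set.join(",")
    }
}

fn main() {
    // 0x12 = ACK (0x10) + SYN (0x02)
    println!("{}", decode_tcp_flags(0x12));
}
```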
Use-cases
Network devices such as switches and firewalls export sFlow/NetFlow (or the perverse vendor variants) to provide an ongoing trace of network traffic details without having to capture and store all of the data involved. Tools like ElastiFlow utilize Logstash to ingest and enrich flow data with DNS and AS lookups, GeoIP information, etc. Vector seems like a much better platform for the ingest, enrichment, and forwarding given its memory and concurrency models, as well as its portability across systems for localized collection.
Proposal