phaag / go-nfdump

go-nfdump: A Go module to read and process nfdump files
BSD 2-Clause "Simplified" License

Add new feature: aggregation and order by #10

Open gabrielmocan opened 3 months ago

gabrielmocan commented 3 months ago

Hi Pete, so long in this project.

It's working very nicely, but may I ask for a few features?

My biggest problem currently is very large flow data, mostly due to DDoS attacks. I have a few exporters that sometimes push more than 300MB of data in a single minute, going above 3M flows in the nffile.

I would like to be able to aggregate fields, much like the -A parameter in classic nfdump. Also, an equivalent of -O to order the output.

My intention is to do some sort of downsampling in those cases. Some kind of 'aggregate by values < x'. I'm open to ideas also.

gabrielmocan commented 3 months ago

ORDER BY is the most desired for now, as aggregation can be done further down the pipeline.

phaag commented 3 months ago

Hi Gabe, sure, requests are always welcome! I need to check how I could implement this efficiently. So far go-nfdump is a reader and nothing more. The -A aggregation is not that easy in Go, but let me see what I can do. Just allow me some time to experiment.

gabrielmocan commented 3 months ago

@phaag no rush, I've managed some workarounds here to downsample. Still, this is a desired feature.

Do you think -O is easier than -A? -O would help me a lot when downsampling, as I'm using a 'fewer than x packets' cutoff logic. Ordering the output by packets without having to read all data blocks would optimize this process, as I would enter a 1:N downsampling loop as soon as the cutoff point is reached.

For now I just read the entire nffile, then sort the slice and downsample records that have 'fewer than x packets'.
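The workaround described above could look roughly like the following sketch. The `flow` type, the `downsample` function, and the cutoff and ratio values are all hypothetical and are not part of go-nfdump; the sketch only assumes the records have already been read into a slice.

```go
package main

import (
	"fmt"
	"sort"
)

// flow is a hypothetical stand-in for a flow record; it is not a go-nfdump type.
type flow struct {
	Packets uint64
	Bytes   uint64
}

// downsample sorts flows by packet count (descending), keeps every flow with
// at least cutoff packets, and keeps only every n-th flow below the cutoff.
func downsample(flows []flow, cutoff uint64, n int) []flow {
	if n < 1 {
		n = 1
	}
	// Sort a copy so the caller's slice is left untouched.
	sorted := make([]flow, len(flows))
	copy(sorted, flows)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Packets > sorted[j].Packets
	})

	out := make([]flow, 0, len(sorted))
	small := 0 // number of below-cutoff flows seen so far
	for _, f := range sorted {
		if f.Packets >= cutoff {
			out = append(out, f)
			continue
		}
		if small%n == 0 {
			out = append(out, f) // 1:n sampling of the small flows
		}
		small++
	}
	return out
}

func main() {
	flows := []flow{{1, 60}, {500, 40000}, {2, 120}, {1, 60}, {3, 180}, {900, 72000}}
	for _, f := range downsample(flows, 10, 2) {
		fmt.Println(f.Packets, f.Bytes)
	}
}
```

Because the sort happens after the whole file is in memory, this is the expensive path; an ordered reader would let the same loop run while records arrive.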

phaag commented 2 months ago

@gabrielmocan - I've created a new branch, work, for testing. Could you please check out the work branch and test it?

Changes in existing code: AllRecords() no longer returns a channel, but a chain object. This enables chaining of processing filters, in this case for -O and possibly more in the future. To get the final records, use Get() as the final chain element.

Example - simply list all records

    if recordChannel, err := nffile.AllRecords().Get(); err != nil {
        fmt.Printf("Failed to process flows: %v\n", err)
    } else {
        for record := range recordChannel {
            record.PrintLine()
        }
    }

The new chain processing function is OrderBy(type, direction). This processing element orders the records, equivalent to nfdump -O tstart:a or nfdump -O tstart:d, for example. Currently the OrderBy type is limited to tstart, tend, packets and bytes, and the direction can be ASCENDING or DESCENDING. It can be extended if needed. At the end of the chain, Get() the records and process them as usual.

You will find some example code in the folder example/sorter

    if recordChannel, err := nffile.AllRecords().OrderBy("bytes", nfdump.DESCENDING).Get(); err != nil {
        fmt.Printf("Failed to process flows: %v\n", err)
    } else {
        for record := range recordChannel {
            record.PrintLine()
        }
    }
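For readers unfamiliar with the chaining pattern, here is a minimal, generic sketch of how such a chain could be structured. It is not the go-nfdump implementation: `record`, `chain` and `source` are hypothetical names, only two sort fields are handled, and a bool replaces the ASCENDING/DESCENDING constants for brevity.

```go
package main

import (
	"fmt"
	"sort"
)

// record and chain are hypothetical stand-ins, not go-nfdump types.
type record struct {
	Packets uint64
	Bytes   uint64
}

type chain struct {
	start func() <-chan record // launches the pipeline when Get is called
	err   error
}

// source begins a chain that streams the given records.
func source(records []record) *chain {
	return &chain{start: func() <-chan record {
		out := make(chan record)
		go func() {
			defer close(out)
			for _, r := range records {
				out <- r
			}
		}()
		return out
	}}
}

// OrderBy adds a stage that drains its upstream, sorts, and re-emits.
func (c *chain) OrderBy(field string, descending bool) *chain {
	if c.err != nil {
		return c
	}
	var key func(record) uint64
	switch field {
	case "packets":
		key = func(r record) uint64 { return r.Packets }
	case "bytes":
		key = func(r record) uint64 { return r.Bytes }
	default:
		return &chain{err: fmt.Errorf("unknown order field %q", field)}
	}
	upstream := c.start
	return &chain{start: func() <-chan record {
		out := make(chan record)
		go func() {
			defer close(out)
			var all []record
			for r := range upstream() {
				all = append(all, r)
			}
			sort.SliceStable(all, func(i, j int) bool {
				if descending {
					return key(all[i]) > key(all[j])
				}
				return key(all[i]) < key(all[j])
			})
			for _, r := range all {
				out <- r
			}
		}()
		return out
	}}
}

// Get ends the chain and returns the channel of final records.
func (c *chain) Get() (<-chan record, error) {
	if c.err != nil {
		return nil, c.err
	}
	return c.start(), nil
}

func main() {
	recs := []record{{3, 180}, {900, 72000}, {1, 60}}
	ch, err := source(recs).OrderBy("bytes", true).Get()
	if err != nil {
		fmt.Println("failed:", err)
		return
	}
	for r := range ch {
		fmt.Println(r.Packets, r.Bytes)
	}
}
```

Deferring all work until Get() means an invalid stage can report its error without starting any goroutine. The caller is expected to drain the returned channel; otherwise the producing goroutine stays blocked.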

Please send me your feedback. With your feedback integrated, I can merge the work branch into main. An -A aggregation can be done the same way, but needs some work for an efficient hash table.
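As a rough illustration of what hash-table aggregation involves, the sketch below groups flows by a key and sums their counters using Go's built-in map. It is an assumption-laden example, not the planned go-nfdump design: the `flow`, `aggKey` and `aggValue` types are hypothetical, and the key is fixed to source and destination address rather than being selectable as with nfdump -A.

```go
package main

import "fmt"

// flow is a hypothetical stand-in for a flow record; not a go-nfdump type.
type flow struct {
	SrcAddr string
	DstAddr string
	Packets uint64
	Bytes   uint64
}

// aggKey holds the fields to aggregate on. Structs of comparable
// fields can be used directly as Go map keys.
type aggKey struct {
	SrcAddr string
	DstAddr string
}

// aggValue accumulates the counters of all flows sharing a key.
type aggValue struct {
	Flows   uint64
	Packets uint64
	Bytes   uint64
}

// aggregate sums packets and bytes per (source, destination) pair.
func aggregate(flows []flow) map[aggKey]aggValue {
	table := make(map[aggKey]aggValue)
	for _, f := range flows {
		k := aggKey{f.SrcAddr, f.DstAddr}
		v := table[k] // zero value if the key is new
		v.Flows++
		v.Packets += f.Packets
		v.Bytes += f.Bytes
		table[k] = v
	}
	return table
}

func main() {
	flows := []flow{
		{"10.0.0.1", "10.0.0.9", 2, 120},
		{"10.0.0.1", "10.0.0.9", 3, 180},
		{"10.0.0.2", "10.0.0.9", 1, 60},
	}
	// Map iteration order is not defined, so the lines may print in any order.
	for k, v := range aggregate(flows) {
		fmt.Printf("%s -> %s flows=%d packets=%d bytes=%d\n",
			k.SrcAddr, k.DstAddr, v.Flows, v.Packets, v.Bytes)
	}
}
```

The built-in map is the simplest option; whether it is efficient enough for files with several million flows is exactly the open question raised above.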

gabrielmocan commented 2 months ago

@phaag I will do some testing and report back to you. Thanks in advance!

phaag commented 2 months ago

Quoting gabrielmocan: "Do you think -O is easier than -A?"

-A should be doable as well

gabrielmocan commented 2 months ago

@phaag after some testing, the function is working as expected. We can try -A, if viable.

gabrielmocan commented 2 months ago

@phaag after further testing, I noticed that if the nffile has more than 1024*1024 records, the code panics.

I've tracked this down to these default values in orderby.go:

// store all flow records into an array for later printing
// initial len - 1 meg
recordArray = make([]*FlowRecordV3, 1024*1024)
// store value to be sorted and index of appropriate flow record of
// recordArray. initial len - 1 meg
sortArray = make([]sortRecord, 1024*1024)

If I change these default values to something greater than the flow count, the panic is gone.

Could we size those slices based on nffile.StatRecord.Numflows? That would be an exact match, with no need to resize.
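One way to combine both ideas is to use the flow count only as a capacity hint and fill the slice with append, so a wrong or missing count costs a reallocation instead of a panic. The sketch below shows this under stated assumptions: `flowRecord` and `collect` are hypothetical names, and the hint stands in for a value such as the flow count from the file's stat record. It is not the fix that was applied in the work branch.

```go
package main

import "fmt"

// flowRecord is a hypothetical stand-in for a flow record type.
type flowRecord struct {
	Packets uint64
}

// collect copies records from a channel into a slice whose capacity is
// seeded from a flow-count hint. The slice starts with length 0, so
// append grows it safely if the hint turns out to be too low.
func collect(in <-chan *flowRecord, hint uint64) []*flowRecord {
	records := make([]*flowRecord, 0, hint)
	for r := range in {
		records = append(records, r)
	}
	return records
}

func main() {
	in := make(chan *flowRecord)
	go func() {
		defer close(in)
		for i := 0; i < 5; i++ {
			in <- &flowRecord{Packets: uint64(i)}
		}
	}()
	// Deliberately pass a hint smaller than the real record count.
	records := collect(in, 2)
	fmt.Println(len(records)) // prints 5
}
```

With an accurate hint no reallocation happens at all, which matches the "exact match" case described above.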

Sample sent via e-mail.

phaag commented 2 months ago

It's fixed in the work branch. Please test! Thanks

gabrielmocan commented 2 months ago


It works just fine 😎