google / gopacket

Provides packet processing capabilities for Go
BSD 3-Clause "New" or "Revised" License

High memory usage in reassembly and tcpassembly #444

Open styleex opened 6 years ago

styleex commented 6 years ago

I wrote a small test program for tcpassembly and ran FTP traffic through it (FTP inside a gigabit LAN). I uploaded one file over FTP (a 533 MB ISO image). As a result, memory usage grew from 78612 KB to 877 MB. If I upload the ISO a second time, memory grows to 1664 MB.

The result is the same with both the reassembly package and the tcpassembly package.

If I comment out the assembler.Assemble call, memory consumption returns to normal, i.e. the problem is somewhere in reassembly.

Any idea why this happens?

package main

import (
    "github.com/google/gopacket/reassembly"
    "github.com/google/gopacket"
    "github.com/google/gopacket/layers"
    "log"
    "github.com/google/gopacket/afpacket"
)

type NullStream struct {
}

func (d *NullStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, ackSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
    return true
}

func (d *NullStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.AssemblerContext) {
}

func (d *NullStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
    return true
}

type myFactory struct {
}

func (h *myFactory) New(netFlow, tcpFlow gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
    return &NullStream{}
}

func main() {
    inputAFpacket, err := afpacket.NewTPacket(afpacket.OptInterface("enp1s0"))
    if err != nil {
        log.Fatal(err)
    }
    defer inputAFpacket.Close()

    streamFactory := &myFactory{}
    streamPool := reassembly.NewStreamPool(streamFactory)
    assembler := reassembly.NewAssembler(streamPool)

    for {
        pkt, _, err := inputAFpacket.ZeroCopyReadPacketData()
        if err != nil {
            log.Fatal(err)
        }

        packet := gopacket.NewPacket(pkt, layers.LayerTypeEthernet, gopacket.NoCopy)
        if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || packet.TransportLayer().LayerType() != layers.LayerTypeTCP {
            continue
        }

        tcp, ok := packet.TransportLayer().(*layers.TCP)
        if !ok {
            continue
        }

        assembler.Assemble(packet.NetworkLayer().NetworkFlow(), tcp)
    }
}

gconnell commented 6 years ago

Without looking at the packets it's captured, my guess is there's a little bit of packet loss, and the assembler's waiting around for a packet to be delivered out-of-order somewhere in the early part of the stream. For a given TCP stream, if a packet is missed mid-way through, both libraries will cache the subsequent data until either the packet is seen, or until one of the Flush* methods is called.
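
To make the buffering behaviour concrete, here is a minimal toy model in plain Go. It is not gopacket's implementation: `gapBuffer`, `newGapBuffer`, `add` and `bufferedBytes` are hypothetical names invented for this sketch, and it ignores sequence-number wraparound, overlaps and SYN/FIN handling. It only shows why one missing segment makes everything after it pile up in memory.

```go
package main

// gapBuffer is a toy model of one direction of a TCP stream.
// It is NOT gopacket code; it only mimics "hold everything after a gap".
type gapBuffer struct {
	nextSeq uint32            // next sequence number expected in order
	pending map[uint32][]byte // out-of-order segments keyed by sequence number
}

func newGapBuffer(initialSeq uint32) *gapBuffer {
	return &gapBuffer{nextSeq: initialSeq, pending: make(map[uint32][]byte)}
}

// add feeds one segment and returns how many bytes became deliverable in order.
func (g *gapBuffer) add(seq uint32, payload []byte) int {
	if seq < g.nextSeq {
		// Already delivered (retransmission); wraparound is ignored in this toy.
		return 0
	}
	if seq != g.nextSeq {
		// Gap: keep a copy until the missing segment shows up.
		g.pending[seq] = append([]byte(nil), payload...)
		return 0
	}
	delivered := len(payload)
	g.nextSeq += uint32(len(payload))
	// Drain buffered segments that are now contiguous.
	for {
		p, ok := g.pending[g.nextSeq]
		if !ok {
			break
		}
		delete(g.pending, g.nextSeq)
		delivered += len(p)
		g.nextSeq += uint32(len(p))
	}
	return delivered
}

// bufferedBytes reports how much payload is currently held back.
func (g *gapBuffer) bufferedBytes() int {
	n := 0
	for _, p := range g.pending {
		n += len(p)
	}
	return n
}
```

In this model, if the segment at sequence 1100 never arrives, every later call to `add` returns 0 and `bufferedBytes` keeps growing for as long as the transfer runs, which matches the symptom in the report.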

If you'd like to debug exactly what's going on, there are flags in tcpassembly that will dump (a ton of verbose) log info about what the assembler is doing, see https://github.com/google/gopacket/blob/master/tcpassembly/assembly.go#L31. Reassembly also has (just one) flag: https://github.com/google/gopacket/blob/master/reassembly/tcpassembly.go#L42.

If you'd like, feel free to run either with the associated flags turned on and drop the logs in here, and I can see about trying to figure out what's going on from the exposed logs.

styleex commented 6 years ago

I don't have the test rig at hand right now, but in the logs I saw many entries of the form "%v gap in sequence numbers (%v, %v) diff %v, storing into connection".

But I can't understand why this happens. Wireshark, for example, does not show any anomalies in the traffic.

gconnell commented 6 years ago

To figure this out, let's first see if it's an afpacket problem or a reassembly problem. To do that, let's capture the traffic with tcpdump into a pcap file, then read the packets in from the file with pcapgo.NewReader. If that causes issues but the capture looks fine in Wireshark, the problem is with the assembly code. If that works fine, the issue is most likely packet loss in afpacket.

styleex commented 6 years ago

Yes, great idea. I'll do it in two days.

gconnell commented 6 years ago

Awesome! Thanks for the issue filing and debugging help, Антон!

styleex commented 6 years ago

When reading packets from a file, memory also grows: 13 MB at the start, 1089 MB at the end (RES). Program:

package main

import (
    "github.com/google/gopacket/reassembly"
    "github.com/google/gopacket"
    "github.com/google/gopacket/layers"
    "log"
    "os"
    "github.com/google/gopacket/pcapgo"
    "io"
    "time"
)

type NullStream struct {
}

func (d *NullStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, ackSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
    return true
}

func (d *NullStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.AssemblerContext) {
}

func (d *NullStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
    return true
}

type myFactory struct {
}

func (h *myFactory) New(netFlow, tcpFlow gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
    return &NullStream{}
}

func main() {
    // Create new reader:
    // Open the capture file; fail early if it cannot be opened.
    f, err := os.Open("ftp_dump.pcap")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()
    r, err := pcapgo.NewReader(f)
    if err != nil {
        log.Fatal(err)
    }

    streamFactory := &myFactory{}
    streamPool := reassembly.NewStreamPool(streamFactory)
    assembler := reassembly.NewAssembler(streamPool)

    time.Sleep(20 * time.Second)
    log.Println("start")

    for {
        //pkt, _, err := inputAFpacket.ZeroCopyReadPacketData()
        pkt, _, err := r.ReadPacketData()
        if err != nil {
            if err == io.EOF {
                break
            }
            log.Fatal(err)
        }

        packet := gopacket.NewPacket(pkt, layers.LayerTypeEthernet, gopacket.NoCopy)
        if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || packet.TransportLayer().LayerType() != layers.LayerTypeTCP {
            continue
        }

        tcp, ok := packet.TransportLayer().(*layers.TCP)
        if !ok {
            continue
        }

        assembler.Assemble(packet.NetworkLayer().NetworkFlow(), tcp)
    }

    log.Println("end")
    time.Sleep(50 * time.Minute)
}
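
A side note on measuring: RES as reported by top can include memory the Go runtime has already freed but not yet returned to the OS. A rough way to check that the growth is live data still referenced by the assembler is to force a garbage collection and read the runtime's own counters. The sketch below uses only the standard library; `liveHeapMiB` and `logHeap` are hypothetical helper names, not part of gopacket.

```go
package main

import (
	"log"
	"runtime"
)

// liveHeapMiB forces a garbage collection and returns the heap that is
// still allocated afterwards, in MiB. After a forced GC this is roughly
// the memory that is still reachable from the program.
func liveHeapMiB() float64 {
	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return float64(m.HeapAlloc) / (1 << 20)
}

// logHeap logs the live heap with a label, e.g. "start" or "end".
func logHeap(label string) {
	log.Printf("%s: live heap %.1f MiB", label, liveHeapMiB())
}
```

Calling `logHeap("start")` before the read loop and `logHeap("end")` after it would show whether the heap is still held after a GC; if it is, something (here presumably the assembler's buffered pages) still references the data.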

spitfire55 commented 4 years ago

For those from the interwebz that find this, you'll need to occasionally flush and close connections that haven't been completed/reassembled due to packet loss or out-of-order shenanigans. You can do this by regularly calling assembler.FlushCloseOlderThan(time.Time). For example, using time.Tick and channels:

ticker := time.Tick(time.Minute)
assembler := createAssembler() // assume a function exists that creates an assembler
for {
    select {
    case packet := <-packets: // assume packets is a channel created by gopacket.PacketSource.Packets()
        // Skip packets without a network or transport layer.
        if packet.NetworkLayer() == nil || packet.TransportLayer() == nil {
            continue
        }
        // Skip anything that is not TCP.
        tcp, ok := packet.TransportLayer().(*layers.TCP)
        if !ok {
            continue
        }
        assembler.Assemble(packet.NetworkLayer().NetworkFlow(), tcp)
    case <-ticker:
        // Stop waiting on data older than 4 minutes and close connections idle that long.
        assembler.FlushCloseOlderThan(time.Now().Add(-4 * time.Minute))
    }
}

If you wish to see a working example, my project gourmet has one.

lukemakeit commented 2 years ago

@styleex I encountered the same problem. After adding assembler.FlushCloseOlderThan(), the problem is still not solved. Later I added the following two lines of code and the problem was solved.

assembler.MaxBufferedPagesPerConnection = 500
assembler.MaxBufferedPagesTotal = 100000
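
As I understand the package documentation, when a connection exceeds its per-connection page limit the assembler stops waiting for the missing data and flushes from the smallest buffered sequence number onward. The toy model below illustrates that idea in plain Go. It is not gopacket's code: `cappedBuffer`, `newCappedBuffer`, `add`, `drain` and `skipped` are hypothetical names for this sketch, segments stand in for pages, and wraparound is ignored.

```go
package main

// cappedBuffer is a toy model of a per-connection buffer limit.
// It is NOT gopacket code; one "segment" stands in for one page.
type cappedBuffer struct {
	nextSeq    uint32         // next sequence number expected in order
	pending    map[uint32]int // buffered segments: sequence number -> length
	maxPending int            // limit on buffered segments; <= 0 means unlimited
	skipped    int            // bytes given up on because the limit was hit
}

func newCappedBuffer(initialSeq uint32, maxPending int) *cappedBuffer {
	return &cappedBuffer{nextSeq: initialSeq, pending: make(map[uint32]int), maxPending: maxPending}
}

// add feeds one segment of n bytes and returns the bytes delivered in order.
func (c *cappedBuffer) add(seq uint32, n int) int {
	if seq < c.nextSeq {
		return 0 // old data, already delivered or skipped
	}
	if seq == c.nextSeq {
		c.nextSeq += uint32(n)
		return n + c.drain()
	}
	c.pending[seq] = n
	if c.maxPending <= 0 || len(c.pending) <= c.maxPending {
		return 0 // still under the limit: keep waiting for the gap
	}
	// Over the limit: give up on the gap and jump to the lowest buffered segment.
	lowest := seq
	for s := range c.pending {
		if s < lowest {
			lowest = s
		}
	}
	c.skipped += int(lowest - c.nextSeq)
	c.nextSeq = lowest
	return c.drain()
}

// drain delivers buffered segments that are contiguous with nextSeq.
func (c *cappedBuffer) drain() int {
	total := 0
	for {
		n, ok := c.pending[c.nextSeq]
		if !ok {
			return total
		}
		delete(c.pending, c.nextSeq)
		total += n
		c.nextSeq += uint32(n)
	}
}
```

With a limit in place, memory per connection is bounded by the limit at the cost of losing the skipped bytes; with `maxPending <= 0` the model buffers without bound, which is the behaviour that caused the growth in this thread.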