mozillazg / ptcpdump

Process-aware, eBPF-based tcpdump
MIT License

Truncated packets #77

Open buger opened 1 week ago

buger commented 1 week ago

Hi! Any reason why you have fixed the max packet payload size to 1500? I have MTUs that are bigger than this, like 65536, and those packets definitely get truncated. Do you think it could be set automatically based on the MTU? I guess it mostly affects memory usage?

mozillazg commented 1 week ago

Any reason why you have fixed max packet payload size to 1500?

Because it's the recommended value for most interfaces and the -s flag is not implemented yet.

Do you think it can be set automatically based on MTU?

I'll try to implement the -s/--snapshot-length flag like tcpdump; once that's done, this issue will be fixed.

mozillazg commented 1 week ago

@buger Please try the build from #78; it implements the -s/--snapshot-length flag and allows changing the max packet payload size (default 262144, same as tcpdump): https://github.com/mozillazg/ptcpdump/actions/runs/9647201633
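With that build, something like `ptcpdump -i eth0 -s 65535` should keep the full payload of packets larger than 1500 bytes (the flag is meant to follow tcpdump's -s semantics).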

buger commented 6 days ago

Thanks! I also tried playing with it myself, implementing similar functionality without a size limit (well, almost).

#define MAX_PACKET_SIZE 65535
#define MIN(a, b) ((a) < (b) ? (a) : (b))
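/*
 * Note (not from the original snippet): the code below references MAX_PAYLOAD_SIZE,
 * struct packet_event_t and the packet_flow ring buffer, which are defined elsewhere
 * in my tree. They look roughly like this; field names are inferred from the usage
 * further down, and 1500 bytes per chunk x 4 chunks matches the 6000-byte limit
 * mentioned later.
 */
#define MAX_PAYLOAD_SIZE 1500   // per-chunk payload size; must match sizeof(event->payload)

struct packet_meta_t {
    u32 packet_id;
    u32 packet_type;     // EGRESS_PACKET or INGRESS_PACKET
    u64 timestamp;
    u32 ifindex;
    u32 packet_size;     // full skb->len
    u32 payload_len;     // bytes valid in this chunk
    u16 chunk_index;
    u8  is_last_chunk;
    // struct process_meta_t process;   // used by the commented-out block near the end of process_packet_chunk
};

struct packet_event_t {
    struct packet_meta_t meta;
    u8 payload[MAX_PAYLOAD_SIZE];
};

struct {
    __uint(type, BPF_MAP_TYPE_RINGBUF);
    __uint(max_entries, 1 << 24);   // 16 MiB ring buffer
} packet_flow SEC(".maps");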

static __always_inline int process_packet_chunk(struct __sk_buff *skb, u32 packet_id, u32 *offset, u16 *chunk_index, bool egress, u32 packet_size) {
    struct packet_event_t *event;
    u32 chunk_size;

    TRACE("Processing chunk: packet_id=%u, offset=%u, packet_size=%u",
               packet_id, *offset, packet_size);

    // Validate offset
    if (*offset >= packet_size) {
        TRACE("Invalid offset: packet_id=%u, offset=%u, packet_size=%u",
                   packet_id, *offset, packet_size);
        return -1;
    }

    // Calculate and validate chunk_size
    chunk_size = MIN(packet_size - *offset, MAX_PAYLOAD_SIZE);
    if (chunk_size == 0) {
        TRACE("Zero chunk size: packet_id=%u, offset=%u, packet_size=%u",
                   packet_id, *offset, packet_size);
        return -1;
    }

    // Additional check for verifier
    if (chunk_size > MAX_PAYLOAD_SIZE) {
        bpf_printk("Chunk size too large: packet_id=%u, offset=%u, chunk_size=%u",
                   packet_id, *offset, chunk_size);
        return -1;
    }

    TRACE("Chunk size calculated: packet_id=%u, offset=%u, chunk_size=%u",
               packet_id, *offset, chunk_size);

    event = bpf_ringbuf_reserve(&packet_flow, sizeof(struct packet_event_t), 0);
    if (!event) {
        TRACE("Failed to reserve ringbuf: packet_id=%u, offset=%u, chunk_size=%u",
                   packet_id, *offset, chunk_size);
        return -1;
    }

    __builtin_memset(&event->meta, 0, sizeof(event->meta));

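    // These clamps look like hints for the verifier: chunk_size already satisfies
    // 0 < chunk_size <= MAX_PAYLOAD_SIZE and *offset + chunk_size <= packet_size,
    // so the second clamp can only fire when exactly one byte remains and the
    // first clamp bumped chunk_size up to 2.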
    if (chunk_size < 2) {
        chunk_size = 2;
    }

    if (*offset + chunk_size > packet_size) {
        chunk_size = 1;
    }

    // Use bpf_skb_load_bytes to read packet data
    if (bpf_skb_load_bytes(skb, *offset, event->payload, chunk_size) < 0) {
        TRACE("Failed to load packet data: packet_id=%u, offset=%u, chunk_size=%u",
                   packet_id, *offset, chunk_size);
        bpf_ringbuf_discard(event, 0);
        return -1;
    }

    event->meta.packet_id = packet_id;
    event->meta.packet_type = egress ? EGRESS_PACKET : INGRESS_PACKET;
    event->meta.timestamp = bpf_ktime_get_ns();
    event->meta.ifindex = skb->ifindex;
    event->meta.packet_size = packet_size;
    event->meta.payload_len = chunk_size;
    event->meta.chunk_index = *chunk_index;
    event->meta.is_last_chunk = (*offset + chunk_size) >= packet_size ? 1 : 0;

    // if (pid_meta.pid > 0) {
    //     event->meta.process.pid = pid_meta.pid;
    //     event->meta.process.mntns_id = pid_meta.mntns_id;
    //     event->meta.process.netns_id = pid_meta.netns_id;
    //     __builtin_memcpy(&event->meta.process.cgroup_name, &pid_meta.cgroup_name, sizeof(pid_meta.cgroup_name));
    // }

    bpf_ringbuf_submit(event, 0);

    *offset += chunk_size;
    (*chunk_index)++;

    TRACE("Successfully processed chunk: packet_id=%u, offset=%u, chunk_size=%u, chunk_index=%u",
               packet_id, *offset, chunk_size, *chunk_index);

    return 0;
}

static __always_inline void handle_tc(struct __sk_buff *skb, bool egress) {
    // Ensure we can access the packet data
    if (bpf_skb_pull_data(skb, 0) < 0) {
        // bpf_printk("Failed to pull skb data in handle_tc");
        return;
    }

    u32 packet_size = skb->len;
    long skb_size = (void *)(long)skb->data_end - (void *)(long)skb->data;

    if (!pcap_filter((void *)(long)skb->data, (void *)(long)skb->data_end, (void *)skb, (void *)(long)skb->data, (void *)(long)skb->data_end)) {
        // bpf_printk("Packet filtered out by pcap_filter");
        return;
    }

    if (packet_size <= skb_size) {
        // Only packets whose data extends beyond the linear skb area are of interest here
        // bpf_printk("Packet fits in linear data, skipping: packet_size=%u, skb_size=%ld", packet_size, skb_size);
        return;
    }

    // struct process_meta_t pid_meta = {0};
    // if (get_pid_meta(skb, &pid_meta, egress) < 0) {
    //     TRACE("Failed to get pid meta");
    //     return;
    // }

    u32 offset = 0;
    u16 chunk_index = 0;
    u32 packet_id = bpf_get_prandom_u32();

    TRACE("Starting packet processing: packet_id=%u, packet_size=%u, skb_size=%ld", packet_id, packet_size, skb_size);

    #pragma unroll
    for (int i = 0; i < 4; i++) {
        if (offset >= packet_size) {
            TRACE("Reached end of packet: packet_id=%u, offset=%u, packet_size=%u",
                       packet_id, offset, packet_size);
            break;
        }
        if (process_packet_chunk(skb, packet_id, &offset, &chunk_index, egress, packet_size) < 0) {
            TRACE("Failed to process chunk: packet_id=%u, chunk_index=%u", packet_id, chunk_index);
            break;
        }
    }

    TRACE("Finished packet processing: packet_id=%u, final_offset=%u, final_chunk_index=%u",
               packet_id, offset, chunk_index);
}

As you can see, I switched to a ring buffer here because I was unable to make it work the current way, and then implemented packet reassembly logic at the application level:


type BpfPacketEvent struct {
    BpfPacketEventT

    FullPayload []byte
}

type packetChunk struct {
    event      BpfPacketEventT
    receivedAt time.Time
}

type packetAssembler struct {
    chunks          map[uint32][]packetChunk
    mutex           sync.Mutex
    cleanupInterval time.Duration
}

func newPacketAssembler() *packetAssembler {
    pa := &packetAssembler{
        chunks:          make(map[uint32][]packetChunk),
        cleanupInterval: 30 * time.Second,
    }
    go pa.periodicCleanup()
    return pa
}

func (pa *packetAssembler) addChunk(event BpfPacketEventT) (*BpfPacketEvent, bool) {
    pa.mutex.Lock()
    defer pa.mutex.Unlock()

    chunk := packetChunk{
        event:      event,
        receivedAt: time.Now(),
    }

    packetID := event.Meta.PacketId
    pa.chunks[packetID] = append(pa.chunks[packetID], chunk)

    if event.Meta.IsLastChunk && event.Meta.ChunkIndex == uint8(len(pa.chunks[packetID])-1) {
        completeEvent := pa.assemblePacket(packetID)
        delete(pa.chunks, packetID)

        if event.Meta.PacketSize > 6000 {
            log.Warn().Msgf("at the moment the eBPF input only supports packets up to 6000 bytes, and this packet is bigger; actual size: %d", event.Meta.PacketSize)
        }
        return completeEvent, true
    }

    return nil, false
}

func (pa *packetAssembler) assemblePacket(packetID uint32) *BpfPacketEvent {
    chunks := pa.chunks[packetID]

    if len(chunks) == 1 {
        // If there's only one chunk, reuse its payload without copying
        return &BpfPacketEvent{
            BpfPacketEventT: chunks[0].event,
            FullPayload:     chunks[0].event.Payload[:chunks[0].event.Meta.PayloadLen],
        }
    }

    totalSize := 0
    for _, chunk := range chunks {
        totalSize += int(chunk.event.Meta.PayloadLen)
    }

    completeEvent := BpfPacketEvent{
        BpfPacketEventT: chunks[0].event, // Preserve metadata from the first chunk
        FullPayload:     make([]byte, totalSize),
    }

    offset := 0
    for _, chunk := range chunks {
        copy(completeEvent.FullPayload[offset:], chunk.event.Payload[:chunk.event.Meta.PayloadLen])
        offset += int(chunk.event.Meta.PayloadLen)
    }

    return &completeEvent
}

func (pa *packetAssembler) periodicCleanup() {
    ticker := time.NewTicker(pa.cleanupInterval)
    defer ticker.Stop()

    for range ticker.C {
        pa.cleanup()
    }
}

func (pa *packetAssembler) cleanup() {
    pa.mutex.Lock()
    defer pa.mutex.Unlock()

    now := time.Now()
    for packetID, chunks := range pa.chunks {
        if now.Sub(chunks[0].receivedAt) > pa.cleanupInterval {
            delete(pa.chunks, packetID)
        }
    }
}

func (b *BPF) PullPacketEvents(ctx context.Context, chanSize int) (<-chan BpfPacketEvent, error) {
    // reader, err := perf.NewReader(b.objs.PacketEvents, 1500*1000)
    // if err != nil {
    //  return nil, xerrors.Errorf(": %w", err)
    // }
    ch := make(chan BpfPacketEvent, chanSize)
    go func() {
        defer close(ch)
        defer b.Close()
        // defer reader.Close()
        b.handlePacketEvents(ctx, ch)
    }()

    return ch, nil
}
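
For context, handlePacketEvents boils down to something like the sketch below: open a ringbuf reader on the packet_flow map (via cilium/ebpf), decode each record into BpfPacketEventT, and forward a packet only once the assembler reports it complete. Names such as b.objs.PacketFlow and the binary.Read decode step are placeholders, not the exact code:

// assumed imports: bytes, context, encoding/binary, errors,
// github.com/cilium/ebpf/ringbuf, github.com/rs/zerolog/log

func (b *BPF) handlePacketEvents(ctx context.Context, ch chan<- BpfPacketEvent) {
    reader, err := ringbuf.NewReader(b.objs.PacketFlow) // handle to the packet_flow ring buffer (placeholder name)
    if err != nil {
        log.Error().Err(err).Msg("failed to open ringbuf reader")
        return
    }
    defer reader.Close()

    // Unblock reader.Read when the context is cancelled.
    go func() {
        <-ctx.Done()
        reader.Close()
    }()

    assembler := newPacketAssembler()

    for {
        record, err := reader.Read()
        if err != nil {
            if errors.Is(err, ringbuf.ErrClosed) {
                return
            }
            continue
        }

        // Decode the raw sample into the generated event struct.
        var event BpfPacketEventT
        if err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &event); err != nil {
            continue
        }

        // Buffer the chunk; emit only once all chunks of the packet have arrived.
        if complete, ok := assembler.addChunk(event); ok {
            ch <- *complete
        }
    }
}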