axboe / liburing

Library providing helpers for the Linux kernel io_uring support
MIT License

Unexpected Eventfd POLLIN Event Triggered When Receiving UDP Data #1265

Closed Degoah closed 1 week ago

Degoah commented 2 weeks ago

Description:

While UDP data is being sent to the server by a Python script, the POLLIN completion for the eventfd file descriptor is reported unexpectedly. The eventfd is meant to become readable only when the signal handler writes to it (e.g., on SIGINT or another signal), and only then should the poll completion be generated. No write is performed on the eventfd, yet the POLLIN completion appears while UDP data is being received.

Steps to Reproduce:

  1. Start the provided UDP server application, which uses liburing for receiving UDP data and for waiting on an eventfd.
  2. Use the provided Python script to send UDP packets to the server. Example usage of the Python script: python udp_sender.py 10 512
  3. Observe that when the server receives UDP packets, the eventfd's POLLIN event is unexpectedly triggered, even though no signal has been sent to invoke the signal handler, and thus no write has occurred on the eventfd.

Expected Behavior:

The POLLIN event on the eventfd should only be triggered when a signal handler writes to it, such as during a SIGINT (Ctrl+C) event. It should not be triggered merely by receiving UDP data via the recvmsg operation in io_uring.
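
One way to test this expectation outside of io_uring is to ask the kernel directly whether the eventfd is readable. The following is a minimal sketch, not part of the original report: the helper names eventfd_is_readable and eventfd_probe are made up for illustration, and it only uses the plain eventfd(2), poll(2), and write(2) calls.

```cpp
#include <poll.h>
#include <sys/eventfd.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstdint>

// Hypothetical helper: non-blocking check whether efd currently reports POLLIN.
bool eventfd_is_readable(int efd) {
    struct pollfd pfd{};
    pfd.fd = efd;
    pfd.events = POLLIN;
    int n = poll(&pfd, 1, 0);  // timeout 0: return immediately
    return n == 1 && (pfd.revents & POLLIN) != 0;
}

// Hypothetical helper: creates an eventfd with counter 0 and checks
// readability before and after a single write.
// Result bits: bit 0 = readable before the write, bit 1 = readable after it.
// Returns -1 if eventfd creation or the write fails.
int eventfd_probe() {
    int efd = eventfd(0, EFD_NONBLOCK);
    if (efd == -1) {
        return -1;
    }

    int result = 0;
    if (eventfd_is_readable(efd)) {
        result |= 1;
    }

    std::uint64_t one = 1;
    if (write(efd, &one, sizeof(one)) != static_cast<ssize_t>(sizeof(one))) {
        close(efd);
        return -1;
    }
    if (eventfd_is_readable(efd)) {
        result |= 2;
    }

    close(efd);
    return result;
}
```

If a check like eventfd_is_readable(event_fd) reports false at the moment the "eventfd" completion is handled, the eventfd itself was never signaled, which suggests the completion was attributed to the wrong request rather than generated by the eventfd.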

Questions:

  1. Could this issue be related to the interaction between io_uring's recvmsg and the eventfd?
  2. Is there a potential misconfiguration or unexpected side effect causing the eventfd to trigger when receiving UDP data?
  3. Are there any known issues where recvmsg or poll on unrelated file descriptors might cause false triggering of eventfd POLLIN?

Workaround: Setting user_data on the recvmsg SQE prevents the issue. However, this behavior is unexpected, and it isn't clear why omitting user_data on the recvmsg SQE causes the issue to arise.

Server Application Code Overview:

#include <iostream>
#include <string>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <liburing.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/eventfd.h>
#include <signal.h>

constexpr int UDP_PORT = 5000;
const char* UDP_ADDR = "127.0.0.1";
constexpr size_t BUFFER_SIZE = 1024;
constexpr uint64_t EVENTFD_STOP_SIGNAL = 1;  // Value written to eventfd to signal a stop

int udp_socket;   // UDP socket to be shared between threads
int event_fd;     // Eventfd for signaling
io_uring ring;    // io_uring instance

// Signal handler that writes to the eventfd to trigger shutdown
void signal_handler(int) {
    uint64_t value = EVENTFD_STOP_SIGNAL;
    write(event_fd, &value, sizeof(value));
}

// Setup signal handling for SIGINT (Ctrl+C)
void setup_signal_handler() {
    struct sigaction sa{};
    sa.sa_handler = signal_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    if (sigaction(SIGINT, &sa, nullptr) == -1) {
        perror("Failed to set signal handler");
        exit(EXIT_FAILURE);
    }
}

// Setup UDP socket for receiving data
void setup_udp_socket(int &udp_socket) {
    udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
    if (udp_socket == -1) {
        perror("Failed to create UDP socket");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr(UDP_ADDR);
    server_addr.sin_port = htons(UDP_PORT);

    if (bind(udp_socket, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("Failed to bind UDP socket");
        close(udp_socket);
        exit(EXIT_FAILURE);
    }
}

// Setup eventfd for signaling
void setup_eventfd(int &event_fd) {
    event_fd = eventfd(0, EFD_NONBLOCK);  // Non-blocking eventfd
    if (event_fd == -1) {
        perror("Failed to create eventfd");
        exit(EXIT_FAILURE);
    }
}

// Main function to setup io_uring and process events
void run_server() {
    // Initialize io_uring
    if (io_uring_queue_init(8, &ring, 0) != 0) {
        perror("Failed to initialize io_uring");
        exit(EXIT_FAILURE);
    }

    char buffer[BUFFER_SIZE];  // Buffer for receiving UDP data
    struct sockaddr_in client_addr{};

    // Prepare UDP iovec and msghdr for receiving data
    struct iovec iov;
    iov.iov_base = buffer;
    iov.iov_len = BUFFER_SIZE;

    struct msghdr msg{};
    msg.msg_name = &client_addr;
    msg.msg_namelen = sizeof(client_addr);
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;

    // Add the UDP receive request to io_uring
    io_uring_sqe* sqe = io_uring_get_sqe(&ring);
    io_uring_prep_recvmsg(sqe, udp_socket, &msg, 0);  // UDP recvmsg
    io_uring_submit(&ring);

    // Add the eventfd poll request to io_uring
    sqe = io_uring_get_sqe(&ring);
    io_uring_prep_poll_add(sqe, event_fd, POLLIN);  // Poll for eventfd
    sqe->user_data = 42;
    io_uring_submit(&ring);

    while (true) {
        io_uring_cqe* cqe;
        int ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            perror("io_uring_wait_cqe");
            break;
        }

        // Handle eventfd signal
        if (cqe->user_data == 42) {
            uint64_t eventfd_value;
            read(event_fd, &eventfd_value, sizeof(eventfd_value));
            std::cout << "Received eventfd signal, shutting down..." << std::endl;
            break;
        }

        // Handle UDP message reception
        if (cqe->res > 0) {
            std::cout << "Received " << cqe->res << " bytes from "
                      << inet_ntoa(client_addr.sin_addr) << ":" << ntohs(client_addr.sin_port)
                      << " - " << std::string(buffer, cqe->res) << std::endl;

            // Re-submit the recvmsg request to continue receiving messages
            io_uring_sqe* sqe = io_uring_get_sqe(&ring);
            io_uring_prep_recvmsg(sqe, udp_socket, &msg, 0);
            io_uring_submit(&ring);
        }

        io_uring_cqe_seen(&ring, cqe);  // Mark CQE as processed
    }

    // Cleanup
    io_uring_queue_exit(&ring);
    close(udp_socket);
    close(event_fd);
}

int main() {
    setup_signal_handler();
    setup_udp_socket(udp_socket);
    setup_eventfd(event_fd);

    run_server();  // Run the UDP server with eventfd for shutdown

    return 0;
}

Python Script for Reproducing the Issue:

import socket
import argparse

def send_udp_packets(num_packets, payload_size, server_address='127.0.0.1', server_port=5000):
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    payload = b'a' * payload_size

    for i in range(100):  # Repeat sending the packets 100 times
        for j in range(num_packets):
            udp_socket.sendto(payload, (server_address, server_port))
            print(f"Round {i+1} - Packet {j+1} sent with payload size {payload_size} bytes")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Send UDP packets to a server 100 times.')
    parser.add_argument('num_packets', type=int, help='Number of UDP packets to send per round')
    parser.add_argument('payload_size', type=int, help='Size of the payload for each UDP packet (in bytes)')
    args = parser.parse_args()

    send_udp_packets(args.num_packets, args.payload_size)

Environment:

Kernel version: 6.1.20+g92eeaaec35c7+p0

axboe commented 1 week ago

If you don't set sqe->user_data, then it'll contain whatever was in it before. Could this perhaps be your issue? Remember that it's a ring, and your size is 8, so whenever you roll around and get an sqe that you've used before (eg the 9th), then if you don't set user_data in the sqe, it'll be set to whatever the last user set it to.
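
The slot reuse described here can be illustrated with a toy model. This is a sketch under stated assumptions, not liburing's implementation: ToySqe, ToyRing, kRecvTag, and first_mislabeled_resubmit are hypothetical names, and the only behavior modeled is that an 8-entry ring hands out its slots round-robin without clearing user_data. It replays the submission order from the server code above: one recvmsg, one eventfd poll tagged 42, then repeated recvmsg resubmissions.

```cpp
#include <array>
#include <cassert>
#include <cstdint>

constexpr unsigned kEntries = 8;           // same size as io_uring_queue_init(8, ...)
constexpr std::uint64_t kEventfdTag = 42;  // tag the server puts on the eventfd poll
constexpr std::uint64_t kRecvTag = 1;      // hypothetical tag for recvmsg requests

static_assert((kEntries & (kEntries - 1)) == 0, "ring size must be a power of two");

struct ToySqe {
    std::uint64_t user_data = 0;
};

struct ToyRing {
    std::array<ToySqe, kEntries> slots{};
    unsigned tail = 0;

    // Hands out the next slot WITHOUT clearing it, so stale user_data survives.
    ToySqe* get_sqe() { return &slots[tail++ & (kEntries - 1)]; }
};

// Returns the 1-based number of the first recvmsg resubmission whose slot
// carries the eventfd tag, or -1 if that never happens within max_resubmits.
int first_mislabeled_resubmit(bool tag_recvmsg, int max_resubmits) {
    assert(max_resubmits >= 0);
    ToyRing ring;

    ToySqe* sqe = ring.get_sqe();  // initial recvmsg
    if (tag_recvmsg) {
        sqe->user_data = kRecvTag;
    }

    sqe = ring.get_sqe();          // eventfd poll
    sqe->user_data = kEventfdTag;

    for (int i = 1; i <= max_resubmits; ++i) {
        sqe = ring.get_sqe();      // recvmsg resubmission
        if (tag_recvmsg) {
            sqe->user_data = kRecvTag;  // overwrite whatever the slot held
        }
        if (sqe->user_data == kEventfdTag) {
            return i;              // this request would be mistaken for the eventfd
        }
    }
    return -1;
}
```

In this model, first_mislabeled_resubmit(false, 100) returns 8: the eighth resubmission lands in the slot previously used for the eventfd poll and inherits its tag. With tag_recvmsg set to true the function returns -1, which corresponds to the workaround of setting user_data on every SQE.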

Degoah commented 1 week ago

Thx. U r absolutely right! Cheers!

axboe commented 1 week ago

Closing this one up, I'll add a note to the man pages.