ithewei / libhv

🔥 比libevent/libuv/asio更易用的网络库。A c/c++ network library for developing TCP/UDP/SSL/HTTP/WebSocket/MQTT client/server.
https://github.com/ithewei/libhv/wiki
BSD 3-Clause "New" or "Revised" License
6.69k stars 1.22k forks source link

libhv udp server on threads #433

Closed nominalval closed 11 months ago

nominalval commented 11 months ago

Hello. I am trying to build a simple multi-threaded UDP echo server in C using libhv (the library uses non-blocking events).

Basically i am trying to combine https://github.com/ithewei/libhv/blob/master/examples/udp_echo_server.c with the threading of https://github.com/ithewei/libhv/blob/master/examples/multi-thread/one-acceptor-multi-workers.c

I do this because i will have high amount of traffic and multiple threads will be needed to handle that kind of stream.

SOURCE CODE:

#include "hloop.h"
#include "hsocket.h"
#include "hthread.h"

// Address the UDP server is created on (passed to hloop_create_udp_server).
static const char* host = "127.0.0.1";
// Default port; main() overwrites it with the value parsed from argv[1].
static int port = 8000;
// Number of worker loops/threads that main() creates.
static int thread_num = 4;
// Loop that accept_thread() creates the UDP server on and runs.
static hloop_t*  accept_loop = NULL;
// Array of thread_num worker loops, allocated in main().
static hloop_t** worker_loops = NULL;

// Round-robin selection over worker_loops. The cursor is advanced before it
// is used, so the very first call yields index 1 and index 0 is reached only
// after the cursor wraps.
static hloop_t* get_next_loop() {
    static int s_cur_index = 0;
    int next = s_cur_index + 1;
    s_cur_index = (next >= thread_num) ? 0 : next;
    return worker_loops[s_cur_index];
}

// not used for UDP
/*
static void on_close(hio_t* io) {
    printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}
*/

// not used for UDP
/*
static void on_recv(hio_t* io, void* buf, int readbytes) {
    // echo
    hio_write(io, buf, readbytes);
}
*/

// not used for UDP
/*
static void new_conn_event(hevent_t* ev) {
    hloop_t* loop = ev->loop;
    hio_t* io = (hio_t*)hevent_userdata(ev);
    hio_attach(loop, io);

    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    printf("tid=%ld connfd=%d [%s] <= [%s]\n",
            (long)hv_gettid(),
            (int)hio_fd(io),
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    hio_setcb_close(io, on_close);
    hio_setcb_read(io, on_recv);

    hio_read(io);
}
*/

// not used for UDP
/*
static void on_accept(hio_t* io) {
    hio_detach(io);

    hloop_t* worker_loop = get_next_loop();
    hevent_t ev;
    memset(&ev, 0, sizeof(ev));
    ev.loop = worker_loop;
    ev.cb = new_conn_event;
    ev.userdata = io;
    hloop_post_event(worker_loop, &ev);
}
*/

/*
 * Read callback installed on the UDP server io: logs the received datagram
 * together with the handling thread, then echoes the payload back.
 *
 * io        - io the datagram was received on
 * buf       - received payload; always printed with an explicit length, so it
 *             is not assumed to be NUL-terminated
 * readbytes - number of payload bytes available in buf
 */
static void on_recvfrom(hio_t* io, void* buf, int readbytes) {
    pthread_t tid = pthread_self();

    printf("on_recvfrom fd=%d readbytes=%d\n", hio_fd(io), readbytes);
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    // NOTE(review): pthread_t is printed through an unsigned long cast, which
    // assumes an integer-like pthread_t on the target platform.
    printf("[tid=%ld][fd=%d][%s] <=> [%s] #[%lu]\n",
        (long)hv_gettid(),
        (int)hio_fd(io),
        SOCKADDR_STR(hio_localaddr(io), localaddrstr),
        SOCKADDR_STR(hio_peeraddr(io), peeraddrstr),
        (unsigned long)tid);

    // get msg
    char* str = (char*)buf;
    printf("< %.*s", readbytes, str);

    // echo back
    printf("> %.*s", readbytes, str);
    hio_write(io, buf, readbytes);

#if TEST_KCP
    // Only the first readbytes bytes belong to this datagram; without the
    // length check a shorter packet could match "CLOSE" against stale bytes
    // left behind in the buffer.
    if (readbytes >= 5 && strncmp(str, "CLOSE", 5) == 0) {
        hio_close_rudp(io, hio_peeraddr(io));
    }
#endif
}

static HTHREAD_ROUTINE(worker_thread) {
    hloop_t* loop = (hloop_t*)userdata;
    hloop_run(loop);
    return 0;
}

/*
 * Creates the UDP server on the loop passed as userdata, installs
 * on_recvfrom as its read callback and runs that loop on the calling thread.
 * This replaces the TCP listen/accept setup of the original example, which
 * does not apply to UDP.
 *
 * userdata - the hloop_t* to create the server on; main() passes a loop
 *            created with HLOOP_FLAG_AUTO_FREE
 *
 * Returns 0 on every path; a failure to create the server is reported on
 * stderr.
 */
static HTHREAD_ROUTINE(accept_thread) {
    hloop_t* loop = (hloop_t*)userdata;

    hio_t* io = hloop_create_udp_server(loop, host, port);
    if (io == NULL) {
        fprintf(stderr, "failed to create udp server on %s:%d\n", host, port);
        return 0;
    }

    hio_setcb_read(io, on_recvfrom);
    hio_read(io);

    // Do not call hloop_free() afterwards: the loop was created with
    // HLOOP_FLAG_AUTO_FREE, so hloop_run() releases it itself and a second
    // free would operate on a dangling pointer.
    hloop_run(loop);

    return 0;
}

/*
 * Entry point: parses the listening port from argv[1], starts thread_num
 * worker threads (one event loop each) and then runs the UDP server loop on
 * the main thread.
 *
 * Returns -10 when the port argument is missing or invalid, -20 when the
 * worker loop array cannot be allocated, and 0 after the server loop returns.
 */
int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: cmd port\n");
        return -10;
    }

    // strtol instead of atoi so that non-numeric or out-of-range input is
    // rejected rather than silently turned into port 0.
    char* end = NULL;
    long parsed_port = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || parsed_port < 1 || parsed_port > 65535) {
        printf("Invalid port: %s\n", argv[1]);
        return -10;
    }
    port = (int)parsed_port;

    worker_loops = (hloop_t**)malloc(sizeof(hloop_t*) * thread_num);
    if (worker_loops == NULL) {
        printf("Failed to allocate worker loops\n");
        return -20;
    }
    for (int i = 0; i < thread_num; ++i) {
        worker_loops[i] = hloop_new(HLOOP_FLAG_AUTO_FREE);
        hthread_create(worker_thread, worker_loops[i]);
    }

    // The UDP server loop runs on the main thread; this call only returns
    // when that loop stops or the server could not be created.
    accept_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
    accept_thread(accept_loop);

    return 0;
}

If I do a simple test, it opens 4 threads, but I observe that it only uses one of them:

netcat test1:

nc 127.0.0.1 8000  -u
msg1
msg1
msg2
msg2
^C

netcat test2:

nc 127.0.0.1 8000  -u
msg1 connect2
msg1 connect2
msg2 connect2
msg2 connect2
^C

Output:

on_recvfrom fd=8 readbytes=5
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:55234] #[139856893998912]
< msg1
> msg1
on_recvfrom fd=8 readbytes=5
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:55234] #[139856893998912]
< msg2
> msg2
on_recvfrom fd=8 readbytes=14
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:43275] #[139856893998912]
< msg1 connect2
> msg1 connect2
on_recvfrom fd=8 readbytes=14
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:43275] #[139856893998912]
< msg2 connect2
> msg2 connect2

As you can see tid=27641 remains the same even after re-connection.

How should I solve this?

Thank you.

ithewei commented 11 months ago

UDP only uses one fd, unlike TCP which uses a new fd every time it accepts a new connection. An fd is usually processed using one loop. If you want to process it with multiple threads, you should distribute the message received from recvfrom to multiple threads to consume.

nominalval commented 11 months ago

I am concerned about I/O block when receiving UDP packets, this is why I want to write a program similar to an app written in node.js that uses dgram and cluster packages.

// UDP listener spread over several processes with the cluster module:
// the master forks one worker per CPU and every worker binds a UDP socket
// to the same host/port.
const os = require('os');
const dgram = require('dgram');
const cluster = require('cluster');

// Address the workers bind to. Declared with const so the script does not
// rely on an implicit global (which throws in strict mode / ES modules).
const config = {};
config['host'] = "127.0.0.1";
config['port'] = 7500;

if(cluster.isMaster)
{
  // Master process: start one worker per CPU.
  for (let i = 0; i < os.cpus().length; i++)
  {
    cluster.fork();
  }

  // Start a replacement whenever a worker exits.
  cluster.on('exit', (worker, code, signal) => {
    cluster.fork();
  });
}
else
{
  // Child: receive datagrams and log which process handled each one.
  const server = dgram.createSocket('udp4');
  server.on('message', (msg, rinfo) => {
    console.log("Message got from #"+process.pid+" :"+msg);
  });

  server.on('listening', () => {
    const address = server.address();
    // config['debug'] is never set in this script, so this log stays silent.
    if(config['debug'] >= 1) console.log(`UDP server listening on ${address.address}:${address.port}`);
  });
  server.bind(config.port,config.host);
}

NETCAT:

nc 127.0.0.1 7500 -u
ss
dd
ff
ss
ff
ss

OUTPUT:

Message got from #12786 :ss

Message got from #12787 :dd

Message got from #12787 :ff

Message got from #12799 :ss

Message got from #12787 :ff

Message got from #12787 :ss

I want to obtain the same behavior (even though I see that the load balancing is not fair). I already pass the I/O operations to a thread pool; I am just concerned that with a high volume of data it can lose packets, which is why I want to have events + threads.

Thank you.

ithewei commented 11 months ago

Multi-processes mode, please refer to https://github.com/ithewei/libhv/blob/master/examples/multi-thread/multi-acceptor-processes.c.

nominalval commented 11 months ago

Thank you for your answer, your library is great. This is what i want but using UDP not TCP.

ithewei commented 11 months ago

Thank you for your answer, your library is great. This is what i want but using UDP not TCP.

TCP => UDP
Listen(port, host) => Bind(port, host, SOCK_DGRAM)
haccept (hio_get -> hio_setcb_accept -> hio_accept) => hio_get(fd) -> hio_setcb_read -> hio_read
nominalval commented 11 months ago

Is this the order of the function that i should call?

I would like to see an example, as i have already tried before.

I also don't see a simple sender in C, would be great if there were one.

nominalval commented 9 months ago

Thank you for taking the time to solve this. I will test this in an emulated production environment with a lot of requests.