ntex-rs / ntex

framework for composable networking services
Apache License 2.0

Last update spikes WebSocket CPU usage to 100% #461

Closed: JeanRomain closed this issue 2 days ago

JeanRomain commented 2 days ago

Hi. Since the last round of updates, any WebSocket connection on my setup keeps one CPU core pinned at 100%, even with a single connection. Apart from that, the connection still works and sends updates to clients.

I haven't changed my code for months, and the WebSocket part never used more than a few percent of CPU.

Here is my full WebSocket code. I have also attached screenshots from the macOS profiler. (I see the same 100% spike in my Docker image.)

use std::{cell::RefCell, io, rc::Rc, time::Duration, time::Instant};
use tokio::sync::mpsc;
use ntex::web::{self, HttpRequest};
use ntex::util::Bytes;
use ntex::{chain, fn_service};
use ntex::{channel::oneshot, rt, time};
use futures::future::{ready, select, Either};
use ntex::service::{fn_factory_with_config, fn_shutdown, map_config, Service};
use serde::{Deserialize, Serialize};
use entity::common::utils::ModelCategories;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use once_cell::sync::Lazy;

use uuid::Uuid;

use crate::middlewares::auth::get_user_id;
use crate::support::support_funcs::{encode_uuids_to_string, get_connection_id_from_request};

const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(15);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(30);

struct Connections {
    connections: HashMap<Uuid, HashMap<Uuid, mpsc::Sender<WebSocketMessage>>>,
}

static CONNECTIONS: Lazy<Arc<RwLock<Connections>>> = Lazy::new(|| {
    Arc::new(RwLock::new(Connections {
        connections: HashMap::new(),
    }))
});

struct WsState {
    hb: Instant,
}

async fn ws_service((sink, user_id): (web::ws::WsSink, Uuid)) -> Result<impl Service<web::ws::Frame, Response = Option<web::ws::Message>, Error = io::Error>, web::Error> {
    let state = Rc::new(RefCell::new(WsState { hb: Instant::now() }));

    let (tx, rx) = oneshot::channel();

    let con_id = Uuid::new_v4();

    let message = WebSocketMessage {
        messageType: String::from("id"),
        what: None,
        message: Some(con_id.to_string()),
        color: None,
    };

    let message_str = match serde_json::to_string(&message) {
        Ok(m) => m,
        Err(_) => return Err(web::Error::from(io::Error::new(io::ErrorKind::Other, "Serialization error"))),
    };

    rt::spawn(heartbeat(state.clone(), sink.clone(), rx, con_id.clone(), user_id.clone()));

    match sink.send(web::ws::Message::Text(message_str.into())).await {
        Ok(_) => (),
        Err(_) => return Err(web::Error::from(io::Error::new(io::ErrorKind::Other, "Failed to send initial message"))),
    }

    let (msg_tx, mut msg_rx) = mpsc::channel(256);
    let sink_clone = sink.clone();

    rt::spawn(async move {
        while let Some(msg) = msg_rx.recv().await {
            let msg_str = match serde_json::to_string(&msg) {
                Ok(m) => m,
                Err(_) => {
                    eprintln!("Error serializing message");
                    continue;
                }
            };

            match sink_clone.send(web::ws::Message::Text(msg_str.into())).await {
                Ok(_) => (),
                Err(_) => break,
            }
        }
    });

    add_connection(con_id, user_id, msg_tx).await;

    let service = fn_service(move |frame| {
        let item = match frame {
            web::ws::Frame::Ping(msg) => {
                state.borrow_mut().hb = Instant::now();
                Some(web::ws::Message::Pong(msg))
            }

[Attached: three macOS profiler screenshots, taken 2024-11-10 at 9:54:34 PM, 9:54:53 PM and 9:55:14 PM]
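The listing above is cut off before the `heartbeat` task it spawns, so its body is not shown in this thread. As a rough sketch only, the timeout check that such a task would typically run against `WsState.hb` might look like the following. The helper name `client_timed_out` is hypothetical, it uses only the standard library, and it is not taken from the poster's code or from ntex.

```rust
use std::time::{Duration, Instant};

// Same value as the constant in the listing above.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(30);

/// Hypothetical helper: returns true when the last heartbeat (`last_hb`)
/// is older than CLIENT_TIMEOUT as seen from `now`.
/// `saturating_duration_since` yields zero if `now` is earlier than `last_hb`.
pub fn client_timed_out(last_hb: Instant, now: Instant) -> bool {
    now.saturating_duration_since(last_hb) > CLIENT_TIMEOUT
}
```

A heartbeat task would presumably wake up once per `HEARTBEAT_INTERVAL`, call a check like this with the stored `hb` value, and close the connection when it returns true. Taking `now` as a parameter keeps the check deterministic and easy to test.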

fafhrd91 commented 2 days ago

Could you run "cargo update" and try again?

JeanRomain commented 2 days ago

3.3.2 fixed the problem. Thanks for the quick fix!