snapview / tokio-tungstenite

Future-based Tungstenite for Tokio. Lightweight stream-based WebSocket implementation
MIT License
1.88k stars 236 forks source link

Memory leak #208

Closed ilyavoronin closed 2 years ago

ilyavoronin commented 2 years ago

This is the code to reproduce the memory leak. After each of the 3 iterations, the memory used by the process increases by the same amount:

use tokio::net::{TcpListener, TcpStream};
use futures::channel::mpsc::{UnboundedSender, UnboundedReceiver, unbounded};
use tokio_tungstenite::tungstenite::Message;
use futures::StreamExt;
use std::time::Duration;

async fn handle_connection(
    tcp_stream: TcpStream
) -> () {
    let ws_stream = tokio_tungstenite::accept_async(tcp_stream)
        .await;

    if ws_stream.is_err() {
        return;
    }
    let ws_stream = ws_stream.unwrap();

    let (_, receiver): (UnboundedSender<Message>, UnboundedReceiver<Message>) = unbounded();

    let (outgoing, incoming) = ws_stream.split();
    tokio::spawn(receiver.map(Ok).forward(outgoing));

    tokio::spawn(async move {
        incoming.take_while(|msg | {
            return if msg.is_err() {
                futures::future::ready(false)
            } else {
                futures::future::ready(true)
            };
        }).for_each( |_| async {
            tokio::time::sleep(Duration::from_millis(100)).await
        }).await;
    });
}

#[tokio::main]
async fn main() {
    let addr = "127.0.0.1:30000";
    let conn: TcpListener =
        TcpListener::bind(addr.clone()).await
            .expect(format!("Failed to bind to {}", addr).as_str());

    tokio::spawn(async move {
        match conn.accept().await {
            Ok((stream, _)) => {
                stream.set_nodelay(true).unwrap();

                handle_connection(stream).await;
            },
            Err(a) => { println!("Error {}", a); }
        };
    });

    let (conn, _) = tokio_tungstenite::connect_async("ws://127.0.0.1:30000").await.unwrap();
    let (write, _) = conn.split();

    let (input, output): (UnboundedSender<Message>, UnboundedReceiver<Message>) = futures::channel::mpsc::unbounded();

    tokio::spawn(
        output.map(Ok).forward(write)
    );

    for _ in 0..3 {
        println!("Sending data");
        for _ in 0..100 {
            let bytes = vec![1_u8; 10_000_000];
            input.unbounded_send(Message::Binary(bytes)).unwrap();
        }
        println!("Waiting");

        tokio::time::sleep(Duration::from_secs(20)).await;
    }
}