HFQR / xitca-web

Apache License 2.0

spawning xitca_server address in use error #1042

Closed andodeki closed 4 weeks ago

andodeki commented 1 month ago

I am trying to start a xitca_server by spawning it on a task alongside another MQTT task, like below, but I get an error:

Failed to start HTTP server: Address already in use (os error 48)

But if I don't spawn it in a task (running only the xitca server) it works normally.


#[tokio::main]
async fn main() -> std::io::Result<()> {
    let devices: DeviceList = Arc::new(Mutex::new(HashMap::new()));
     // Start the HTTP server task
    let http_server = tokio::spawn(async move {
        let (http_addr, srv) = startxitca(devices).await;
        match srv {
            Ok(srv) => {
                println!("Started HTTP server on: {http_addr}");
                if let Err(e) = srv.build().await {
                    eprintln!("HTTP server error: {}", e);
                }
            }
            Err(e) => eprintln!("Failed to start HTTP server: {}", e),
        }
    });
    // Start the MQTT client task
    let mqtt_client = tokio::spawn(async {
        match startmqtt().await {
            Ok((client, mut eventloop)) => {
                // Spawn a task to publish a message
                // Handle incoming messages
                loop {
                    match eventloop.poll().await {
                        Ok(event) => println!("Event = {:?}", event),
                        Err(e) => {
                            eprintln!("MQTT event loop error: {}", e);
                            break;
                        }
                    }
                }
            }
            Err(e) => eprintln!("Failed to start MQTT client: {}", e),
        }
    });
    // Await both tasks to complete
    if let Err(e) = tokio::try_join!(http_server, mqtt_client) {
        eprintln!("One of the tasks failed: {}", e);
        return Err(e.into());
    }
    Ok(())
}
fakeshadow commented 1 month ago

It's an OS error raised by your system and there is nothing xitca-web can do about it. Normally it's caused by trying to bind the same socket address multiple times when your OS does not support that or is not configured for it. From my experience you want to check whether the address is already used by another process and/or correctly configure your socket options (like SO_REUSEADDR and SO_REUSEPORT).
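The error itself is easy to reproduce with nothing but the standard library: binding a second listener to an address that is already held fails with `AddrInUse`, the same OS error the server reports. A minimal std-only sketch (the port 18080 is an arbitrary choice for illustration):

```rust
use std::net::TcpListener;

fn main() -> std::io::Result<()> {
    // The first bind succeeds and holds the address.
    let _first = TcpListener::bind("127.0.0.1:18080")?;

    // A second bind to the same address fails with AddrInUse,
    // exactly the OS error the HTTP server task reported.
    let second = TcpListener::bind("127.0.0.1:18080");
    assert_eq!(second.unwrap_err().kind(), std::io::ErrorKind::AddrInUse);

    println!("second bind failed with AddrInUse, as expected");
    Ok(())
}
```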

andodeki commented 1 month ago

So when I am spawning a task, my OS does not allow it?

fakeshadow commented 1 month ago

So when I am spawning a task, my OS does not allow it?

It does not matter whether you are spawning or not. The other way you wrote the code may have a side effect that happens not to trigger the error. As long as you reuse a socket address, your OS needs to be configured correctly to do it.

BTW your code can be better organized if written this way:

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // the server future is a handle to wait on or cancel the server. it does not need
    // to be spawned as a separate task and it does very little actual work (mostly
    // system signal listening)
    let handle = xitca_server::Builder::new()
        .bind("test", "localhost:8080", ..)?
        .build();

    // you can spawn your client or not it depends on your use case
    let (client, mut eventloop) = startmqtt().await?;

    while let Ok(..) = eventloop.poll().await {}

    // you can use the server future as handle to stop xitca server and wait for it to finish.
    // you can select on this part with your client task or execute after your client is gone.
    handle.handle()?.stop(true);
    handle.await;
    Ok(())
}

The trick here is that xitca-server spawns its own async task regardless of what you do, and the server future you wait on or await is just a handle that blocks your thread/async task until the server task despawns (and triggers the despawn when needed).
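The shape of this pattern can be sketched with a std-only analogy (these are hypothetical names, not xitca's real API): the "server" runs on its own thread, and the value handed back to main is only a handle used to trigger shutdown and to wait for the thread to finish.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical handle type: the "server" runs on its own thread;
// this is only a way to stop it and wait for it to despawn.
struct ServerHandle {
    stop: mpsc::Sender<()>,
    done: thread::JoinHandle<()>,
}

fn start_server() -> ServerHandle {
    let (stop, stopped) = mpsc::channel::<()>();
    let done = thread::spawn(move || {
        // Pretend to serve until a stop signal (or sender drop) arrives.
        let _ = stopped.recv();
        println!("server shutting down");
    });
    ServerHandle { stop, done }
}

fn main() {
    let handle = start_server();
    // Main keeps running; the server works concurrently.
    println!("can happen first");
    handle.stop.send(()).unwrap();
    handle.done.join().unwrap();
}
```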

andodeki commented 1 month ago

This is the best I could come up with following your code setup above, since I can't figure out the return type of the 3rd parameter to be passed into the bind method. With this setup the server does not start in the terminal, but there are no errors.


#[tokio::main]
async fn main() -> std::io::Result<()> {
    let devices: DeviceList = Arc::new(Mutex::new(HashMap::new()));

    let (http_addr, srv) = startxitca(devices).await;
    let mut handle = srv?.build();
    println!("Started HTTP server on: {http_addr}");

    let (client, mut eventloop) = startmqtt().await?;
    tokio::spawn(async move {
        if let Err(e) = requests(client).await {
            eprintln!("Failed to send request: {}", e);
        }
        time::sleep(Duration::from_secs(3)).await;
    });

    while let Ok(p) = eventloop.poll().await {
        match p {
            Event::Incoming(Incoming::Publish(pp)) => {
                println!("Topic: {}, Payload: {:?}", pp.topic, pp.payload);
            }
            Event::Incoming(i) => {
                println!("Incoming = {:?}", i);
            }
            Event::Outgoing(o) => println!("Outgoing = {:?}", o),
        }
    }

    handle.handle()?.stop(true);
    let _ = handle.await;
    Ok(())
}
fakeshadow commented 1 month ago

xitca-server starts in an async manner when let mut handle = srv?.build(); is called. If there is no error then it will start eventually. Most likely some other part of your code is causing a problem irrelevant to xitca.

andodeki commented 1 month ago

Sorry, what do you mean it will start eventually? When I cargo run, the terminal does not indicate that a server is running and ready to receive incoming requests.

fakeshadow commented 1 month ago

Sorry, what do you mean it will start eventually? When I cargo run, the terminal does not indicate that a server is running and ready to receive incoming requests.

Eventually means the server starts in an async and parallel manner from the main function's point of view. You can't depend on, nor assume, the running order from your main. Here is an example to showcase what that means:

Cargo.toml

[dependencies]
xitca-http = { version = "0.5" }
xitca-service = { version = "0.1" }
xitca-server = { version = "0.3" }

tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
tracing = { version = "0.1.40", default-features = false }
tracing-subscriber = { version = "0.3.16", default-features = false, features = ["env-filter", "fmt"] }

main.rs

use std::convert::Infallible;

use xitca_http::{
    http::{IntoResponse, Request, RequestExt},
    HttpServiceBuilder, RequestBody, Response, ResponseBody,
};
use xitca_service::ServiceExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .init();

    let service = xitca_service::fn_service(handler).enclosed(HttpServiceBuilder::new());

    let handle = xitca_server::Builder::new()
        .bind("test", "localhost:8080", service)?
        .build();

    tracing::info!("can happen first");

    handle.await
}

async fn handler(
    req: Request<RequestExt<RequestBody>>,
) -> Result<Response<ResponseBody>, Infallible> {
    Ok(req.into_response("hello,world!"))
}

When running the example you can observe that the log in the main function can happen between or after the server's starting logs; the ordering is a race condition and can vary from run to run (and machine to machine).

Besides that, it also showcases how to observe logs produced by xitca-server. xitca crates utilize the tracing crate for providing traceable logs, and tracing-subscriber is the crate that configures how you want to consume/display these logs.