noreplydev / suiro

Suiro is a reverse connection service written in rust for HTTP protocol based services.
11 stars 3 forks source link

Verificar requestid recibido por el canal mpsc #2

Open noreplydev opened 10 months ago

noreplydev commented 10 months ago

Originally posted by @voliva in https://github.com/noreplydev/suiro-rs/issues/1#issuecomment-1807251727

Vi una parte del código que no termino de entender o creo que puede estar fallando algo.

    loop {
        // Check if response is ready
        if let Some((agent_response_id, agent_response_body)) = responses_rx.recv().await {
            if request_id == agent_response_id {
                http_raw_response = agent_response_body;
                break;
            }
        }

       // ...
    }

Es la parte del if requestid == agent_response_id para comprobar si el mensaje que llega es la respuesta del mensaje que esta función ha enviado al lado del socket.

Creo que cuando lees de un canal, el mensaje se consume y por lo tanto se pierde para otros que crees que pueden estar leyéndolo. Si resulta que el mensaje no es el tuyo y lo descartas, este mensaje se va a perder.

Lo que luego hablándolo con otra gente del meetup me dijeron que es imposible que un canal tenga varios consumidores, porque es un canal mpsc = Multiple Producer Single Consumer, con lo que el compilador no va a dejar nunca que pueda estar compartido con varios, con lo que creo que es posible que esta parte se pueda simplificar un poco. Pero me pierdo un poco intentando ver donde se crea este canal y como se utiliza. Creo que en la charla lo describiste bastante bien por esto.

Gracias!

noreplydev commented 10 months ago

Buenas @voliva,

Creo que estás en un commit anterior al último, aún así, está verificación se sigue haciendo en los últimos commits, voy a mirar si es del todo necesaria.

Siempre puedes hacer una pr sin miedo y ver como lo enfocas tú. Si todo es correcto la acepto sin problema.

Me lo miro lo antes posible y te digo algo.

horacimacias commented 10 months ago

hola @voliva/@noreplydev,

por si sirve de algo, yo estuve en esa charla hablando del mpsc y como dices es Single Consulmer; es parecido al ownership de rust en el que "solo una variable es la propietaria de los datos". Yo tambien creo que ese chequeo es innecesario y parece que puede ser un leftover de una implementacion anterior donde todas las peticiones/respuestas se almacenaban en un mapa:

                        // Add data to responses hashmap
                        let _ = tx
                            .send((request_id.to_string(), packet_data.to_string()))
                            .await;

ahora mismo, los tx/rx son distintos para cada session; se crean un par nuevo en cada nueva session:

// Add session to hashmap
    let hashmap_key = session_endpoint.clone();
    let (socket_tx, mut rx) = mpsc::channel(100); // 100 message queue
    let (tx, responses_rx) = mpsc::channel(100); // 100 message queue
    let session = Session::new(socket_tx, responses_rx);
    {
        sessions.lock().await.insert(hashmap_key, session); // create a block to avoid infinite lock
    }

asi que en principio, comprobar de nuevo que "es tu session" parece innecesario o almenos que poco se puede hacer si el mensaje que recibes no es para ti. Eso significaria que el insert/get del mapa de sessions esta mal.

No se si se esperan solamente una peticion/respuesta HTTP por session al mismo tiempo pero creo que puede faltar algo para gestionar peticiones concurrentes para la misma session. Si he entendido el codigo, el flujo es algo como:

  1. suiro arranca y espera peticiones http y tcp.
  2. el "sunny day scenario" es que se recibe una peticion tcp y eso inicializa una session. Entonces ahora podemos recibir una peticion http en esa misma session.
  3. cuando se recibe una peticion http se machea con la session tcp y se escribe la peticion http en el socket tcp.
  4. immediatamente se lee del socket una respuesta (creo que es incorrecto hacerlo immediatamente; es posible que la respuesta tcp no la tengamos todavia o no completa. El codigo de fragmentacion parece gestionar esto).
  5. se repite el loop 3/4

ahora mismo, como el http_connection_handler bloquea el mapa de sessiones y no lo libera, es imposible que otra request http (para la misma session o para otra) pueda ser gestionada hasta que la peticion que bloquea el mapa de sessions termine (lo del 'very dangerous' ya iba bien encaminado; falta hacer algo al respecto):

let mut sessions = sessions.lock().await; // get access to hashmap - very dangerous

Por ultimo, en el "loop central" del tcp_connection_handler:

 loop {
        // Write data to socket on request
        if let Some(Some(request)) = rx.recv().now_or_never() {
            socket.write_all(request.as_bytes()).await.unwrap();
        }

        if let Some(sock) = socket.read(&mut buffer).now_or_never() {

creo que estas haciendo un "if let Some" cuando necesitas una request (no es que "quizas tengas una request" sino que quieres bloquear/loopear hasta que tengas una request). Una vez tienes una request, necesitas la response a esa request (de nuevo, no es que "quizas tengas una response o parte de ella", sino que necesitas bloquear/loopear hasta que tengas la respuesta entera a esa request. Con eso quiero decir que el loop deberia hacer algo como:

  1. obtener una request del rx. Si no recibo nada no hace falta ni seguir leyendo del socket tcp ya que no voy a tener nunca una respuesta tcp si no he enviado nada todavia.
  2. cuando ya he recibido una request entera, entonces ahora cambio a modo "estoy esperando la respuesta tcp a lo que he enviado por tcp".
horacimacias commented 10 months ago

...quizas si haces un diagrama de flujo del proceso te ayudara. Ahora mismo en el mismo codigo estan mezcladas cosas de bajo nivel (gestion de errores/fragmentacion) con el "loop principal". Si añadimos a eso un lenguaje nuevo con varias ideas distintas de los otros lenguajes que hayas utilizado antes, es facil (almenos para mi) perderse. Deberias poder simplificarlo para que el flujo quede simple y claro.

noreplydev commented 10 months ago

Creo que hay algo que no quedó del todo claro en la charla. Y es que, el if que verifica request_id no mira si es la sesión que toca, sino que se encarga de mirar si la response que le llega es de la request que se ha hecho.

Es decir, tenemos un canal mpsc (tx, rx) para comunicar el servidor tcp y el http por cada conexión (siendo conexión en este contexto un nuevo socket conectado a un agente detrás de una nat). Por ello, cada petición que envié el servidor http al servidor tcp debe ir identificada con un id para que cuando vuelva se sepa cual es cual. En servicios en los que las peticiones son prácticamente concurrentes; como lo puede ser una web con llamadas a recursos de imágenes, css, favicon..., haría mucha diferencia la verificación. Sin ella, se enviarían repuestas a peticiones distintas. Ejemplo de lo que me refiero:

GET /session-endpoint/A GET /session-endpoint/B GET /session-endpoint/C

GET /session-endpoint/A - 200ms [Tiempo de respuesta del servidor tcp] GET /session-endpoint/B - 100ms GET /session-endpoint/C - 500ms

En este caso, por el rx del socket recibiríamos las respuestas en el siguiente orden: B, A, C. Sino verificásemos, enviaríamos a la request A la repuesta B.

Ahora mismo simplemente se queda la request en espera, pero la idea sería implementar una pila de respuestas no usadas por otras requests. Y verificar tanto la respuesta actual, como la pila de respuestas.

voliva commented 10 months ago

Dado este ejemplo, creo que puede ser que haya un error, que es que la respuesta se pierda completamente.

Si cada peticion HTTP levanta un nuevo http_connection_handler, el A va a intentar recibir solo la respuesta [A]. Lo que pasa es que si el A por mala suerte al hacer session.responses_rx.recv().now_or_never() recibe mensaje [B], va a ver que el mensaje no es el que toca y lo descarta. Hasta aqui bien.

Pero el responses_rx ya ha entregado el mensaje [B], con lo que no se lo va a pasar al B cuando haga el .recv().now_or_never(). Si esto pasa, el mensaje [B] nunca se transmitirá.

Lo que no me encaja de esto es el tema de que es un canal mpsc, que en teoria el compilador no tendria que dejar que el mismo canal haya varios con una referencia. Es aqui que no entiendo como es que esto es posible:

// sessions_ref: Arc<Mutex<HashMap<String, (mpsc::Sender<String>, mpsc::Receiver<(String, String)>)>>>
let sessions_clone = sessions_ref.clone();
http_connection_handler(req, sessions_clone)

Puede ser que se pueda hacer este .clone() porque el mpsc::Receiver esta detrás de un Mutex? Con lo que hay la garantía que en ningún momento habrán dos threads intentando recibir un mensaje?

horacimacias commented 10 months ago

puedes "hacer que el reciever sea clonable" a base de ponerlo dentro de un Arc (asi que no clonas el reciever sino que clonas el Arc y usas el Mutex para garantizar que solo uno lo utiliza al mismo tiempo) pero creo que lo de utilizar un mpsc aqui va a traer mas problemas que beneficio.

Cuando tenga mas tiempo luego envio una parrafada que estaba escribiendo; en resumen yo creo que necesitas utilizar oneshot (https://docs.rs/tokio/latest/tokio/sync/oneshot/fn.channel.html) que es muy parecido a mpsc pero solo para 1 mensaje (solo esperas 1 respuesta a 1 peticion) y ademas necesitas hacer que una Session tenga M pares de tx/rx oneshot. Cuando llega una respuesta y has identificado la session_id y el request_id, en ese momento ya sabes que puedes contestar a esa peticion y ya sabes que esa peticion no va a recibir ninguna respuesta mas (la session puede que si, pero nunca esa peticion http). Creo que necesitas algo como un mapa de pending_requests asociado a cada Session, y ahi guardar el tx del oneshot (mientras el http_handler esta haciento un rx.await).

horacimacias commented 10 months ago

Lo de utilizar oneshot es simplemente porque queda claro que solo vas a tener 1 respuesta para 1 peticion. puedes hacer lo mismo con mpsc pero es como utilizar un vector para guardar un item solamente. El problema de clonar el receiver se "soluciona" haciendo que haya multiples receivers (cada peticion/respuesta http necesita su par tx/rx)

noreplydev commented 10 months ago

Esta noche miraré de implementar el oneshot y de arreglar el error con el lock. Comento en cuanto tenga algún avance.

horacimacias commented 10 months ago

Bueno finalmente he tenido algo de tiempo.

Si lo he entendido bien, tenemos N sessions donde cada session tendrá M peticiones pendientes (M no tiene porque ser el mismo para cada session). Cada session empieza con un a conexión tcp que se recibe del agente, y cada session tiene un socket.

Suiro esta combinando dos tipos de servidores/handlers para cada session:

  1. Handlers http que reciben 1 Request (http_connection_handler). De estos habra multiples para una misma session pero cada uno es para una peticion solamente (por eso tenemos el parametro request; no es un stream de requests sino uno solo)
  2. tcp donde gestiona un socket (por el que se enviarán y recibirán M requests/responses).

Si definimos request_sessionid_requestid y response_sessionid_requestid, tenemos:

  1. cuando recibimos response_1_2 (respuesta a la petición 2 a la session 1) debemos poder enviar la respuesta a request_1_2 (y esa peticion ya no esta pendiente y ese http handler termina). Gestionar response_1_2 no debería necesitar acceso a todas las sessiones; solamente necesita acceso a la session 1 (o más concretamente a la request_1 para contestarla). En cuanto se ha gestionado response_1_2 ya se puede liberar todo lo que hacía que request_1_2 estuviese esperando.
  2. cuando recibimos una request, por ejemplo request_2_3, necesitamos acceso a session 2 y si session 2 no existe entonces ya podemos responder que esa petición está mal (es para un endpoint no establecido).
  3. Todas las requests para una misma session se encolan (para esa misma session); todas se van a enviar mediante el mismo único socket por session. Requests de sesiones distintas se gestionan concurrentemente.

La session actual es:

struct Session {
    socket_tx: mpsc::Sender<String>,
    responses_rx: mpsc::Receiver<(String, String)>,
}

Side note: Creo que si utilizas mas tipos, el compilador te ayudara a detectar errores. Utilizar un mismo tipo (String) para http requests, http responses, raw http requests y raw http responses creo que te va a complicar cosas. Yo he utilizado HttpRequest y HttpResponse para distinguir; probablemente quieras añadir métodos específicos.

Por ejemplo:

struct Session {
    session_id: String,
    session_endpoint: String,
    socket_tx: mpsc::Sender<HttpRequestTask>,
}

HttpRequestTask es simplemente un type alias type HttpRequestTask = (HttpRequest, Sender<HttpResponse>);

session_id y session_endpoint son solo para logging. 
Una Session es algo que gestiona HttpRequestTasks. Una HttpRequestTask es la combinación de una HttpRequest y el Sender de la HttpResponse que se recibirá. 
 Es como recibir un parámetro y una función callback; el callback se utiliza para responder a lo que has hecho con el primer parámetro. 
Aquí el oneshot encaja bien; solo vas a responder 1 HttpResponse por cada HttpRequest.

Entonces tendríamos algo asi para la función http_connection_handler:

  1. obtén la session del mapa de sessions (aquí necesitas acceso concurrente (solo lectura) a sessions). Si no hay session para este session_id, responde con error (eso querría decir que hemos recibido una petición para una session que no existe). Asegúrate que el lock se mantiene en uso el mínimo tiempo posible (si lo extraes a una función el scope del lock se termina con esa función).
  2. Hemos encontrado la session. Crea un par tx/rx para que se envíe la respuesta cuando llegue.
El tx sera nuestro como recibiremos el 'callback'. El rx será donde la tarea gestionando la petición concreta esperará a que alguien (la Session a la cual enviamos el tx) escriba la HttpResponse. Añade la "tarea" (HttpRequest, Sender) a la session.
3. Haz un rx.await Con eso el handler http se espera hasta que alguien envíe la respuesta al tx correspondiente.

Si extraes la lógica de bajo nivel de http/parseado/etc se aclarará bastante la función http_connection_handler.

"Añadir la tarea" a la session es algo como:


    async fn add_request(
        &self,
        request: HttpRequest,
        callback: Sender<HttpResponse>,
    ) -> Result<(), Box<dyn Error>> {
        self.incoming_requests.send((request, callback)).await?;
        Ok(())
    }

En tcp_server necesitas algo como esto, en un loop:

  1. Recibe un nuevo socket
  2. Crea una nueva Session para ese nuevo socket. Añade la session al mapa de Sessions (aquí necesitas acceso concurrente (escritura) a sessions). Cuanto mas corto sea ese acceso, mejor.
  3. Arranca la Tokio task para esa Session. Importante; cuando la task asociada a una session termina, por cualquier motivo, borra la session del mapa de sesiones (de nuevo acceso concurrente escritura necesario). Si no lo haces, no liberas las sessions.

La "tareas para esa Session" es parecido a lo que tenias en tcp_connection_handler.

Necesitas algo como:

  1. Si hay peticiones entrantes (de algún http_request_handler), envíalas al socket. Antes de enviarlas, guarda el callback para así poder responder cuando llegue la respuesta por el socket
  2. Si hay respuestas entrantes por el socket (de peticiones que venían de http_request_handlers), saca el callback asociado al request_id de esa respuesta y escribe la respuesta ahi (eso liberara al http_request_handler que esta esperando la respuesta). Importante: quieres hacer 1 y 2 "en cuanto haya algo que hacer", tanto si es 1 como 2. Si primero haces 1 y luego 2 eso quiere decir que no vas a leer respuestas hasta que tengas una nueva petición entrante. Necesitas algo como select! (https://rust-lang.github.io/async-book/06_multiple_futures/03_select.html) o destinar tanto peticiones entrantes como respuestas entrantes mediante otro mpsc (y entonces leyendo de ese mpsc vas a poder ejecutar lo que llegue, sea lo que sea que te llegue). He subido el código en dos branches distintos, https://github.com/horacimacias/suiro-rs/tree/using_select para el select y https://github.com/horacimacias/suiro-rs/tree/using_task_channel utilizando otro mpsc.

Por último, lo de leer paquetes que vienen del agente y ver si están fragmentados, te sugiero que extraigas eso en su propia función (y así también la puedes testear fácilmente). (Mira el tutorial de Tokio y el ejemplo mini-redis, que tiene una parte de framing que es muy parecido a lo que necesitas, creo https://tokio.rs/tokio/tutorial/framing) Yo he utilizado la macro todo! que te permite que el código compile pero por supuesto peta si la ejecutas. Aun así, es muy útil para poder centrarte en una parte del código sin tener otra parte totalmente terminada; almenos tienes la signatura de la función.

async fn read_next_packet(_socket: &mut TcpStream) -> io::Result<SuiroResponse> {
    todo!()
}

Otro punto a fijarse: el mapa pending_requests no necesita locking. Solo un tokio task esta utilizándolo así que compila sin necesitar Arc<Mutex<>>. Si en algun momento lo intentases utilizar desde tareas/threads distintos, el compilador no te dejaria a menos que sea seguro (https://doc.rust-lang.org/book/ch16-00-concurrency.html)

Espero que haya entendido el objetivo y las ideas sean útiles. Si me he colado en algún sitio o no está claro, por favor decidme.

noreplydev commented 9 months ago

Vale, he estado un par llevo un par de días leyendo esto, miro de implementar varias cosas que has comentado. Primero me miraré las branches que tienes. Espero poder implementar alguna parte hoy.

Lo que si he hecho es dividir el proyecto ya en módulos, creo que la carga cognitiva ya empezaba a ser muy densa para un único fichero.

horacimacias commented 9 months ago

perfecto, espero que te fuese util y no me colase mucho. Por supuesto, "feel free to ignore"; tu conoces realmente lo que quieres que haga suiro. Si necesitas alguna cosa en concreto aqui estoy.

noreplydev commented 9 months ago

Perfecto, en cuanto solucione esta issue lo hago saber por aquí.