Munksgaard / session-types

MIT License
Session types over tcp? #71

Open S3j5b0 opened 2 years ago

S3j5b0 commented 2 years ago

Hi, I'm doing a project where we are thinking about using this one.

We need to implement some small constrained devices that communicate over a network, and use session types.

Therefore, I have been looking at the examples here. I have noticed that all of the examples are in a single file and consist of two or more threads that talk to each other.

But I can't really figure out how this would be done if the two parties were not in the same file, but communicating remotely with each other. Do you have any examples where the two parties are disjoint and not in the same file, perhaps talking over TCP?

Munksgaard commented 2 years ago

Hi, thank you for your question!

Unfortunately, communication between processes, let alone across sockets, is not supported by this library in the current state.

I've given it some thought over the years, and I'm not sure session types (at least not in the way they are implemented in this repository) are well suited for something like that. In essence, the purpose of this library, and of session types in general, is to provide certain guarantees regarding inter-thread communication, which we are only able to uphold because we can statically type check the threads against each other. When two processes communicate across sockets, or services communicate across the network, it becomes much harder (if not impossible) to guarantee that the peer adheres to the same protocol as you do.

If you have any good ideas I'd be happy to hear them, but otherwise I'm sorry to disappoint you.

pothos commented 2 years ago

I think Multi-Party Session Types were the proposed solution(s) to this. I didn't read this paper in particular but it shows up as first result in a search: https://link.springer.com/chapter/10.1007/978-3-030-36987-3_5 - maybe there is still enough overlap between both that would allow for some code sharing? Edit: Ok, the concern that one doesn't know if the other client implements the same protocol is probably always valid and only robust code can help…

S3j5b0 commented 2 years ago

Thanks a lot, I had an inkling that this was the case.

I did look a little bit at the MPST approach, and in that connection looked at this library. It seems to me like this one should have support for communication over the wire, and it also has an example of TCP communication, with a server and a client. The only issue is that this example doesn't actually use the library at all. So I'm probably going to drop the idea I had altogether.

tov commented 2 years ago

I think you could do this by having a proxy thread in each process whose job is de/serialization and communication over the wire.

Suppose party A wants to communicate with party B using protocol S. So A communicates with the proxy PA in its own process via a session-typed channel, and PA serializes the messages and sends them over the wire to the proxy PB on the other side, which deserializes them to send to B over a second session-typed channel.

[ A <—S—> PA ] <—wire—> [ PB <—S—> B ]
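
A minimal sketch of the two proxy halves, under some loud assumptions: plain `std::sync::mpsc` channels stand in for the session-typed channels, the wire is anything implementing `Read`/`Write` (a `TcpStream` would do), and the 4-byte length-prefixed framing is a hypothetical choice, not something this library provides. The names `proxy_outbound` and `proxy_inbound` are made up for illustration.

```rust
use std::io::{self, Read, Write};
use std::sync::mpsc::{Receiver, Sender};

/// Outbound half of a proxy (PA -> wire): takes messages from the local
/// party and writes each one as a 4-byte big-endian length plus payload.
pub fn proxy_outbound<W: Write>(from_local: Receiver<Vec<u8>>, wire: &mut W) -> io::Result<()> {
    // The loop ends once the local party drops its Sender.
    for msg in from_local {
        if msg.len() > u32::MAX as usize {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "message too large"));
        }
        wire.write_all(&(msg.len() as u32).to_be_bytes())?;
        wire.write_all(&msg)?;
    }
    wire.flush()
}

/// Inbound half of a proxy (wire -> PB): reads frames from the wire and
/// hands each payload to the local party.
pub fn proxy_inbound<R: Read>(wire: &mut R, to_local: Sender<Vec<u8>>) -> io::Result<()> {
    loop {
        let mut len_buf = [0u8; 4];
        match wire.read_exact(&mut len_buf) {
            Ok(()) => {}
            // End of stream while waiting for the next length prefix is
            // treated as shutdown in this sketch (a truncated prefix too).
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(()),
            Err(e) => return Err(e),
        }
        let mut payload = vec![0u8; u32::from_be_bytes(len_buf) as usize];
        wire.read_exact(&mut payload)?;
        if to_local.send(payload).is_err() {
            return Ok(()); // the local party has gone away
        }
    }
}
```

In a real setup each half would run on its own thread inside the process, and the mpsc endpoints would be replaced by the session-typed channel ends, so that A and B still only see protocol S.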

If necessary, the wire protocol could check that the session types match when establishing the connection.
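
As a rough sketch of such a check, assuming each side is compiled with a fixed 8-byte protocol identifier (a hypothetical convention; nothing in this library generates one): both proxies send their identifier first and refuse to continue if the peer's differs. The function name `handshake` is made up for illustration.

```rust
use std::io::{self, Read, Write};

/// Sends our protocol identifier, reads the peer's, and fails with
/// `InvalidData` if the two differ.
pub fn handshake<R: Read, W: Write>(
    reader: &mut R,
    writer: &mut W,
    local_id: &[u8; 8],
) -> io::Result<()> {
    writer.write_all(local_id)?;
    writer.flush()?;

    let mut peer_id = [0u8; 8];
    // read_exact keeps reading until all 8 bytes have arrived.
    reader.read_exact(&mut peer_id)?;

    if &peer_id == local_id {
        Ok(())
    } else {
        Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "peer speaks a different protocol",
        ))
    }
}
```

This only checks that both sides claim the same protocol; it cannot prove that the peer actually follows it, which is the concern raised earlier in the thread.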

If you wanted to get fancy, you could also:

- Generate the wire protocol from the session type.
- Add protocol versioning, where if the two parties speak slightly different protocols then the proxies can translate. (See e.g. Avro, which does this for JSON schemas.)

S3j5b0 commented 2 years ago

Hi @tov Thanks a lot for the idea, I gave it a go.

I made a little toy tcp example with a client and a server, and then made some session types for the client (with the proxy server session).

But I ran into some curious behaviour that I would like to hear your 2 cents on (only if you have the time and energy; the code here is a bit quick and dirty, and thus a bit messy).

It's a three-message protocol that sends arrays back and forth. Message 1 consists of 1's, message 2 of 2's, etc. Error messages consist of an array of 41's.

I first have a server, which is the boring part that looks like this:

use std::net::{Shutdown,TcpListener, TcpStream};
use std::thread;
use std::io::{Read,Write,Error};

fn handle_client(mut stream: TcpStream)-> Result<(), Error> {
    println!("incoming connection from: {}", stream.peer_addr()?);
    let mut buf = [0;512];
    loop {
        let bytes_read = stream.read(&mut buf)?;
        if bytes_read == 0 {return Ok(())}
        eprintln!("getting {:?}",&buf[0..bytes_read]);
        let msg2 = &[2; 256];
        stream.write(msg2)?;

        let mut buf = [0;512];
        let bytes_read = stream.read(&mut buf)?;
        if bytes_read == 0 {return Ok(())}
        eprintln!("getting msg 3 {:?}", &buf[0..bytes_read]);
    }
}
fn main() {
    let listener = TcpListener::bind("0.0.0.0:8888").expect("Could not bind");
    for stream in listener.incoming() {

        match stream {
            Err(e)=> {eprintln!("failed: {}", e)}
            Ok(stream) => {

                thread::spawn(move || {
                    handle_client(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
                }); 
            }
        } 
    }
}

and then the client, which is the one that uses session types:

use rand::random;
use std::io::{Read, Write};
use std::net::{Shutdown,TcpStream};
use std::str::from_utf8;
extern crate session_types;
use session_types::*;
use std::thread::spawn;

type send_error_die = Send<[u8;256], Eps>;
type get_error_die = Recv<[u8;256], Eps>;

type clientlocal = Send<[u8;256], Offer<Recv<[u8;256], Choose<Send<[u8;256], Eps>,send_error_die>>, get_error_die>>;
type proxyserver = Recv<[u8;256], Choose<Send<[u8;256], Offer<Recv<[u8;256], Eps>, get_error_die>>,send_error_die>>;

fn client_session(c:Chan<(),clientlocal>){
    let msg1 : [u8;256] = [1;256];
    let c = c.send(msg1);
    offer ! {c,
    Arr => {
        let (c,msg2) = c.recv();
        println!("client thread received: {:?}", msg2);
        let msg3 = [41;256]; // msg3 is an error
        if msg3 != [41;256]{
            let c = c.sel1().send(msg3);
            c.close();
        } else {
            println!("sending error 3");
            let c = c.sel2().send(msg3);
            c.close();
        }

    },
    Eps => {
        let (c,n) = c.recv();
        println!("received error {:?}", n);
        c.close();
    }
    };

}

fn server_handler(c:Chan<(),proxyserver>){

        match TcpStream::connect("localhost:8888") {
            Ok(mut stream) => {
                println!("Successfully connected to server in port 8888");
                let (c,msg1) = c.recv();

                stream.write(&msg1).unwrap();
                println!("Sent Hello, awaiting reply...");

                let mut data = [0_u8; 256]; 
                match stream.read(&mut data) {
                    Ok(_) => {

                            if data != [41;256]{
                                let c = c.sel1().send(data);
                                offer! { c, 
                                  Arr  => {
                                    let (c, msg3) = c.recv();
                                    println!("trying to send msg3");
                                    stream.write(&msg3).unwrap();
                                    println!("sent msg3");
                                    c.close();
                                    },
                                  Err => {
                                    println!("Receiving message 3 error message");
                                    let (c, msg3) = c.recv();
                                    stream.write(&msg3).unwrap();
                                    println!("sent errormessage at message 3");
                                    c.close();
                                  }  
                                }
                            } else {
                                // we receive msg2 as an error
                                let c = c.sel2().send(data);
                                c.close();
                            }

                    }
                    Err(e) => {
                        println!("Failed to receive data: {}", e);
                    }
                }
            }
            Err(e) => {
                println!("Failed to connect: {}", e);
            }
        };

    }

fn main() {
    let (s,c) = session_channel();
    spawn(move || server_handler(c));
    client_session(s);

}

This sends the messages back and forth and works 9/10 times. But if I run it enough times, then once in a while the third and last message (the error message) will not be sent. At first I thought this was just a case of me messing up the TCP stuff, but after debugging a bit, it actually seems like the issue happens here:

            println!("sending error 3");
            let c = c.sel2().send(msg3);
            c.close();

This is always executed. The message is then picked up here:

                                    println!("Receiving message 3 error message");
                                    let (c, msg3) = c.recv();
                                    stream.write(&msg3).unwrap();
                                    println!("sent errormessage at message 3");
                                    c.close();

Most of the time, the print "sending error 3" will be followed by "Receiving message 3 error message", but every once in a while it seems like the receiving part just doesn't happen and is skipped.

So my guess as to what this could be was some data-race-like behaviour where one thread gets too far ahead of the other, but I can't really decipher it.

I would be really interested to hear your guesses.

laumann commented 2 years ago

So nice to see someone writing session types :-)

> But if I run it enough times, then once in a while, the last third error message will not be sent.

Would be interesting to see some log output from both the receiving process and sending process.

> but every once in a while, it seems like the receiving part just doesn't happen, and is skipped.

Hmmm, I think a lot more debugging (strace maybe?) could clue us into what's happening, but it sounds like something gets closed before it should be. Could you also print the number of bytes returned by stream.read()? It could be that the read doesn't return the full 256 bytes.
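
To illustrate the partial-read concern: a single `read` call may return fewer bytes than the buffer holds, whereas `read_exact` keeps reading until the buffer is full. The sketch below uses a made-up `Trickle` reader that hands out at most `chunk` bytes per call to simulate a stream delivering a 256-byte message in pieces; `read_message` is likewise a hypothetical helper, not part of the posted code.

```rust
use std::io::{self, Read};

/// Reads exactly one 256-byte message, however many `read` calls it takes.
pub fn read_message<R: Read>(stream: &mut R) -> io::Result<[u8; 256]> {
    let mut buf = [0u8; 256];
    // Unlike `read`, `read_exact` errors with UnexpectedEof rather than
    // silently returning a short message.
    stream.read_exact(&mut buf)?;
    Ok(buf)
}

/// Test helper: a reader that returns at most `chunk` bytes per `read`
/// call, imitating a socket that delivers data in several pieces.
pub struct Trickle<R> {
    pub inner: R,
    pub chunk: usize,
}

impl<R: Read> Read for Trickle<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = buf.len().min(self.chunk);
        self.inner.read(&mut buf[..n])
    }
}
```

With a `Trickle` of chunk size 100 wrapped around 256 bytes, one plain `read` into a 256-byte buffer returns 100, while `read_message` returns the full message. In the posted client, a short read would leave the tail of `data` zeroed, so printing the byte count should show quickly whether this is happening.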

Another little thing - you should be able to do:

type clientlocal = Send<[u8;256], Offer<Recv<[u8;256], Choose<Send<[u8;256], Eps>,send_error_die>>, get_error_die>>;
type proxyserver = <clientlocal as HasDual>::Dual;
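
Coming back to "something gets closed before it should be": one reading that fits the posted client, though it has not been confirmed in this thread, is that main returns as soon as client_session finishes, and a Rust process exits when main returns even if spawned threads are still running. The sketch below shows the usual remedy, keeping the `JoinHandle` and joining it. It uses `std::sync::mpsc` as a stand-in for the session channel, and `run_and_join` is a made-up name.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs a "client" on the calling thread and a "proxy" on a spawned thread,
/// then waits for the proxy and returns everything it forwarded.
pub fn run_and_join() -> Vec<u8> {
    let (tx, rx) = mpsc::channel::<u8>();

    // Keep the JoinHandle instead of discarding it.
    let proxy = thread::spawn(move || {
        let mut forwarded = Vec::new();
        for msg in rx {
            // Stand-in for writing the message to the TCP stream.
            forwarded.push(msg);
        }
        forwarded
    });

    // The "client" sends its last message and closes its end.
    tx.send(41).unwrap();
    drop(tx);

    // Without this join, the caller could return (and the process exit)
    // before the proxy thread has handled the last message.
    proxy.join().expect("proxy thread panicked")
}
```

Applied to the posted client, this would mean storing the result of `spawn(move || server_handler(c))` in a variable and calling `.join()` on it after `client_session(s)` returns.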

aakoshh commented 2 years ago

I wanted to use this library for TCP and Web Socket connections as well. I realised that the intention here is for inter-thread communication, so I took some of the fantastic code and reused it to build an asynchronous version on top of tokio, where you can plug in your own wire types, so the session can check their expectations.

It's published here: https://crates.io/crates/async-session-types