elfo-rs / elfo

An asynchronous distributed actor framework in Rust with robust observability

Opinion on multiparty session types? #111

Closed planetoryd closed 11 months ago

planetoryd commented 11 months ago

https://github.com/zakcutner/rumpsteak

loyd commented 11 months ago

Hello. I like the original idea of session types, and I think the topic has been underestimated for a long time. It's nice that such a project has appeared and that the topic hasn't died. I need some time to play with this project and decide whether it requires any support from the framework.

Anyway, right now I'm focused on fixing distribution and preparing v0.2 (a lot of documentation, stabilizing unstable parts, and a public announcement), so this isn't a priority at the moment. But thanks for the link, it looks very promising!

planetoryd commented 11 months ago

I made some major changes for my own use in another project: https://github.com/planetoryd/rumpsteak (still WIP).

loyd commented 11 months ago

What's the tl;dr of the differences from upstream?

planetoryd commented 11 months ago

Code snippet from what I'm writing.

#[choices]
#[derive(Message, Serialize, Deserialize, Clone)]
pub enum Msg {
    CtrlMsg(),
}

#[derive(From, Wrapper, Serialize, Deserialize, Clone)]
pub struct FD(RawFd);

#[choices]
#[derive(Serialize, Deserialize, Clone)]
pub enum CtrlMsg {
    SubjectMsg(),
}

#[choices]
#[derive(Serialize, Deserialize, Clone)]
pub enum SubjectMsg {
    Initiate(SubjectName, InitialParams),
    GC(SubjectName),
}

type FramedChan = Framed<UnixStream, LengthDelimitedCodec>;

/// Client endpoint that connects to the server.
#[derive(Role)]
pub struct Client(
    #[route(Server, Msg)] FramedChan,
    #[route(Server, FD)] FDStream,
);

/// The main endpoint of the daemon.
#[derive(Role)]
pub struct Server(
    #[route(Client, Msg)] FramedChan,
    #[route(Client, FD)] FDStream,
);

#[derive(Wrapper, From)]
pub struct FDStream(UnixStream);

#[session]
pub struct Ctrl(Select<Server, FramedChan>);

impl<'k> ChoiceB<'k> for SubjectMsg {
    type Brancher = Server;
    type Selector = Client;
    #[session('k Self::Selector)]
    type SelectorSession = Select<Server, SubjectMsgBranch>;
    #[session('k Self::Brancher)]
    type BrancherSession = <Self::SelectorSession as FullDual<'k, Client, Server>>::Dual;
}

impl<'k> ChoiceB<'k> for InitiateVariant {
    type Selector = Client;
    type Brancher = Server;
    #[session('k Self::Selector)]
    type SelectorSession = Send<Server, FDStream, End>;
    #[session('k Self::Brancher)]
    type BrancherSession = <Self::SelectorSession as PartialDual<'k, Client, Server>>::Dual<
        <Ctrl<'k, Client> as FullDual<'k, Client, Server>>::Dual,
    >;
}

impl<'k> ChoiceB<'k> for GCVariant {
    type Brancher = Server;
    type Selector = Client;
    #[session('k Self::Selector)]
    type SelectorSession = End;
    #[session('k Self::Brancher)]
    type BrancherSession = <Self::SelectorSession as PartialDual<'k, Client, Server>>::Dual<
        <Ctrl<'k, Client> as FullDual<'k, Client, Server>>::Dual,
    >;
}

impl Sending<RawFd> for FDStream {
    type Fut<'x> = impl Future<Output = Result<(), std::io::Error>>;
    type Error = std::io::Error;
    fn send(&mut self, item: RawFd) -> Self::Fut<'_> {
        async move { self.0.send_fd(item).await }
    }
}

impl Recving<RawFd> for FDStream {
    type Fut<'x> = impl Future<Output = Result<Option<RawFd>, Self::Error>>;
    type Error = std::io::Error;
    fn recv(&mut self) -> Self::Fut<'_> {
        async move { self.0.recv_fd().await.map(Some) }
    }
}
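
For readers unfamiliar with the `Dual` associated types used in the snippet above, here is a minimal sketch of the underlying idea. It is a simplification under stated assumptions: it covers only binary (two-party) sessions rather than multiparty ones, it runs over `std::sync::mpsc` channels instead of Unix sockets, and every name in it (`Tx`, `Rx`, `End`, `Dual`, `Chan`, `session`, `run_demo`) is hypothetical and is not the API of rumpsteak or of the fork.

```rust
use std::any::Any;
use std::marker::PhantomData;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Protocol states, encoded purely at the type level (hypothetical names).
pub struct Tx<T, Next>(PhantomData<(T, Next)>); // send a T, then continue as Next
pub struct Rx<T, Next>(PhantomData<(T, Next)>); // receive a T, then continue as Next
pub struct End; // the session is over

/// Maps a session type to the type its peer has to follow.
pub trait Dual {
    type Dual;
}
impl Dual for End {
    type Dual = End;
}
impl<T, N: Dual> Dual for Tx<T, N> {
    type Dual = Rx<T, N::Dual>;
}
impl<T, N: Dual> Dual for Rx<T, N> {
    type Dual = Tx<T, N::Dual>;
}

// Messages are type-erased on the wire; the session type restores the type.
type Payload = Box<dyn Any + Send>;

/// A channel endpoint whose type parameter tracks the remaining protocol.
pub struct Chan<S> {
    tx: Sender<Payload>,
    rx: Receiver<Payload>,
    _state: PhantomData<S>,
}

impl<T: Any + Send, N> Chan<Tx<T, N>> {
    /// Consumes the endpoint and returns it in the next protocol state.
    pub fn send(self, value: T) -> Chan<N> {
        self.tx.send(Box::new(value)).expect("peer hung up");
        Chan { tx: self.tx, rx: self.rx, _state: PhantomData }
    }
}

impl<T: Any + Send, N> Chan<Rx<T, N>> {
    pub fn recv(self) -> (T, Chan<N>) {
        let boxed = self.rx.recv().expect("peer hung up");
        // The peer follows the dual protocol, so the payload type matches.
        let value = *boxed.downcast::<T>().expect("payload type mismatch");
        (value, Chan { tx: self.tx, rx: self.rx, _state: PhantomData })
    }
}

impl Chan<End> {
    pub fn close(self) {}
}

/// Creates two connected endpoints with dual session types.
pub fn session<S: Dual>() -> (Chan<S>, Chan<S::Dual>) {
    let (tx_a, rx_b) = channel();
    let (tx_b, rx_a) = channel();
    (
        Chan { tx: tx_a, rx: rx_a, _state: PhantomData },
        Chan { tx: tx_b, rx: rx_b, _state: PhantomData },
    )
}

// Client side: send a u32, receive a String, stop.
type ClientProto = Tx<u32, Rx<String, End>>;
// Server side is derived rather than hand-written: Rx<u32, Tx<String, End>>.
type ServerProto = <ClientProto as Dual>::Dual;

pub fn run_demo() -> String {
    let (client, server): (Chan<ClientProto>, Chan<ServerProto>) = session();
    let handle = thread::spawn(move || {
        let (n, server) = server.recv();
        server.send(format!("got {n}")).close();
    });
    let (reply, client) = client.send(7).recv();
    client.close();
    handle.join().expect("server thread panicked");
    reply
}

fn main() {
    println!("{}", run_demo());
}
```

Because each operation consumes the endpoint and returns one in the next state, calling `recv` where the protocol expects `send` is a compile error. Only one side's protocol is written by hand; the peer's is computed through `Dual`, which is roughly the role the `FullDual` and `PartialDual` projections appear to play in the snippet.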