lunatic-solutions / lunatic-rs

This library contains higher level Rust wrappers for low level Lunatic syscalls.
267 stars 32 forks source link

Primitive for reader processes #22

Closed SquattingSocrates closed 2 years ago

SquattingSocrates commented 2 years ago

Hi! I've been working on some lunatic based projects and noticed that creating "writer" processes which listen to the mailbox and either respond to requests or accept messages is really easy with the new AbstractProcess primitive. Yet I find that "reader" processes that listen not to the mailbox but to some other source (e.g. a TcpStream) are only available via function-based processes instead of trait-based ones. So what I'm proposing is to include another trait like LoopingProcess which simply adds another function like run(mailbox, state: Self::State). Could even be another method on the AbstractProcess trait which defaults to listening to the mailbox

Code for a reader process could then look like this:

impl AbstractProcess for ClientProcess {
    type Arg = TcpStream;
    type State = Self;

    fn init(this: ProcessRef<Self>, stream: Self::Arg) -> Self::State {
        // ... set up stuff for the state (e.g. writer, coordinator, metrics_recorder, packet_reader)
        // ...
        ClientProcess {
            this: this.clone(),
            coordinator,
            writer,
            metrics_recorder,
            packet_reader
        }
    }

    fn run(state: Self::State) {
        loop {
            match state.packet_reader.read() {
                Ok(message) => {
                    state.metrics_recorder.track_new_packet();
                    println!("Received packet {:?}", message);
                    state.writer.respond("Some response");
                }
                Err(err) => panic!("A decoding error ocurred: {:?}", err),
            };
        }
    }
}

In my opinion this has the following benefits:

  1. It allows us to keep logic to manipulate/read state within the ClientProcess struct
  2. Allows for easier reasoning about the logic
  3. We don't have to manually spawn a process and send the context because that's taken care of the trait/struct
  4. Makes testing easier because it encourages one to keep highly cohesive code together in a more natural way

Actually, I already forked the lib and tried to change this (albeit in a dirty way) myself. Here's the commit where I added a run method: https://github.com/lunatic-solutions/rust-lib/compare/main...SquattingSocrates:extend-abstract-process

I can create a PR later if we agree on implementing it this way

bkolobara commented 2 years ago

I don't think we can implement this as part of the AbstractProcess though. The main purpose of the AbstractProcess is to dispatch messages based on the type. This means to allow handling incoming messages with ProcessMessage and ProcessRequest.

If you change the run function implementation, this breaks it. Now defining a ProcessMessage handler and sending a message to this process will actually never trigger the handler, because the process is running in an infinite loop and never processing messages. It may communicate the wrong idea to developers, that they somehow can change the state from inside the handler function.

We could introduce a LoopingProcess trait, but I don't think this trait would be of much use to us. It would only allow us to keep the logic "closer" to the struct, but you can do this with something like:

struct ClientProcess {
     // fields
}

impl ClientProcess {
    pub fn start() {
        Process::spawn(Self{ ... }, |this, _| Self::run(this))
    }

    fn run(state: Self) {
        loop {
            match state.packet_reader.read() {
                Ok(message) => {
                    state.metrics_recorder.track_new_packet();
                    println!("Received packet {:?}", message);
                    state.writer.respond("Some response");
                }
                Err(err) => panic!("A decoding error ocurred: {:?}", err),
            };
        }
    }
}

Notice also that attaching a reader like this to a supervision tree generally doesn't work. You can't restart a failed reader that already consumed some of the TcpStream. It will not have enough context to continue working.

What we could do is abstract this start() implementation behind a LoopingProcess trait so that you only need to implement a fn loop(state: &Self) method that will be called over and over again. This could save some boilerplate, but it's not much code and staying explicit about what is exactly happening here is cool too.

SquattingSocrates commented 2 years ago

Yeah, just using a "run" function close to the struct was my first solution as well. But since you pointed out how the supervision doesn't work with a reader process it really makes little sense to have a trait for a reader process. It would probably be nicer to have some built-in way to define the "run" or "loop" logic, but it's not a necessity I guess. I will continue writing it the way you suggested here and see how that works in the long run.