threefoldtech / rmb-rs

RMB implementation in Rust
Apache License 2.0

storage trait and implementation #14

Closed muhamadazmy closed 2 years ago

muhamadazmy commented 2 years ago

Abstract the storage behind a trait that hides the backend implementation, so that in the future we can replace Redis with another message queue service.

A starting point can be found here: https://github.com/threefoldtech/rmb-rs/blob/development-buildclient/src/storage/mod.rs#L10

Suggestion:

As a start, only one implementation of the storage trait will be written, using Redis as the backend. Please use a connection pool.
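To illustrate what "use a connection pool" means, here is a minimal, stdlib-only sketch of the pattern: idle connections sit in a shared list, `get` reuses one or opens a new one, and a guard hands the connection back when it is dropped. The `Pool`/`Pooled` types and the `u32` stand-in for a Redis connection are hypothetical; a real implementation would more likely use an existing pool crate such as `bb8` or `r2d2` together with the `redis` crate.

```rust
use std::ops::Deref;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical minimal pool: idle connections wait in a shared Vec.
struct Pool<C> {
    idle: Arc<Mutex<Vec<C>>>,
    make: fn() -> C, // opens a brand-new connection when none is idle
}

/// Guard around a checked-out connection; returns it to the pool on drop.
struct Pooled<C> {
    conn: Option<C>,
    idle: Arc<Mutex<Vec<C>>>,
}

impl<C> Pool<C> {
    fn new(make: fn() -> C) -> Self {
        Pool { idle: Arc::new(Mutex::new(Vec::new())), make }
    }

    /// Reuse an idle connection if there is one, otherwise open a new one.
    fn get(&self) -> Pooled<C> {
        let reused = self.idle.lock().unwrap().pop(); // lock released at end of statement
        let conn = reused.unwrap_or_else(self.make);
        Pooled { conn: Some(conn), idle: Arc::clone(&self.idle) }
    }

    fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl<C> Deref for Pooled<C> {
    type Target = C;
    fn deref(&self) -> &C {
        self.conn.as_ref().expect("connection present until drop")
    }
}

impl<C> Drop for Pooled<C> {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            self.idle.lock().unwrap().push(conn);
        }
    }
}

/// Hypothetical stand-in for "open a Redis connection": hands out increasing IDs.
static OPENED: AtomicU32 = AtomicU32::new(0);
fn open_fake_conn() -> u32 {
    OPENED.fetch_add(1, Ordering::SeqCst)
}
```

The guard means callers cannot forget to return a connection, and concurrent tasks each check out their own connection instead of serializing on a single one.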

muhamadazmy commented 2 years ago

Specifications.

The storage is an interface intended to abstract access to the underlying physical storage of messages; in other words, it abstracts the message queues.

Going through the sequence diagram in the docs, we can extract the functionality of this layer as follows:

```rust
// Assumes `anyhow::Result` (any single-parameter Result alias works) and the
// `async-trait` crate, since the trait must be usable from Send futures.
// `Message` and `QueuedMessage` are the crate's message types, defined elsewhere.
use anyhow::Result;
use async_trait::async_trait;

#[async_trait]
pub trait Storage: Clone + Send + Sync + 'static {
    /// Gets the message with the given ID from `backlog.$id`. On success this is
    /// either `None`, meaning there is no message with that ID, or the message.
    async fn get(&self, id: String) -> Result<Option<Message>>;

    /// Pushes the message to the local process queue (`msgbus.$cmd`), making it
    /// available to the application for processing.
    ///
    /// KNOWN ISSUE: we should not set a TTL on this queue because we cannot know
    /// how long the application will take to process all of its queued messages.
    /// This is potentially dangerous: a malicious twin can flood a server's memory
    /// by sending many large messages to a `cmd` that no application handles.
    ///
    /// SUGGESTED FIX: instead of setting a TTL on the `$cmd` queue, limit its length.
    /// For example, allow a maximum of 500 messages on the queue and, after each
    /// push, trim it back to that length (dropping the oldest messages).
    async fn run(&self, msg: Message) -> Result<()>;

    /// Stores the message in `backlog.$id` and, for each twin ID in the message
    /// destination, pushes a new `(id, dst)` tuple to the forward queue. It also
    /// needs to set a TTL on `backlog.$id` so that the message is dropped
    /// automatically when it times out.
    async fn forward(&self, msg: Message) -> Result<()>;

    /// Pushes the message to the `msg.$ret` queue.
    async fn reply(&self, msg: Message) -> Result<()>;

    /// Gets a message from the local queue, waiting until one is available.
    async fn local(&self) -> Result<Message>;

    /// Waits on both `msgbus.system.forward` AND `msgbus.system.reply` and returns
    /// the first available message, wrapped in the matching `QueuedMessage` variant.
    async fn queued(&self) -> Result<QueuedMessage>;
}
```
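The SUGGESTED FIX for `run` can be sketched in memory as a capped queue. This assumes new messages are pushed to the head of `msgbus.$cmd` and the application consumes from the tail, so trimming the tail drops the oldest messages; `CmdQueue` and `MAX_CMD_QUEUE` are hypothetical names, and 500 is the example limit from the comment.

```rust
use std::collections::VecDeque;

/// Example cap taken from the "maximum of 500 messages" suggestion.
const MAX_CMD_QUEUE: usize = 500;

/// Hypothetical in-memory stand-in for one `msgbus.$cmd` list.
/// Producers push to the front (like LPUSH); the consumer pops from the
/// back (like BRPOP), so the back always holds the oldest message.
struct CmdQueue<T> {
    items: VecDeque<T>,
    max_len: usize,
}

impl<T> CmdQueue<T> {
    fn new(max_len: usize) -> Self {
        CmdQueue { items: VecDeque::new(), max_len }
    }

    /// Push, then trim to `max_len`, dropping the oldest messages first
    /// (the equivalent of LPUSH followed by LTRIM key 0 max_len-1).
    fn push(&mut self, msg: T) {
        self.items.push_front(msg);
        self.items.truncate(self.max_len);
    }

    /// Take the oldest remaining message, as the application would.
    fn pop(&mut self) -> Option<T> {
        self.items.pop_back()
    }

    fn len(&self) -> usize {
        self.items.len()
    }
}
```

Against Redis, the same behaviour corresponds to `LPUSH` followed by `LTRIM key 0 499`; sending both inside a `MULTI`/`EXEC` transaction ensures no other client observes the queue between the two commands.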

Implementation details.

The `queued` method waits for messages that will be handled by workers, so it performs a blocking pop (`BLPOP`/`BRPOP` in Redis) on both `msgbus.system.forward` and `msgbus.system.reply`, and then wraps the returned message in the proper variant of the return enum.
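A stdlib-only sketch of the waiting part of `queued`: both system queues live behind one lock, and a single `Condvar` wait wakes up when either one receives an entry, which is roughly what a Redis blocking pop over two keys provides. The `Source` and `SystemQueues` names are hypothetical, and checking the forward queue first is an assumption that mirrors how `BLPOP` serves the first non-empty key in the order given.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

/// Which system queue an entry came from.
#[derive(Debug, PartialEq)]
enum Source {
    Forward, // msgbus.system.forward
    Reply,   // msgbus.system.reply
}

/// Hypothetical model of the two system queues under one lock,
/// so a single wait can watch both at once.
struct SystemQueues {
    state: Mutex<(VecDeque<String>, VecDeque<String>)>, // (forward, reply)
    ready: Condvar,
}

impl SystemQueues {
    fn new() -> Self {
        SystemQueues {
            state: Mutex::new((VecDeque::new(), VecDeque::new())),
            ready: Condvar::new(),
        }
    }

    /// Append an entry to one queue and wake a waiting worker.
    fn push(&self, src: Source, payload: String) {
        let mut q = self.state.lock().unwrap();
        match src {
            Source::Forward => q.0.push_back(payload),
            Source::Reply => q.1.push_back(payload),
        }
        self.ready.notify_one();
    }

    /// Block until either queue has an entry, then return it tagged with its source.
    fn pop_any(&self) -> (Source, String) {
        let mut q = self.state.lock().unwrap();
        loop {
            if let Some(p) = q.0.pop_front() {
                return (Source::Forward, p);
            }
            if let Some(p) = q.1.pop_front() {
                return (Source::Reply, p);
            }
            // Nothing queued: release the lock and sleep until a push notifies us.
            q = self.ready.wait(q).unwrap();
        }
    }
}
```

The loop around `wait` also guards against spurious wake-ups, which `Condvar` permits.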

The problem is that the reply queue contains a full message object, while the forward queue holds only an `(id, dst)` pair. Hence, once a value is available, `queued` must check which queue it came from and, for a forward entry, do extra work to fully reconstruct the message from `backlog.$id`.
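The reconstruction step can be sketched as follows, with in-memory collections standing in for `backlog.$id`, `msgbus.system.forward`, and `msgbus.system.reply`. The simplified `Message`, `QueuedMessage`, and `Store` types are hypothetical, the TTL is not modelled (an expired backlog entry is simply missing), and rebuilding a forward message addressed only to the single destination from its `(id, dst)` pair is an assumption about what the workers need.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical, simplified message (the real type has more fields).
#[derive(Clone, Debug, PartialEq)]
struct Message {
    id: String,
    dst: Vec<u32>, // destination twin IDs
    data: String,
}

/// What `queued` hands to a worker, tagged by the queue it came from.
#[derive(Debug, PartialEq)]
enum QueuedMessage {
    Forward(Message),
    Reply(Message),
}

/// In-memory stand-ins for the Redis keys named in the issue.
#[derive(Default)]
struct Store {
    backlog: HashMap<String, Message>,      // backlog.$id (TTL not modelled)
    forward_queue: VecDeque<(String, u32)>, // msgbus.system.forward: (id, dst) pairs
    reply_queue: VecDeque<Message>,         // msgbus.system.reply: full messages
}

impl Store {
    /// Store the message once, then queue one (id, dst) pair per destination twin.
    fn forward(&mut self, msg: Message) {
        for &twin in &msg.dst {
            self.forward_queue.push_back((msg.id.clone(), twin));
        }
        self.backlog.insert(msg.id.clone(), msg);
    }

    /// Non-blocking stand-in for `queued`: replies are already complete,
    /// while forward pairs must be rebuilt from the backlog.
    fn next_queued(&mut self) -> Option<QueuedMessage> {
        while let Some((id, twin)) = self.forward_queue.pop_front() {
            if let Some(stored) = self.backlog.get(&id) {
                // Assumption: the worker wants the message addressed to this one twin.
                let mut msg = stored.clone();
                msg.dst = vec![twin];
                return Some(QueuedMessage::Forward(msg));
            }
            // backlog.$id is gone (expired or never stored): skip the stale pair.
        }
        self.reply_queue.pop_front().map(QueuedMessage::Reply)
    }
}
```

Because a forward pair can outlive its backlog entry once the TTL fires, the sketch skips such stale pairs instead of failing the whole call.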

Question: who sets the message ID?

I was planning to let the client (caller) set the message ID, but I am increasingly convinced that the server (RMB) should be the one generating it. Storage then does not care which ID is set on the message, BUT it refuses to accept a Message that does not have a valid ID (i.e. an empty ID).
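If the server generates IDs, the storage-side rule reduces to a simple check. The sketch below pairs a hypothetical counter-based `generate_id` (a stand-in for whatever scheme RMB ends up using; a process-wide counter is not unique across restarts, so something like UUIDs would probably be preferable) with a `validate_id` that, as described above, does not interpret the ID and only rejects an empty one. `StorageError` is likewise a hypothetical name.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical error returned when a message arrives without an ID.
#[derive(Debug, PartialEq)]
enum StorageError {
    MissingId,
}

/// Hypothetical server-side generator: a process-wide counter.
/// Not unique across restarts; shown only to illustrate who owns ID creation.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

fn generate_id() -> String {
    NEXT_ID.fetch_add(1, Ordering::Relaxed).to_string()
}

/// Storage does not interpret the ID; it only refuses an empty one.
fn validate_id(id: &str) -> Result<(), StorageError> {
    if id.is_empty() {
        Err(StorageError::MissingId)
    } else {
        Ok(())
    }
}
```

Keeping the check this narrow lets the ID format change later without touching the storage backend.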