tangle-network / relayer

🕸️ The Webb Relayer Network
https://webb-tools.github.io/relayer/
Apache License 2.0

[TASK] Improve the Internal Queue System for the relayer #452

Closed shekohex closed 1 year ago

shekohex commented 1 year ago

Overview

While working on the docs for the Internal Queue system (#390), I noticed that we could improve the queue system by keeping track of each item's status inside the queue. The caller could then act on that status: for example, if an item failed with a recoverable error, the caller could retry it.


salman01zp commented 1 year ago

This will require us to store both the item and its ItemStatus in sled_db so that we can check the status. We can create a QueueItem that stores both the item data and its status, and use Key: QueueKey to access or peek at the QueueItem. CC: @shekohex

/// A Queue item
struct QueueItem<T: Serialize + DeserializeOwned + Clone> {
    data: T,
    status: ItemStatus,
}

/// The status of the item in the queue.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum ItemStatus {
    /// The item is in the queue, and can be dequeued.
    Queued,
    /// The item is being processed, and should not be dequeued.
    Processing,
    /// The item is processed, and should be removed from the queue.
    Processed,
}

pub trait QueueKey {
    /// The Queue name, used as a prefix for the keys.
    fn queue_name(&self) -> String;
    /// An _optional_ different key for the same value stored in the queue.
    ///
    /// This is useful when you want direct access to a specific key in the queue.
    /// For example, if you want to remove an item from the queue, you can use this key to directly
    /// remove it from the queue.
    fn item_key(&self) -> Option<[u8; 64]>;
}

pub trait QueueStore<Item>
where
    Item: Serialize + DeserializeOwned + Clone,
{
    /// The type of the queue key.
    type Key: QueueKey;
    /// Insert an item into the queue.
    fn enqueue_item(&self, key: Self::Key, item: QueueItem<Item>) -> crate::Result<()>;
    /// Get an item from the queue, and remove it.
    fn dequeue_item(&self, key: Self::Key) -> crate::Result<Option<QueueItem<Item>>>;
    /// Get an item from the queue, without removing it.
    fn peek_item(&self, key: Self::Key) -> crate::Result<Option<QueueItem<Item>>>;
    /// Check if the item is in the queue.
    fn has_item(&self, key: Self::Key) -> crate::Result<bool>;
    /// Remove an item from the queue.
    fn remove_item(&self, key: Self::Key) -> crate::Result<Option<QueueItem<Item>>>;
}

shekohex commented 1 year ago

Yes, there should be some way to tell the item status. However, here is a better spec:

/// A Queue item that wraps the inner item and maintains its state.
struct QueueItem<T> {
    /// The inner value wrapped by the Queue Item.
    inner: T,
    /// The current state of the item in the queue.
    state: QueueItemState,
}

impl<T> QueueItem<T> {
    /// Creates a new QueueItem with the provided inner value.
    pub fn new(inner: T) -> Self {
        Self {
            inner,
            state: Default::default(),
        }
    }

    /// Returns the state of the QueueItem.
    pub fn state(&self) -> QueueItemState {
        self.state.clone()
    }

    /// Unwraps the QueueItem and returns the inner value.
    pub fn unwrap(self) -> T {
        self.inner
    }
}

/// The status of the item in the queue.
// Note: `Eq` cannot be derived here because `progress` holds an `f32`.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub enum QueueItemState {
    /// The current item is pending and waiting in the queue to be dequeued and processed.
    #[default]
    Pending,
    /// The item is being processed.
    Processing {
        /// A meaningful step for the current item state.
        step: String,
        /// A meaningful progress percentage for the current item state (0 to 1).
        progress: Option<f32>,
    },
    /// The item failed to be processed.
    Failed {
        /// The error message.
        reason: String,
    },
}

pub trait QueueKey {
    /// Returns the queue name, used as a prefix for the keys.
    fn queue_name(&self) -> String;
    /// Returns an optional different key for the same value stored in the queue.
    /// This can be considered as a pointer to the key that refers to the actual value.
    /// `item_key -> actual_item_key -> item_value`.
    ///
    /// This is useful when you want to have direct access to a specific key in the queue.
    /// For example, if you want to remove an item from the queue, you can use this key to directly
    /// remove it from the queue.
    fn item_key(&self) -> Option<[u8; 64]>;
}

/// A Queue Store is a trait that helps store items in a queue.
/// The queue is a FIFO queue that can store anything that can be serialized.
///
/// There is a simple API to get the items from the queue, such as in a background task.
pub trait QueueStore<Item>
where
    Item: Serialize + DeserializeOwned + Clone,
{
    /// The type of the queue key.
    type Key: QueueKey;
    /// Inserts an item into the queue.
    fn enqueue_item(&self, key: Self::Key, item: Item) -> crate::Result<()>;
    /// Gets an item from the queue and removes it.
    ///
    /// ### Note
    /// The dequeue operation will remove the item from the queue,
    /// which means that you cannot peek at the item or track its state.
    ///
    /// However, a better approach is to peek at an item and start processing it.
    /// Once done, you can remove it from the queue.
    fn dequeue_item(&self, key: Self::Key) -> crate::Result<Option<Item>>;
    /// Gets an item from the queue without removing it.
    fn peek_item(&self, key: Self::Key) -> crate::Result<Option<Item>>;
    /// Checks whether the item is in the queue.
    ///
    /// To check whether an item is inside the queue or not, you MUST provide the [`QueueKey::item_key`].
    /// Otherwise, the implementation does not know which item to check.
    fn has_item(&self, key: Self::Key) -> crate::Result<bool>;
    /// Removes an item from the queue.
    ///
    /// To remove an item from the queue, you MUST provide the [`QueueKey::item_key`].
    /// Otherwise, the implementation does not know which item to remove.
    fn remove_item(&self, key: Self::Key) -> crate::Result<Option<Item>>;

    /// Updates an item in the queue in-place.
    ///
    /// To update an item in the queue, you MUST provide the [`QueueKey::item_key`].
    /// Otherwise, the implementation does not know which item to update.
    fn update_item<F>(&self, key: Self::Key, f: F) -> crate::Result<bool>
    where
        F: FnMut(&mut Item) -> crate::Result<()>;
}

Let me know what you think, and feel free to ask questions if anything is unclear.
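The peek, update, then remove flow recommended in the `dequeue_item` note could look roughly like the sketch below. It is only an illustration under simplifying assumptions: an in-memory `VecDeque` stands in for sled, the serde bounds and `crate::Result` are dropped, and `InMemoryQueue`, `ItemKey`, `set_state`, and `process_next` are hypothetical names that do not appear in the spec above.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Simplified copy of the proposed state enum (no serde derives in this sketch).
#[derive(Debug, Clone, PartialEq, Default)]
pub enum QueueItemState {
    #[default]
    Pending,
    Processing { step: String, progress: Option<f32> },
    Failed { reason: String },
}

/// Simplified copy of the proposed wrapper.
#[derive(Debug, Clone)]
pub struct QueueItem<T> {
    inner: T,
    state: QueueItemState,
}

impl<T> QueueItem<T> {
    pub fn new(inner: T) -> Self {
        Self { inner, state: QueueItemState::default() }
    }
    pub fn state(&self) -> QueueItemState {
        self.state.clone()
    }
    /// Hypothetical setter, not part of the spec above.
    pub fn set_state(&mut self, state: QueueItemState) {
        self.state = state;
    }
}

/// Hypothetical alias for the 64-byte key returned by `QueueKey::item_key`.
pub type ItemKey = [u8; 64];

/// Hypothetical in-memory FIFO store standing in for the sled-backed one.
pub struct InMemoryQueue<T> {
    items: Mutex<VecDeque<(ItemKey, QueueItem<T>)>>,
}

impl<T: Clone> InMemoryQueue<T> {
    pub fn new() -> Self {
        Self { items: Mutex::new(VecDeque::new()) }
    }
    pub fn enqueue_item(&self, key: ItemKey, item: QueueItem<T>) {
        self.items.lock().unwrap().push_back((key, item));
    }
    /// Returns a copy of the oldest item without removing it.
    pub fn peek_item(&self) -> Option<(ItemKey, QueueItem<T>)> {
        self.items.lock().unwrap().front().cloned()
    }
    /// Applies `f` to the item stored under `key`; returns whether it was found.
    pub fn update_item<F: FnMut(&mut QueueItem<T>)>(&self, key: &ItemKey, mut f: F) -> bool {
        let mut items = self.items.lock().unwrap();
        match items.iter_mut().find(|(k, _)| k == key) {
            Some((_, item)) => {
                f(item);
                true
            }
            None => false,
        }
    }
    /// Removes and returns the item stored under `key`, if any.
    pub fn remove_item(&self, key: &ItemKey) -> Option<QueueItem<T>> {
        let mut items = self.items.lock().unwrap();
        let pos = items.iter().position(|(k, _)| k == key)?;
        items.remove(pos).map(|(_, item)| item)
    }
}

/// Peeks at the oldest item, marks it as processing, and removes it only on success.
/// Returns `None` for an empty queue, `Some(true)` if the item was processed and
/// removed, and `Some(false)` if it failed and was left in the queue as `Failed`.
pub fn process_next<T: Clone>(
    queue: &InMemoryQueue<T>,
    handler: impl Fn(&T) -> Result<(), String>,
) -> Option<bool> {
    let (key, item) = queue.peek_item()?;
    queue.update_item(&key, |it| {
        it.set_state(QueueItemState::Processing { step: "handling".into(), progress: None })
    });
    match handler(&item.inner) {
        Ok(()) => {
            queue.remove_item(&key);
            Some(true)
        }
        Err(reason) => {
            queue.update_item(&key, |it| it.set_state(QueueItemState::Failed { reason: reason.clone() }));
            Some(false)
        }
    }
}
```

Because a failed item stays in the queue with its `Failed` state, a caller can inspect the state through `peek_item` and decide whether to call `process_next` again.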

salman01zp commented 1 year ago

Should the observer that enqueues items also watch for item key updates? I think that for anchor_update_proposals and governor_update_flow it should just enqueue the items and move on to processing the next events.

For private_tx, yes, it should watch for item status updates and retry if the error is recoverable. I will create a separate task for processing private transactions through the queue and make the necessary changes.

shekohex commented 1 year ago

For private_tx, yes, it should watch for item status updates and retry if the error is recoverable. I will create a separate task for processing private transactions through the queue and make the necessary changes.

Yes. For now, we would use these new changes only for the transaction queues.
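As a rough illustration of "retry if recoverable" for a transaction queue, a caller could map the observed item state to a next action. This is a sketch under stated assumptions, not the relayer's actual logic: the thread does not define what counts as recoverable, so `is_recoverable`, `Action`, `next_action`, and the attempt limit are all hypothetical, and the state enum is a simplified copy without serde derives.

```rust
/// Simplified copy of the proposed state enum (no serde derives in this sketch).
#[derive(Debug, Clone, PartialEq)]
pub enum QueueItemState {
    Pending,
    Processing { step: String, progress: Option<f32> },
    Failed { reason: String },
}

/// Hypothetical decision the caller makes after inspecting an item's state.
#[derive(Debug, PartialEq)]
pub enum Action {
    /// The item is still pending or in progress; keep watching.
    Wait,
    /// The failure looks transient; enqueue or process the item again.
    Retry,
    /// The failure is permanent or the attempt budget is spent.
    GiveUp,
}

/// Hypothetical classification: this sketch treats timeouts as transient.
/// A real implementation would more likely match on a typed error.
fn is_recoverable(reason: &str) -> bool {
    reason.contains("timeout")
}

/// Decides what to do with an item given its state and the attempts made so far.
pub fn next_action(state: &QueueItemState, attempts: u32, max_attempts: u32) -> Action {
    match state {
        QueueItemState::Pending | QueueItemState::Processing { .. } => Action::Wait,
        QueueItemState::Failed { reason }
            if attempts < max_attempts && is_recoverable(reason) =>
        {
            Action::Retry
        }
        QueueItemState::Failed { .. } => Action::GiveUp,
    }
}
```

Matching on a free-form `reason` string is fragile, so if retries become a core feature, a structured error kind in `Failed` may be worth considering.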