DavidBM / rsmq-async-rs

RSMQ port to async Rust. RSMQ is a simple Redis queue system that works with any Redis v2.6+.
MIT License

Error when setting maxsize to -1 #18

Closed. kengreim closed this issue 5 months ago

kengreim commented 5 months ago

I get the following error when I try to set the RSMQ maxsize to -1 (unlimited message size) with a call like rsmq.set_queue_attributes(shared::DATAFEED_QUEUE_NAME, None, None, Some(-1)):

Redis error: Response was of incompatible type - TypeError: "Could not convert from string." (response was string-data('"-1"'))

From further testing, it looks like this is being caused by get_queue_attributes here: https://github.com/DavidBM/rsmq-async-rs/blob/42fb50b9329a95a84e01366ce27cfa1818c79821/src/functions.rs#L458

I think the issue in get_queue_attributes is caused by this call: https://github.com/DavidBM/rsmq-async-rs/blob/42fb50b9329a95a84e01366ce27cfa1818c79821/src/functions.rs#L239

I think that in turn stems from maxsize in RsmqQueueAttributes being declared as a u64 rather than a signed integer type: https://github.com/DavidBM/rsmq-async-rs/blob/42fb50b9329a95a84e01366ce27cfa1818c79821/src/types.rs#L70
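
The signedness mismatch can be reproduced without Redis. This is a minimal sketch that uses only the standard library's string parsing, which is assumed here to behave analogously to the redis crate's conversion of the "-1" reply. The helper names are hypothetical and are not part of rsmq-async.

```rust
use std::num::ParseIntError;

/// Hypothetical helper: parse the raw `maxsize` value the way a `u64` field would.
/// A leading minus sign is rejected, so "-1" cannot be represented.
fn parse_maxsize_unsigned(raw: &str) -> Result<u64, ParseIntError> {
    raw.parse::<u64>()
}

/// Hypothetical helper: parse the same value as a signed integer,
/// so that the -1 "unlimited" sentinel survives the conversion.
fn parse_maxsize_signed(raw: &str) -> Result<i64, ParseIntError> {
    raw.parse::<i64>()
}
```

Calling parse_maxsize_unsigned("-1") returns an Err, while parse_maxsize_signed("-1") returns Ok(-1). Ordinary positive limits parse with either helper.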

Note that in QueueDescriptor, maxsize is an i64. send_message ultimately reads into this type when actually checking whether or not the message is too long.
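
To illustrate why a signed maxsize matters for the length check, here is a rough sketch of how a limit with a -1 sentinel could be evaluated. message_fits is a hypothetical helper, not the library's actual send_message logic, and rejecting negative values other than -1 is an assumption made for this example.

```rust
/// Hypothetical helper: returns true when a message of `len` bytes is
/// allowed under `maxsize`, where -1 means "no limit".
fn message_fits(len: usize, maxsize: i64) -> bool {
    if maxsize == -1 {
        // Sentinel value: unlimited message size.
        return true;
    }
    // Assumption: any other negative limit is invalid, so nothing fits.
    maxsize >= 0 && (len as u64) <= (maxsize as u64)
}
```

With an unsigned maxsize there is no way to express the sentinel at all, which is why the attribute type and the descriptor type need to agree.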

kengreim commented 5 months ago

Here is my bastardization of get_queue_attributes to support maxsize as an i64 in the RsmqQueueAttributes struct. Sorry, I would raise a PR, but I still need to figure out the testing. The alternative would be to fetch the Redis result as a vector of Strings and parse the integers afterwards, which is what you do in get_queue for the QueueDescriptor type.


    pub async fn get_queue_attributes(
        &self,
        conn: &mut T,
        qname: &str,
    ) -> RsmqResult<RsmqQueueAttributes> {
        let key = format!("{}{}", self.ns, qname);

        let time: (u64, u64) = redis::cmd("TIME").query_async(conn).await?;

        // The third HMGET field (maxsize) is read as i64 so that -1 (unlimited) converts.
        let result: (
            (
                Option<u64>,
                Option<u64>,
                Option<i64>,
                Option<u64>,
                Option<u64>,
                Option<u64>,
                Option<u64>,
            ),
            u64,
            u64,
        ) = pipe()
            .atomic()
            .cmd("HMGET")
            .arg(format!("{}:Q", key))
            .arg("vt")
            .arg("delay")
            .arg("maxsize")
            .arg("totalrecv")
            .arg("totalsent")
            .arg("created")
            .arg("modified")
            .cmd("ZCARD")
            .arg(&key)
            .cmd("ZCOUNT")
            .arg(&key)
            .arg(time.0)
            .arg("+inf")
            .query_async(conn)
            .await?;

        // If any hash field is missing, treat the queue as nonexistent.
        if result.0.0.is_none()
            || result.0.1.is_none()
            || result.0.2.is_none()
            || result.0.3.is_none()
            || result.0.4.is_none()
            || result.0.5.is_none()
            || result.0.6.is_none()
        {
            return Err(RsmqError::QueueNotFound);
        }

        Ok(RsmqQueueAttributes {
            vt: result
                .0
                .0
                .map(Duration::from_millis)
                .unwrap_or(Duration::ZERO),
            delay: result
                .0
                .1
                .map(Duration::from_millis)
                .unwrap_or(Duration::ZERO),
            maxsize: result.0.2.unwrap_or(0),
            totalrecv: result.0.3.unwrap_or(0),
            totalsent: result.0.4.unwrap_or(0),
            created: result.0.5.unwrap_or(0),
            modified: result.0.6.unwrap_or(0),
            msgs: result.1,
            hiddenmsgs: result.2,
        })
    }

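
The alternative mentioned above, reading the hash fields as strings and parsing them afterwards, might look roughly like the sketch below. It is self-contained and does not call Redis: the input slice stands in for an HMGET reply, and ParsedAttributes, ParseError, field, and parse_attributes are hypothetical names, not the library's types or its get_queue implementation. The field order and the use of Duration::from_millis mirror the snippet above.

```rust
use std::str::FromStr;
use std::time::Duration;

/// Hypothetical stand-in for the queue attributes; maxsize is signed.
#[derive(Debug, PartialEq)]
struct ParsedAttributes {
    vt: Duration,
    delay: Duration,
    maxsize: i64,
    totalrecv: u64,
    totalsent: u64,
    created: u64,
    modified: u64,
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum ParseError {
    QueueNotFound,
    InvalidField(&'static str),
}

/// Read one field of the reply and parse it into the requested integer type.
/// A missing field is reported as QueueNotFound, a malformed one by name.
fn field<T: FromStr>(
    fields: &[Option<String>],
    index: usize,
    name: &'static str,
) -> Result<T, ParseError> {
    let raw = fields
        .get(index)
        .and_then(|f| f.as_deref())
        .ok_or(ParseError::QueueNotFound)?;
    raw.parse::<T>().map_err(|_| ParseError::InvalidField(name))
}

/// Convert a reply in the order vt, delay, maxsize, totalrecv, totalsent,
/// created, modified into typed attributes.
fn parse_attributes(fields: &[Option<String>]) -> Result<ParsedAttributes, ParseError> {
    Ok(ParsedAttributes {
        vt: Duration::from_millis(field(fields, 0, "vt")?),
        delay: Duration::from_millis(field(fields, 1, "delay")?),
        maxsize: field(fields, 2, "maxsize")?,
        totalrecv: field(fields, 3, "totalrecv")?,
        totalsent: field(fields, 4, "totalsent")?,
        created: field(fields, 5, "created")?,
        modified: field(fields, 6, "modified")?,
    })
}
```

Parsing after the fetch keeps the target type of each field in one place, so a signed maxsize and unsigned counters can coexist without a long tuple annotation on the pipeline result.
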
DavidBM commented 5 months ago

Hi! Thank you very much for the report! I just released version 8.0.2, which should fix the issue.

kengreim commented 5 months ago

Thanks! And I just looked at your code…it’s way better than my attempt. 😅 Been a while since I’ve worked with a language at this low of a level, but it’s fun.