davechallis / ocypod

Ocypod is a Redis-backed service for orchestrating background jobs. Clients/workers can be written in any language, using HTTP/JSON to queue/fetch jobs, store results, etc.
Apache License 2.0

Improper isolation using async_transaction? #26

Closed · Parth closed this issue 2 years ago

Parth commented 2 years ago

I've spent the last 2 days diving into the details of Redis connections and how they relate to transactions, trying to figure out the right way to set up an async web server that talks to Redis. Along the way I've learned about the various connection strategies available to me (bb8, deadpool, and MultiplexedConnection). While doing so I came across your comment, and after some testing I've determined that this is only safe to use if the async_transaction! has exclusive ownership of the connection it's using.

In this project, however, I'm pretty sure that using a ConnectionManager like this: https://github.com/davechallis/ocypod/blob/d965c4a97f93ccd69572d98c926a7ca151da2243/src/bin/ocypod-server.rs#L37-L44 does not guarantee your various async execution contexts exclusive access to the underlying connection.

And so connections cloned by actix, like this: https://github.com/davechallis/ocypod/blob/d965c4a97f93ccd69572d98c926a7ca151da2243/src/bin/ocypod-server.rs#L54 all end up sharing that same underlying connection.

Maybe the red herring: https://github.com/davechallis/ocypod/blob/d965c4a97f93ccd69572d98c926a7ca151da2243/src/bin/ocypod-server.rs#L44 ConnectionManager doesn't actually perform any pooling; it just wraps a MultiplexedConnection that reconnects automatically.
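To make that concrete, here's roughly what I mean (a minimal sketch, untested, assuming redis-rs's connection-manager feature):

use redis::aio::ConnectionManager;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1:6379")?;
    // ConnectionManager::new opens ONE multiplexed connection.
    let manager = ConnectionManager::new(client).await?;

    // Clones are cheap handles to that same connection, not pool checkouts,
    // so connection-level state like WATCH is shared between them.
    let mut a = manager.clone();
    let mut b = manager.clone();

    redis::cmd("WATCH").arg("some_key").query_async::<_, ()>(&mut a).await?;
    // "some_key" is now watched for `b` as well, because `a` and `b` are
    // handles to the same underlying connection.
    redis::cmd("UNWATCH").query_async::<_, ()>(&mut b).await?;
    Ok(())
}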

I hope I'm wrong about all this and that I'm just missing some detail of how these connections are managed (or some detail about actix, for that matter; I'm using warp). But I'm pretty sure what's happening under the hood is the following:

If we were getting actual transaction isolation, you would expect the following code to loop forever:

use redis::{pipe, AsyncCommands, RedisResult};

macro_rules! async_transaction {
    ($conn:expr, $keys:expr, $body:expr) => {
        loop {
            redis::cmd("WATCH").arg($keys).query_async($conn).await?;

            if let Some(response) = $body {
                redis::cmd("UNWATCH").query_async($conn).await?;
                break response;
            }
        }
    };
}

#[tokio::main]
async fn main() -> RedisResult<()> {
    let mut conn1 = redis::Client::open("redis://127.0.0.1:6379")?
        .get_multiplexed_tokio_connection() // This is what connection manager uses under the hood
        .await?;

    conn1.set("hello", 2).await?;

    let (_,): (isize,) = async_transaction!(&mut conn1, &["hello"], {
        println!("outer closure runs");
        let value: i32 = conn1.get("hello").await?;
        println!("GET `hello`: {}", value);

        {
            let mut conn2 = conn1.clone(); // This is a second, concurrent request's connection cloned by actix.
            let (_,): (isize,) = async_transaction!(&mut (conn2.clone()), &["hello"], {
                println!("inner closure run");
                let value: i32 = conn2.get("hello").await?;
                println!("GET `hello`: {}", value);
                pipe().atomic()
                    .set("hello", value + 1).ignore()
                    .get("hello")
                    .query_async(&mut conn2)
                    .await? // un-watches the keys from the outer async_transaction
            });
        }

        pipe().atomic()
            .set("hello", value + 1).ignore()
            .get("hello")
            .query_async(&mut conn1)
            .await? // This should fail and retry, but won't: the inner EXEC above already cleared this connection's WATCH
    });

    let value: i32 = conn1.get("hello").await?;
    println!("value in redis: {}", value);

    Ok(())
}

But it completes with a final value of 3, demonstrating a race condition: the inner transaction's EXEC clears every watched key on the shared connection, so the outer EXEC succeeds even though it's operating on a stale read.

If, however, the inner async_transaction gets its own connection:

- let mut conn2 = conn1.clone();
+ let mut conn2 = redis::Client::open("redis://127.0.0.1:6379")?
+     .get_multiplexed_tokio_connection()
+     .await?;

Then this code loops forever, as expected: the inner EXEC now happens on a separate connection, so it trips the outer connection's WATCH, and the outer EXEC keeps returning nil and retrying.

So my current takeaway, for the kind of application we're building, is that we need some sort of connection pooling, and perhaps we'd want to hook into deadpool's post-recycle hook to be sure we're not still watching keys from a previous user of a connection that was returned to the pool without un-watching all its keys. Something like the sketch below is what I have in mind.
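A hand-rolled sketch of the idea, rather than deadpool's actual hook API (TxPool, checkout and checkin are hypothetical names); the UNWATCH on checkout is what I'd otherwise put in the post-recycle hook:

use redis::aio::MultiplexedConnection;
use tokio::sync::Mutex;

/// Each transaction checks out a dedicated connection, so nothing else can
/// intersperse commands on it while a WATCH is pending.
struct TxPool {
    client: redis::Client,
    idle: Mutex<Vec<MultiplexedConnection>>,
}

impl TxPool {
    async fn checkout(&self) -> redis::RedisResult<MultiplexedConnection> {
        if let Some(mut conn) = self.idle.lock().await.pop() {
            // The post-recycle step: clear any WATCH state a previous holder
            // may have left behind before handing the connection out again.
            redis::cmd("UNWATCH").query_async::<_, ()>(&mut conn).await?;
            return Ok(conn);
        }
        // No idle connection: open a fresh one (a new TCP connection, not a
        // clone of an existing multiplexed one).
        self.client.get_multiplexed_tokio_connection().await
    }

    async fn checkin(&self, conn: MultiplexedConnection) {
        self.idle.lock().await.push(conn);
    }
}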

I'd love to get your thoughts on the situation as the architecture of my application seems similar to yours.

davechallis commented 2 years ago

Yup, good spot, I think you're right.

I haven't had much time to look into it yet, but I remember that the StackExchange.Redis client deals with this, and handles transactions over multiplexed connections.

I haven't dug into their code, but it might be worth seeing if they've got a useful approach to consider.

Otherwise, where transactions are involved, either some form of mutex (to avoid interleaved WATCHes from other tasks) or connection pooling would work here. The latter is probably the simplest approach at first glance.
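For the mutex option, I'm picturing something like this (rough sketch, untested; increment_hello is just an illustrative name):

use redis::aio::MultiplexedConnection;
use tokio::sync::Mutex;

// Hold an async lock for the whole WATCH..EXEC span, so no other task can
// intersperse commands (or clear the WATCH) on the shared multiplexed
// connection in the meantime. Assumes "hello" already holds an integer.
async fn increment_hello(conn: &Mutex<MultiplexedConnection>) -> redis::RedisResult<isize> {
    // The guard is held across the whole transaction, not per command.
    let mut guard = conn.lock().await;
    loop {
        redis::cmd("WATCH").arg("hello").query_async::<_, ()>(&mut *guard).await?;
        let value: isize = redis::cmd("GET").arg("hello").query_async(&mut *guard).await?;
        let result: Option<(isize,)> = redis::pipe()
            .atomic()
            .set("hello", value + 1).ignore()
            .get("hello")
            .query_async(&mut *guard)
            .await?;
        if let Some((new_value,)) = result {
            return Ok(new_value);
        }
        // EXEC returned nil: a watched key changed under us, so retry.
    }
}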

davechallis commented 2 years ago

Fixed in 43cd03fd8d0f721e9b17281df7aee4784ec671f4.