importcjj / mobc

A generic connection pool for Rust with async/await support
https://docs.rs/mobc
Apache License 2.0

[mobc-redis] PONG as result for con.get() in broken connection #28

Open zupzup opened 4 years ago

zupzup commented 4 years ago

Hey!

I've been using mobc-redis in a project for a while and I found a strange issue, which so far only happened in production (so with a lot of load) and which I haven't been able to reproduce locally.

It seems that in some cases, a connection somehow becomes "corrupted" and only returns PONG on con.get() calls, although the actual value in redis is a valid string. Deleting that value from redis didn't change anything either, as the problem seemed to be at the connection level.

However, the connection doesn't appear to be broken (the pings during recycling succeed), so the misbehaviour persists through several client requests, as the connection is successfully recycled.

I had the same issue using the old CMD/GET API.

At first I thought this might happen on very long-running connections, so I reduced the max_lifetime to 30 seconds and that helped, but it still happens (although, as I said, very rarely). And when it happens, it's limited to that one connection and stops once the connection is dropped.

I'm using:

mobc-redis = "0.5.1"
mobc = "0.5.7"

This is the config for the pool:

const CACHE_POOL_MAX_OPEN: u64 = 32;
const CACHE_POOL_MAX_IDLE: u64 = 8;
const CACHE_POOL_TIMEOUT_SECONDS: u64 = 5;
const CACHE_POOL_EXPIRE_SECONDS: u64 = 30;

pub async fn connect() -> Result<RedisPool> {
    let client = redis::Client::open(CONFIG.cache.host.as_str()).map_err(RedisClientError)?;
    let manager = RedisConnectionManager::new(client);
    Ok(Pool::builder()
        .max_open(CACHE_POOL_MAX_OPEN)
        .max_idle(CACHE_POOL_MAX_IDLE)
        .get_timeout(Some(Duration::from_secs(CACHE_POOL_TIMEOUT_SECONDS)))
        .max_lifetime(Some(Duration::from_secs(CACHE_POOL_EXPIRE_SECONDS)))
        .build(manager))
}

and this is how e.g. a get works:

async fn get_con(pool: &RedisPool) -> Result<RedisCon> {
    pool.get().await.map_err(RedisPoolError)
}

pub async fn get_str(pool: &RedisPool, key: &str) -> Result<String> {
    let mut con = get_con(&pool).await?;
    let value = con.get(key).await.map_err(RedisCMDError)?;
    FromRedisValue::from_redis_value(&value).map_err(RedisTypeError)
}

So as you can see, it's a pretty basic setup.
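
(For completeness, RedisPool and RedisCon above are just these aliases:)

type RedisPool = mobc::Pool<RedisConnectionManager>;
type RedisCon = mobc::Connection<RedisConnectionManager>;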

I'm not sure if this is an issue with the underlying redis library or with recycling connections; it seems the PONG response to the ping somehow gets stuck on the connection?

I tried to reproduce it locally with redis-rs and with mobc-redis by doing billions of gets, but have never seen it. Maybe you have an idea what could be the issue?

Anyway, thanks for this fantastic project. Besides this issue (which I worked around by validating the return value from the get), it's been working great, same for mobc-postgres. :+1:
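
For reference, the validation workaround is just a guard in get_str, roughly like the following (get_str_validated is a made-up name, and it's obviously a hack, since a legitimately cached "PONG" value would also be rejected):

pub async fn get_str_validated(pool: &RedisPool, key: &str) -> Result<String> {
    let mut con = get_con(&pool).await?;
    let value = con.get(key).await.map_err(RedisCMDError)?;
    let s: String = FromRedisValue::from_redis_value(&value).map_err(RedisTypeError)?;
    // A stray PONG means the connection's replies are shifted, so treat
    // it as an error instead of returning it as the cached value.
    if s == "PONG" {
        return Err(RedisTypeError(
            (redis::ErrorKind::TypeError, "unexpected PONG from GET").into(),
        ));
    }
    Ok(s)
}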

Any help you could provide would be greatly appreciated.

importcjj commented 4 years ago

Hey! Thanks for your feedback. I tried but failed to reproduce your issue, so I can only share some thoughts with you.

By default, when getting a connection from the pool, Mobc always does a health check on the candidate conn. In your code there are only GET requests, so when a connection processes lots of sequential requests, the whole exchange looks like this:

req: PING - GET - PING - GET - PING - GET ..... PING - GET
res: PONG - Val - PONG - Val - PONG - Val ..... PONG - Val

Since I can't reproduce the bug, I can only offer a hypothesis based on your description:

While a conn was processing a GET request, it sent the GET command to the server, but before it could read the response, something (we don't know what yet) happened and the request was interrupted. The response bytes, however, were still on the wire and were read by the next PING command.

In such a case, the conn is still alive, but all commands and responses become mismatched: every GET command gets a PONG, and every PING command gets some value. As you can imagine, the exchange becomes this:

req: PING - GET  - PING - GET  - PING - GET ..... PING - GET
res: Val  - PONG - Val  - PONG - Val  - PONG .... Val  - PONG

importcjj commented 4 years ago

A new version of mobc-redis was just released. Would you please switch to 0.5.2 in your production environment and see if the same strange issue still occurs?

zupzup commented 4 years ago

Hey, thanks for the quick response!

Yeah, that could be the reason. I've upgraded to 0.5.2 now (thank you!) and will deploy it today. I'll post an update if it still happens (probably sometime next week, since, as I said, it happens very rarely).

zupzup commented 4 years ago

Update after one week: there were no instances of PONG being returned, but today I got the first of these:

    src/cache.rs error connecting to redis: pong response error

Which is, IIRC, the check you put in that verifies PONG is returned correctly for a ping. So this might be another indicator that your theory, that responses get out of order, is correct, since this exact error is only returned when the response to the ping is not PONG.
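
For anyone reading along, the check in question lives in the manager's health check; paraphrased (this is a sketch, not the exact mobc-redis source), it is along these lines:

use mobc::{async_trait, Manager};
use redis::{aio::Connection, ErrorKind, RedisError};

pub struct RedisConnectionManager {
    client: redis::Client,
}

#[async_trait]
impl Manager for RedisConnectionManager {
    type Connection = Connection;
    type Error = RedisError;

    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        self.client.get_async_connection().await
    }

    // Runs on checkout: if the reply stream is shifted, the PING comes back
    // with something other than PONG and the connection is rejected here.
    async fn check(&self, mut conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
        let pong: String = redis::cmd("PING").query_async(&mut conn).await?;
        if pong != "PONG" {
            return Err((ErrorKind::ResponseError, "pong response error").into());
        }
        Ok(conn)
    }
}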

I will keep monitoring!

biluohc commented 4 years ago

There may be a bug in redis-rs itself; I'm seeing this too. The following log comes from bb8:

save dupcheckval(224359, 39) to redis failed: Response was of incompatible type: "Could not convert from string." (response was status("PONG"))

biluohc commented 4 years ago

Could you add a take method to mobc::Connection? @importcjj
Then I could give up the current connection when I detect this error.

The code would probably look like the following:

impl<M: Manager> Connection<M> {
    /// Take the raw connection from the pool permanently, so it is never recycled.
    pub fn take(self) -> M::Connection {
        // Detach the inner connection so Drop won't return it to the pool.
        todo!()
    }
}

And this method would also help convert the redis connection into a PubSub connection: https://docs.rs/redis/0.16.0/redis/aio/struct.Connection.html#method.into_pubsub
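
Usage would be something like this (hypothetical, since take doesn't exist yet; the subscribe function and "some_channel" are just placeholders):

use futures::StreamExt;
use mobc_redis::RedisConnectionManager;

async fn subscribe(pool: &mobc::Pool<RedisConnectionManager>) -> redis::RedisResult<()> {
    let conn = pool.get().await.unwrap(); // pool error handling elided
    // take() would permanently remove the raw connection from the pool,
    // so it can be consumed by into_pubsub().
    let raw: redis::aio::Connection = conn.take();
    let mut pubsub = raw.into_pubsub();
    pubsub.subscribe("some_channel").await?;
    let mut stream = pubsub.on_message();
    while let Some(msg) = stream.next().await {
        println!("{}", msg.get_payload::<String>()?);
    }
    Ok(())
}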

khodzha commented 4 years ago

So what is the suggested fix for this problem? If there was a pong response error, call into_inner() on the connection and then drop() it?
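
i.e. something along these lines, reusing zupzup's helpers from above (assuming into_inner() detaches the raw connection so it is never returned to the pool)?

pub async fn get_str(pool: &RedisPool, key: &str) -> Result<String> {
    let mut con = get_con(pool).await?;
    let value: String = con.get(key).await.map_err(RedisCMDError)?;
    if value == "PONG" {
        // Replies are shifted: take the connection out of the pool
        // and drop it so it can never be recycled.
        drop(con.into_inner());
        return Err(RedisCMDError(
            (redis::ErrorKind::ResponseError, "desynchronized connection").into(),
        ));
    }
    Ok(value)
}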

pronvis commented 3 years ago

I've got this too. On conn.hgetall I get: "Response type not hashmap compatible" (response was status("PONG")). But I use bb8-redis, not mobc. It looks like the problem is in redis-rs, but where... I moved to its master branch, but I still get these errors (very rarely, but still).

stiar commented 3 years ago

An example of how this behaviour can be reproduced prior to #54 (a modified mobc-redis/examples/redis.rs):

use mobc::Pool;
use mobc_redis::redis;
use mobc_redis::RedisConnectionManager;
use redis::AsyncCommands;

const TEST_KEY: &'static str = "mobc::redis::test_queue";

#[tokio::main]
async fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    let manager = RedisConnectionManager::new(client.clone());
    let pool = Pool::builder().max_open(1).build(manager);

    // Spawn a future that wants to do BLPOP on a list that's currently empty.
    let pool_clone = pool.clone();
    let timed_task = tokio::spawn(tokio::time::timeout(
        std::time::Duration::from_secs(1),
        async move {
            let mut conn = pool_clone.get().await.unwrap();
            let _: redis::Value = conn.blpop(TEST_KEY, 600).await.unwrap();
        },
    ));

    // Nobody pushed data to the list before the future is dropped, but the connection is still
    // alive.
    let _ = timed_task.await;

    // Push some data to the list.
    let mut connection = client.get_async_connection().await.unwrap();
    let _: redis::Value = redis::cmd("RPUSH")
        .arg(&[TEST_KEY, "42"])
        .query_async(&mut connection)
        .await
        .unwrap();

    let mut conn = pool.get().await.unwrap();
    // Oops, ping gets 42.
    //
    // thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Response was of
    // incompatible type: "Response type not string compatible." (response was
    // bulk(string-data('"mobc::redis::test_queue"'), string-data('"42"')))'
    let _: redis::Value = redis::cmd("PING").query_async(&mut *conn).await.unwrap();
}

brianbruggeman commented 3 years ago

I think get_async_connection within the Manager should actually be get_multiplexed_async_connection.
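
As a sketch (not the actual mobc-redis code), the change would look roughly like this:

use mobc::{async_trait, Manager};

pub struct RedisConnectionManager {
    client: redis::Client,
}

#[async_trait]
impl Manager for RedisConnectionManager {
    // The associated type changes to the multiplexed variant.
    type Connection = redis::aio::MultiplexedConnection;
    type Error = redis::RedisError;

    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        // A multiplexed connection pairs each response with its request
        // internally, so a cancelled future shouldn't be able to leave
        // the reply stream shifted for the next user of the connection.
        self.client.get_multiplexed_async_connection().await
    }

    async fn check(&self, mut conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
        // Health check as before (see the sketch earlier in the thread).
        let pong: String = redis::cmd("PING").query_async(&mut conn).await?;
        if pong != "PONG" {
            return Err((redis::ErrorKind::ResponseError, "pong response error").into());
        }
        Ok(conn)
    }
}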