sewenew / redis-plus-plus

Redis client written in C++
Apache License 2.0

[QUESTION] Abandon a pipeline after exec has been invoked #585

Open jbrezina opened 1 month ago

jbrezina commented 1 month ago

Describe the problem
I have code that is called in a loop, and I need to use the connection pool every iteration. If a single iteration does not finish within the specified time span, I need to abandon the results, and I'm not sure how to do that. The Pipeline destructor returns the connection to the pool, but in the next iteration I get an access violation. I cannot use discard, as it also results in an access violation. Is there a safe way to abandon the in-progress commands on connections from the connection pool?

std::future<void> task;
{
    auto pipeline{ _redis->pipeline(false) };

    task = std::async(std::launch::async, [&]() {
        // filling the pipe with data
        pipeline.set(...);

        auto replies{ pipeline.exec() };
    });

    if (const auto status{ task.wait_for(std::chrono::milliseconds(500)) };
        status == std::future_status::timeout) {
        // abandon the pipeline if it didn't finish within the required time span
        //pipeline.discard();
    }
}

Environment:

sewenew commented 1 month ago

Operations on Pipeline are not thread-safe.

In your case, e.g. calling set and exec in one thread while calling discard in another, you need to use a mutex to protect these operations.
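
For illustration, a minimal sketch of what guarding a shared Pipeline with a mutex can look like (the helper function, key names, and thread setup below are only placeholders, not code from this issue):

#include <sw/redis++/redis++.h>
#include <mutex>
#include <string>
#include <thread>

void fill_from_two_threads(sw::redis::Redis &redis) {
    auto pipe = redis.pipeline(false);
    std::mutex mtx;

    auto producer = [&](const std::string &key) {
        // serialize every operation on the shared pipeline
        std::lock_guard<std::mutex> lock(mtx);
        pipe.set(key, "val");
    };

    std::thread t1(producer, std::string("key1"));
    std::thread t2(producer, std::string("key2"));
    t1.join();
    t2.join();

    // exec is protected by the same mutex as the set calls
    std::lock_guard<std::mutex> lock(mtx);
    auto replies = pipe.exec();
}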

Regards

jbrezina commented 1 month ago

Calling discard caused an access violation while writing to the connection in the next iteration. Should setting a socket timeout solve this issue? Although if I set too small a value, I cannot establish the connection.

sewenew commented 1 month ago

Calling discard caused an access violation while writing to the connection in the next iteration.

I think this is caused by a race condition, and as I mentioned, you need to use a mutex to protect it.

Should setting a socket timeout solve this issue?

No, I don't think this is the right solution.

Regards

jbrezina commented 1 month ago

What is there to protect? The exec is executed within the task almost immediately. When the task does not finish within 500ms, it is discarded (the commented-out code; I think there was a problem in swapping the connection options when creating the new connection) and the (new) connection is returned to the pool. In the next iteration, when the connection is received, there is again an access violation while using set on the pipeline. There is no race condition between calling exec and discard.

sewenew commented 1 month ago

What is there to protect? The exec is executed within the task almost immediately.

How can you ensure that all operations on the pipeline in the async thread finish within 500ms?

When the task does not finish within 500ms, it is discarded

If the set or exec runs after discard, the behavior is undefined.

Also, after discard the pipeline object soon goes out of scope, and if set or exec runs after that, again, the behavior is undefined.

You can try the following code to see if it works:

std::future<void> task;
sw::redis::Pipeline *pipe = nullptr;
std::mutex mtx;
{
    auto pipeline{ _redis->pipeline(false) };
    pipe = &pipeline;

    task = std::async(std::launch::async, [&]() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            if (pipe == nullptr) {
                return;     // pipeline has been discarded.
            }
            // filling the pipe with data
            pipe->set(...);
        }

        {
            std::lock_guard<std::mutex> lock(mtx);
            if (pipe == nullptr) {
                return;     // pipeline has been discarded.
            }
            auto replies{ pipe->exec() };
        }
    });

    if (const auto status{ task.wait_for(std::chrono::milliseconds(500)) };
        status == std::future_status::timeout) {
        // abandon the pipeline if it didn't finish within the required time span
        std::lock_guard<std::mutex> lock(mtx);
        pipe->discard();
        pipe = nullptr;
    }
}

Regards

jbrezina commented 1 month ago

How can you ensure that all operations on the pipeline in the async thread finish within 500ms?

I know they finish, and that is not the issue here, even though I get your point. All set and exec operations are done long before the 500ms.

If the set or exec runs after discard, the behavior is undefined.

So that's the question: how can I abandon the pipeline? discard should create a new connection and return it to the pool, so in the next iteration, when I call pipeline on the Redis instance, I should get the new connection from the pool.

The problem with your code sample is that it blocks until exec finishes because of the mutex, and that's exactly what I'm trying to avoid. I want to abandon the connection the moment wait_for times out, and I don't care about the results anymore, because the next iteration overwrites it all. Furthermore, keeping the pointer to the connection does not make much sense, as the underlying Connection object is swapped in the reconnect function, and the local temporary object, which holds the previous in-progress operations after the swap, is destroyed.

sewenew commented 1 month ago

discard should create a new connection and return it to the pool, so in the next iteration, when I call pipeline on the Redis instance, I should get the new connection from the pool.

Yes, that's the current behavior.

The problem with your code sample is that it blocks until exec finishes because of the mutex, and that's exactly what I'm trying to avoid.

If you don't block until exec finishes, how can you ensure that exec runs before discard? If you cannot ensure it, you'll get a race condition.

keeping the pointer to the connection does not make much sense, as the underlying Connection object is swapped in the reconnect function, and the local temporary object, which holds the previous in-progress operations after the swap, is destroyed.

Yes, the previous connection has been destroyed, and that's why I reset the pipe pointer after I call discard.

If you don't want to wait for 500ms, you can set ConnectionOptions::socket_timeout to 500ms, or try the async interface.
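
For reference, a minimal sketch of the socket_timeout approach (host, port, pool size, and the timeout values are placeholders; note that ConnectionOptions also has a separate connect_timeout, so a short socket_timeout does not have to affect connection establishment):

#include <sw/redis++/redis++.h>
#include <chrono>
#include <iostream>

int main() {
    sw::redis::ConnectionOptions opts;
    opts.host = "127.0.0.1";
    opts.port = 6379;
    opts.connect_timeout = std::chrono::milliseconds(1000);  // connection establishment
    opts.socket_timeout = std::chrono::milliseconds(500);     // each send/receive on the socket

    sw::redis::ConnectionPoolOptions pool_opts;
    pool_opts.size = 3;

    sw::redis::Redis redis(opts, pool_opts);

    try {
        auto pipe = redis.pipeline(false);
        pipe.set("key", "val");
        auto replies = pipe.exec();  // throws sw::redis::TimeoutError if a reply
                                     // does not arrive within socket_timeout
    } catch (const sw::redis::TimeoutError &err) {
        std::cerr << "pipeline timed out: " << err.what() << '\n';
    }

    return 0;
}

And a rough sketch of the async-interface alternative, assuming redis-plus-plus is built with async support (e.g. libuv) and the default std::future-based sw::redis::Future; with it, a result can simply be abandoned after wait_for times out:

#include <sw/redis++/async_redis++.h>
#include <chrono>
#include <future>

int main() {
    sw::redis::ConnectionOptions opts;
    opts.host = "127.0.0.1";
    opts.port = 6379;

    sw::redis::AsyncRedis redis(opts);

    auto fut = redis.set("key", "val");   // returns a Future<bool>
    if (fut.wait_for(std::chrono::milliseconds(500)) == std::future_status::timeout) {
        // abandon the result; the event loop still consumes the reply when it arrives
    }

    return 0;
}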

Regards