google-deepmind / reverb

Reverb is an efficient and easy-to-use data storage and transport system designed for machine learning research
Apache License 2.0

Feature request: timeout argument for `client.sample` #18

Open gehring opened 3 years ago

gehring commented 3 years ago

Similarly to #4, it would be useful to be able to back out of sampling without needing to wrap things in a thread or use an executor. I agree that in many cases you'd want to sample asynchronously to maximize throughput, but there are cases where predictability and simplicity are preferable even if they come at the expense of efficiency (e.g., in research). A timeout argument would simplify the synchronous setting without sacrificing safeguards against indefinite waiting.
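For context, the thread/executor workaround mentioned above looks roughly like the following minimal sketch. It uses only the Python standard library; `blocking_sample` is a hypothetical stand-in for a blocking sample call and is not Reverb's API.

```python
import concurrent.futures
import threading


def blocking_sample(ready: threading.Event) -> str:
    """Hypothetical stand-in for a blocking sample call; waits until data exists."""
    ready.wait()
    return "sample"


def sample_with_timeout(ready: threading.Event, timeout_s: float):
    """Runs the blocking call in a worker thread and stops waiting after timeout_s.

    Returns None on timeout. The worker thread itself keeps blocking; only the
    caller gives up, which is part of why this workaround is awkward.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(blocking_sample, ready)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        return None
    finally:
        # Do not wait for the (possibly still blocked) worker thread.
        executor.shutdown(wait=False)
```

A built-in timeout argument would make this wrapper, and the leftover blocked thread, unnecessary in the synchronous case.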

tfboyd commented 3 years ago

We have worked on this a couple of times and we want to resolve it. We do not have an ETA.

ebrevdo commented 3 years ago

The Reverb datasets do allow a rate_limiter_timeout_ms argument; is that sufficient for your needs?

gehring commented 3 years ago

I had to move away from reverb for now since I had trouble working with states with variable shape (I'm not sure if that is supported, but it seemed possible to me at the time), but I'll try to recall the context for this as best I can.

If you're referring to ReplayDataset, then I don't believe it would have worked for me due to the explicit shape argument. Also, I believe what I was hoping to do was to avoid explicitly checking whether there are enough samples, using the timeout as a sort of hack. The context for this is one where you have several problem/env instances generating samples to be stored in separate buffers. Batches would be sampled from a random buffer, but some buffers might not have seen enough samples. Skipping updates when batches time out would have made handling things simple regardless of whether updates were done synchronously or asynchronously. Dealing with the iterator closing every time would likely have been a bit awkward for that use case, though it probably could have been workable.
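The "skip the update when the batch times out" pattern described above can be sketched as follows. This is only an illustration under stated assumptions: `queue.Queue` stands in for the separate buffers (unlike a replay table, it removes items when they are read), and `try_sample` and `run_updates` are hypothetical names, not Reverb functions.

```python
import queue
import random


def try_sample(buffer: queue.Queue, timeout_s: float):
    """Returns one item from the buffer, or None if none arrives in time."""
    try:
        return buffer.get(timeout=timeout_s)
    except queue.Empty:
        return None


def run_updates(buffers, num_steps: int, timeout_s: float, rng: random.Random):
    """Picks a random buffer per step and skips the update on timeout."""
    applied, skipped = 0, 0
    for _ in range(num_steps):
        batch = try_sample(rng.choice(buffers), timeout_s)
        if batch is None:
            skipped += 1  # This buffer has no data yet; move on.
            continue
        applied += 1  # A real learner would run its update on `batch` here.
    return applied, skipped
```

The loop never has to check buffer sizes explicitly; the timeout doubles as the "not enough data yet" signal.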

That being said, I want to acknowledge that it's very possible that I was trying to use reverb to do things it wasn't meant to support! I wouldn't be offended if that was the gist of your reply ;)

ebrevdo commented 3 years ago

The new Trajectory{Writer,Dataset} do provide more flexibility; it's also possible to have variable shaped tensors as long as you're not concatenating them into time trajectories.

That said, the python client sampler lacks a timeout argument - only the Datasets allow it. So I'll leave this open until we propagate the rate_limiter_timeout_ms into the python API for the python sampler.

varshneydevansh commented 1 year ago

Is this still pending? Can I look into this?

acassirer commented 1 year ago

The FR is still open so feel free to take a look! Just a warning though that the implementation is a bit involved, so it isn't quite as simple as just "propagating an argument".

varshneydevansh commented 1 year ago

Hi @acassirer,

I sat down last night and worked through this. The problem encountered in #4 is a common one in distributed systems and client-server architectures: how to handle timeouts gracefully when the server goes offline or becomes unresponsive. When using the Reverb library, as the example there shows, the client can indeed get blocked indefinitely if the server crashes or is terminated.

The current workaround of using the server_info method to check the server's status before calling create_item makes sense but, as noted there, it is not completely foolproof.

The suggestion in the replies to use concurrent.futures to offload the create_item call to a separate thread is a valid approach. The main advantage is that we can set a timeout on waiting for the thread's result, and if the server goes down, the timeout will trigger, allowing the client to handle the situation gracefully.

The updated Reverb API seems to provide a method called flush for the TrajectoryWriter which has a built-in timeout. This is ideal because it means that timeout functionality is natively supported by the library. You can set a timeout for the flush operation using the timeout_ms argument. If the operation doesn't complete within the timeout, a DeadlineExceededError is raised. This is a much cleaner and more efficient way to handle timeouts than using a separate thread for each call.


For this FR, do we have to modify https://github.com/deepmind/reverb/blob/58f5f018082860caa4057d24d75d725709dcd2bb/reverb/client.py#L345

def sample(
   timeout: Optional[float] = None,

somewhat like this and then maybe we have to modify the RPC call related to this method?

acassirer commented 1 year ago

Hey,

The sampler in the C++ layer has to be updated and the pybind layer has to be modified accordingly. The RPC call might have to change in two ways:

  1. You would have to make use of rate_limiter_timeout_ms. This only applies when the server is up and you are blocked on the lack of data rather than the server being offline.
  2. The gRPC context probably has to be configured so that the call is terminated if the server cannot be found within the deadline.

varshneydevansh commented 1 year ago

https://github.com/deepmind/reverb/blob/58f5f018082860caa4057d24d75d725709dcd2bb/reverb/cc/sampler.cc#L198

https://github.com/deepmind/reverb/blob/58f5f018082860caa4057d24d75d725709dcd2bb/reverb/cc/sampler.h#L127

 std::pair<int64_t, absl::Status> FetchSamples(
      internal::Queue<std::unique_ptr<Sample>>* queue, int64_t num_samples,
      absl::Duration rate_limiter_timeout) override {
    std::unique_ptr<grpc::ClientReaderWriterInterface<SampleStreamRequest,
                                                      SampleStreamResponse>>

The absl::Duration rate_limiter_timeout argument is already there, and the timeout is already being set on the request:

      request.mutable_rate_limiter_timeout()->set_milliseconds(
          NonnegativeDurationToInt64Millis(rate_limiter_timeout));

This is what I modified in sampler.cc:

  std::pair<int64_t, absl::Status> FetchSamples(
      internal::Queue<std::unique_ptr<Sample>>* queue, int64_t num_samples,
      absl::Duration rate_limiter_timeout) override {
    std::unique_ptr<grpc::ClientReaderWriterInterface<SampleStreamRequest,
                                                      SampleStreamResponse>>
        stream;
    {
      absl::MutexLock lock(&mu_);
      if (closed_) {
        return {0, absl::CancelledError("`Close` called on Sampler.")};
      }
      context_ = std::make_unique<grpc::ClientContext>();
      context_->set_wait_for_ready(false);

      // Setting the deadline for the gRPC context
      context_->set_deadline(absl::ToChronoTime(absl::Now() + rate_limiter_timeout));

      stream = stub_->SampleStream(context_.get());
    }

acassirer commented 1 year ago

That does indeed look sensible, with one exception: the time it takes to connect is not zero. You could establish a connection only to have the gRPC deadline expire before the rate limiter returns a valid sample. The data would then be lost, as it is successfully sampled from the table but never returned to the caller.
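The race described above can be illustrated with a deliberately simplified timing model. This is a sketch, not Reverb or gRPC code: `call_outcome` and its parameters are hypothetical, the return trip is ignored, and the model assumes (as the comment suggests) that the server still takes the sample after the client's deadline has passed. The client's deadline clock starts when the call is made, while the rate limiter's clock only starts once the request reaches the server.

```python
def call_outcome(connect_ms: int, wait_ms: int, timeout_ms: int,
                 margin_ms: int = 0) -> str:
    """Classifies one sampling call under the simplified model.

    connect_ms: time until the request reaches the server.
    wait_ms:    time the rate limiter then needs before a sample is available.
    timeout_ms: rate limiter timeout, also used as the base of the deadline.
    margin_ms:  hypothetical extra slack added to the client-side deadline.
    """
    deadline_ms = timeout_ms + margin_ms  # measured from the start of the call
    if wait_ms > timeout_ms:
        return "timed out"  # no sample within the rate limiter timeout
    sampled_at_ms = connect_ms + wait_ms  # measured from the start of the call
    if sampled_at_ms > deadline_ms:
        return "sampled but lost"  # the client stopped listening first
    return "returned"
```

For example, with a 1000 ms timeout, a 20 ms connection and a sample that becomes available after 990 ms, the sample is taken at 1010 ms and lost. Adding slack to the deadline only helps while the connection time stays below that slack.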

varshneydevansh commented 1 year ago

So should I do something like this?

      // Buffer time to account for connection overhead (or some other suitable value).
      // An absl::Duration is used so it can be added to absl::Now() directly.
      constexpr absl::Duration CONNECTION_BUFFER_TIME = absl::Milliseconds(50);

      // Setting the deadline for the gRPC context
      context_->set_deadline(absl::ToChronoTime(absl::Now() + rate_limiter_timeout + CONNECTION_BUFFER_TIME));

acassirer commented 1 year ago

Yes I think something like that would be reasonable. The important thing will be to add test coverage in the C++ layer and Python layer.

varshneydevansh commented 1 year ago

So, besides the test coverage, are there any other changes I have to make?

I am also opening the PR so that you can guide me better. =)

acassirer commented 1 year ago

You would have to change the pybind layer as well of course in order to expose it to Python. Then there is the story with the datasets in which this MUST NOT be enabled.

Then the test coverage will show more of whether this solution is working or not.

varshneydevansh commented 1 year ago

Reading about pybind11 https://buildmedia.readthedocs.org/media/pdf/pybind11/stable/pybind11.pdf