grpc / grpc-swift

The Swift language implementation of gRPC.
Apache License 2.0

Interceptor for retrying requests #1997

Open o15a3d4l11s2 opened 1 month ago

o15a3d4l11s2 commented 1 month ago

What are you trying to achieve?

The server sometimes returns errors or throttles the requests, so I want to be able to retry requests.

What have you tried so far?

Update: I also looked at the retry interceptor in https://github.com/grpc/grpc-swift/blob/945f7f779bbac0a201374a6e70f9d28e534f7cf7/Tests/GRPCTests/InterceptorsTests.swift#L225-L327 but once it retried, no response came back. I also modified it to do N retries with a delay of X seconds in between, but got multiple errors when using it.

Original: I have implemented what I think should be a sufficient interceptor for retrying, but now I'm starting to wonder whether this is even possible from inside the interceptor itself.

Here is the sample code. The basic idea is that I store the request parts as they are sent, so they can be replayed, and if I receive a response that is not OK or get a transport error, I re-send the stored parts. The problem is that after I re-send the parts, nothing happens: I get no further interceptor events, and the logs show no activity at all.

import Foundation
import GRPC
import NIO

class RetryingInterceptor<Request, Response>: ClientInterceptor<Request, Response> {
    private let delay: Int64
    private let maxRetries: Int

    private var remainingRetries: Int
    private var initialRequestParts: [GRPCClientRequestPart<Request>] = []

    init(maxRetries: Int, delay: Int64) {
        self.maxRetries = maxRetries
        self.delay = delay
        self.remainingRetries = maxRetries
    }

    override func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?, context: ClientInterceptorContext<Request, Response>) {
        initialRequestParts.append(part)

        context.send(part, promise: promise)
    }

    override func receive(_ part: GRPCClientResponsePart<Response>, context: ClientInterceptorContext<Request, Response>) {
        switch part {
        case .end(let status, _) where !status.isOk && remainingRetries > 0:
            NSLog("TEST Response error, retrying...")
            remainingRetries -= 1
            context.eventLoop.scheduleTask(in: .seconds(delay)) {
                self.retry(context: context)
            }
        default:
            context.receive(part)
        }
    }

    private func retry(context: ClientInterceptorContext<Request, Response>) {
        for part in initialRequestParts {
            context.send(part, promise: nil)
        }
    }

    override func errorCaught(_ error: any Error, context: ClientInterceptorContext<Request, Response>) {
        if remainingRetries > 0 {
            NSLog("TEST Transport error, retrying...")
            remainingRetries -= 1
            context.eventLoop.scheduleTask(in: .seconds(delay)) {
                self.retry(context: context)
            }
        } else {
            NSLog("TEST Transport error, no more retries.")
            context.errorCaught(error)
        }
    }
}

glbrntt commented 1 month ago

Note that the interceptor you linked from the tests creates a new call when it retries, and sends the buffered messages into that new call:

https://github.com/grpc/grpc-swift/blob/945f7f779bbac0a201374a6e70f9d28e534f7cf7/Tests/GRPCTests/InterceptorsTests.swift#L293-L316

In your code, you're re-sending the parts on the initial RPC; those parts are dropped if the RPC has already completed, which is the case once its status has been received.
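
The difference between replaying on the completed RPC and replaying on a new call can be sketched as follows. This is a minimal, untested sketch against the grpc-swift 1.x interceptor API, modelled on the linked test interceptor; it is not a definitive implementation. The class name NewCallRetryingInterceptor, the channel parameter, and the handle and retryOnNewCall helpers are assumptions introduced for illustration. It also assumes protobuf messages (hence the Message constraints) and a unary RPC whose failure arrives as a single end part. It retries only on a non-OK status; transport errors are forwarded unchanged.

```swift
import GRPC
import NIO
import SwiftProtobuf

// Sketch only (untested): on a non-OK status, start a NEW call on the channel
// and replay the buffered request parts into it.
final class NewCallRetryingInterceptor<Request: Message, Response: Message>:
    ClientInterceptor<Request, Response> {
    private let channel: GRPCChannel  // assumption: supplied by the interceptor factory
    private let delay: TimeAmount
    private var remainingRetries: Int
    private var bufferedParts: [GRPCClientRequestPart<Request>] = []

    init(channel: GRPCChannel, maxRetries: Int, delay: TimeAmount) {
        self.channel = channel
        self.delay = delay
        self.remainingRetries = maxRetries
        super.init()
    }

    override func send(
        _ part: GRPCClientRequestPart<Request>,
        promise: EventLoopPromise<Void>?,
        context: ClientInterceptorContext<Request, Response>
    ) {
        self.bufferedParts.append(part)  // keep a copy for a possible replay
        context.send(part, promise: promise)
    }

    override func receive(
        _ part: GRPCClientResponsePart<Response>,
        context: ClientInterceptorContext<Request, Response>
    ) {
        self.handle(part, context: context)
    }

    // Shared by the initial call and by every retry call.
    private func handle(
        _ part: GRPCClientResponsePart<Response>,
        context: ClientInterceptorContext<Request, Response>
    ) {
        switch part {
        case .end(let status, _) where !status.isOk && self.remainingRetries > 0:
            self.remainingRetries -= 1
            context.eventLoop.scheduleTask(in: self.delay) {
                self.retryOnNewCall(context: context)
            }
        default:
            context.receive(part)  // success, or no retries left
        }
    }

    private func retryOnNewCall(context: ClientInterceptorContext<Request, Response>) {
        // The initial RPC has completed, so a fresh call is needed. The interceptor
        // list is empty so the retry does not pass through this interceptor again.
        let call: Call<Request, Response> = self.channel.makeCall(
            path: context.path,
            type: context.type,
            callOptions: context.options,
            interceptors: []
        )
        call.invoke(
            onError: { error in
                context.eventLoop.execute { context.errorCaught(error) }
            },
            onResponsePart: { part in
                // Hop to the original call's event loop before touching the context.
                context.eventLoop.execute { self.handle(part, context: context) }
            }
        )
        for part in self.bufferedParts {
            call.send(part, promise: nil)  // replay on the new call, not on context
        }
    }
}
```

The interceptor needs a reference to the channel, so the interceptor factory would have to pass the client's channel in when it constructs one. If the initial call had already forwarded response metadata before failing, a retry would forward metadata a second time; a fuller version would need to account for that, and for request parts sent while a retry is in flight.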