grpc / grpc-dart

The Dart language implementation of gRPC.
https://pub.dev/packages/grpc
Apache License 2.0
861 stars 271 forks source link

feat: allow to make future chain request with ResponseFuture #489

Closed amondnet closed 2 years ago

amondnet commented 3 years ago

resolve https://github.com/grpc/grpc-dart/issues/413

linux-foundation-easycla[bot] commented 3 years ago

CLA Signed

The committers are authorized under a signed CLA.

esenmx commented 3 years ago

Can we merge this @mraleph ?

andredev24 commented 3 years ago

Any news about this?

mraleph commented 2 years ago

I think instead of adding support for chaining ResponseFuture objects we could allow specifying response interceptors separate from the call interceptors. See draft PR here: https://github.com/grpc/grpc-dart/pull/548

Let me know if that would solve all use cases.

andredev24 commented 2 years ago

@mraleph It seems to me the best option and the way to go.

savy-91 commented 2 years ago

I think instead of adding support for chaining ResponseFuture objects we could allow specifying response interceptors separate from the call interceptors. See draft PR here: #548

Let me know if that would solve all use cases.

While it might cover a lot of use cases, it does not cover my specific need.

What I needed this for, was implementing a retry mechanism for permissionDenied errors. The backend this interacts with is not always consistent and it can be that the jwt token I use to authenticate the client with the backend does not immediately contain all the required entitlements when a request is being made, therefore I have to retry the same request refreshing the jwt token before each retry.

  @override
  grpc.ResponseFuture<R> interceptUnary<Q, R>(
      grpc.ClientMethod<Q, R> method,
      Q request,
      grpc.CallOptions options,
      grpc.ClientUnaryInvoker<Q, R> invoker) {

    final grpc.ResponseFuture<R> response = invoker(method, request, options);

    return response.responseCatchError(
      (Object e, StackTrace sT) async {
        logger.error(method: 'responseCatchError', message: e);
        if (_unAuthRetryCount++ < 4) {
          final grpc.CallOptions newOptions = _onRetryOptions().mergedWith(options);
          _unAuthRetryCount++;
          return await interceptUnary(method, request, newOptions, invoker);
        }
        throw e;
      },
      test: (Object error) {
        return error is grpc.GrpcError &&
            error.code == grpc.StatusCode.permissionDenied ||
                error.code;
      },
    ).responseThen((dynamic r) {
      _unAuthRetryCount = 0;
      return r;
    });
  }

Because this is a common scenario with all the microservices the app interacts with, I wanted to solve it with a generic interceptor like this. It is totally possible that this is not the "right" approach or even a good one, so if you would have solved this differently, I am all ears!

mraleph commented 2 years ago

@savy-91 your code actually illustrates why naively chaining ResponseFuture is problematic.

response.responseCatchError(...) returns ResponseFuture associated with original remote call even if you catch error and retry the call. so if you write something that tries to access trailers/headers through the original ResponseFuture you are going to get the wrong result back:

    final response = stub.someMethod(request);
    final headers = await response.headers;  // Returns headers of the original call and not retried one.
    response.cancel();  // Cancels the wrong call.

I think the only way to make this work is to change how call is wrapped into a ResponseFuture. As I have said before I think collapsing call and response future into a single entity was a mistake of the original design - unfortunately I don't see a good way to disentangle this without a breaking change.

Within the current API I think we might need a special implementation of ResponseFuture which is wrapping other ResponseFuture(s) rather then the call and routes cancel, get headers & get trailers in a special way.

savy-91 commented 2 years ago

@mraleph at least that snipped it serves a purpose, how not do to it! Thanks for pointing that out.

Am I right to jump to the conclusion that right now, without significant rewrite, the only decent way of implementing something like in the snippet is from "outside"? Something along these lines:

final response = await retryFunction(() async => await stub.someMethod(request));

Would it be unthinkable to break the current API and release a major version with the new wisdom gained over the past years?

mraleph commented 2 years ago

Am I right to jump to the conclusion that right now, without significant rewrite, the only decent way of implementing something like in the snippet is from "outside"?

I think you could do the following:

class ResponseFutureImpl<R> extends DelegatingFuture<R>
    implements ResponseFuture<R> {
  Response? pendingCall;

  final Completer<R> _result;
  final _headers = Completer<HeadersMap>();
  final _trailers = Completer<HeadersMap>();

  ResponseFutureImpl._(this._result)
      : super(_result.future);

  ResponseFutureImpl()
      : this._(Completer<R>());

  void complete(ResponseFuture<R> other) {
    _result.complete(other);
    _headers.complete(other.headers);
    _trailers.complete(other.trailers);
  }

  @override
  Future<void> cancel() async {
    await pendingCall?.cancel();
  }

  @override
  Future<Map<String, String>> get headers => _headers.future;

  @override
  Future<Map<String, String>> get trailers => _trailers.future;
}

class RetryingInterceptor implements ClientInterceptor {
  @override
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
    final result = ResponseFutureImpl<R>();

    Future<void> callWithRetry() async {
      for (var retryCount = 0;; retryCount++) {
        final response = invoker(method, request, options);
        result.pendingCall = response;
        try {
          await response;
          // Fall-through. This will forward value to the result.
        } catch (error, st) {
          if (error is GrpcError &&
              error.code == StatusCode.permissionDenied &&
              retryCount < 4) {
            continue;
          }
          // Fall-through. This will forward error to the result.
        } finally {
          result.pendingCall = null;
        }
        result.complete(response);
        return;  // Done.
      }
    }

    callWithRetry();
    return result;
  }
}
mraleph commented 2 years ago

I am going to close this PR because I think that supporting direct chaining on the ResponseFuture has some hidden pitfalls (e.g. one that @savy-91 hit). I think safer solution has to be more explicit in the style of ResponseFutureImpl provided above.