grpc / grpc-dart

The Dart language implementation of gRPC.
https://pub.dev/packages/grpc
Apache License 2.0
853 stars 268 forks source link

Interceptor for handling response #413

Open rockerhieu opened 3 years ago

rockerhieu commented 3 years ago

Can we use ClientInterceptor for handling common use cases or errors from the response?

len8657 commented 3 years ago

thank!

mraleph commented 3 years ago

The question is rather ambiguous, so I don't know whether to say yes or no. Interceptor is called before you make the call and also can listen for responses because it has access to ResponseFuture/ResponseStream. Whether that's enough for what you are trying to achieve or not - I don't know, it depends on what exactly you are trying to achieve. If you would like a more concrete answer, you would have to ask a more concrete questions.

@len8657 the compilation error you have previously posted indicate that you are trying to use gRPC-Web from native Flutter application. This is not a support use case currently. Use native gRPC instead (make sure you import package:grpc/grpc.dart and not package:grpc/grpc-web.dart)

len8657 commented 3 years ago

The question is rather ambiguous, so I don't know whether to say yes or no. Interceptor is called before you make the call and also can listen for responses because it has access to ResponseFuture/ResponseStream. Whether that's enough for what you are trying to achieve or not - I don't know, it depends on what exactly you are trying to achieve. If you would like a more concrete answer, you would have to ask a more concrete questions.

@len8657 the compilation error you have previously posted indicate that you are trying to use gRPC-Web from native Flutter application. This is not a support use case currently. Use native gRPC instead (make sure you import package:grpc/grpc.dart and not package:grpc/grpc-web.dart)

Yes! This answer inspired me,I use $createUnaryCall with protobuffer.

I have another problem,I received sends single request called, received response data , can't received single response called.

abstract class ClientInterceptor {
// Intercept unary call.
// This method is called when client sends single request and receives single response.
ResponseFuture interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
return invoker(method, request, options);
}

my code

class TPClientInterceptor implements ClientInterceptor {
  final int _id;
  int _unary = 0;
  int _streaming = 0;

  static final List<InterceptorInvocation> _invocations = new List();

  TPClientInterceptor(this._id);

  @override
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
    _invocations.add(InterceptorInvocation(_id, ++_unary, _streaming));
    print('interceptUnary');
    return invoker(method, request, _inject(options));
  }

  CallOptions _inject(CallOptions options) {
    print('_inject');
    return options.mergedWith(CallOptions(metadata: {
      "x-interceptor": _invocations.map((i) => i.toString()).join(', '),
    }));
  }

  @override
  ResponseStream<R> interceptStreaming<Q, R>(
      ClientMethod<Q, R> method,
      Stream<Q> requests,
      CallOptions options,
      ClientStreamingInvoker<Q, R> invoker) {
    _invocations.add(InterceptorInvocation(_id, _unary, ++_streaming));
    print('interceptStreaming');

    return invoker(method, requests, _inject(options));
  }

  static void tearDown() {
    _invocations.clear();
  }
}

  static HousePlanServicesClient getHousePlanServicesClient() {
    var channel = RPCHelper.tpClientChannel;
    Iterable<ClientInterceptor> interceptors = [
      TPClientInterceptor(1),
    ];
    HousePlanServicesClient hc =
        HousePlanServicesClient(channel, interceptors: interceptors);
    return hc;
  }
mraleph commented 3 years ago

@len8657 You need to listen to ResponseFuture/ResponseStream which invoker returns to intercept the response. e.g. instead of return invoker(method, request, _inject(options)); you can do

  final response = invoker(method, request, _inject(options));
  response.then((r) {
    print('got response: $r');
  });
  return response;

Though I see that ResponseFuture breaks composability here because ResponseFuture.then returns a Future and not ResponseFuture, which means you can't write

  return invoker(...).then(transformResponse);

I think I would be happy to take a PR that fixes ResponseFuture.then to return ResponseFuture to enable this sort of interception.

rockerhieu commented 3 years ago

@len8657 You need to listen to ResponseFuture/ResponseStream which invoker returns to intercept the response. e.g. instead of return invoker(method, request, _inject(options)); you can do

  final response = invoker(method, request, _inject(options));
  response.then((r) {
    print('got response: $r');
  });
  return response;

Though I see that ResponseFuture breaks composability here because ResponseFuture.then returns a Future and not ResponseFuture, which means you can't write

  return invoker(...).then(transformResponse);

I think I would be happy to take a PR that fixes ResponseFuture.then to return ResponseFuture to enable this sort of interception.

I should have mentioned this issue when asking the question. Thought there is a different way to do it as ResponseFuture.then returns a Future, that's why I asked if we can use ClientInterceptor to handle the response. Apparently the answer is ~no~ not yet. I can work on the PR.

len8657 commented 3 years ago

@len8657 You need to listen to ResponseFuture/ResponseStream which invoker returns to intercept the response. e.g. instead of return invoker(method, request, _inject(options)); you can do

  final response = invoker(method, request, _inject(options));
  response.then((r) {
    print('got response: $r');
  });
  return response;

Though I see that ResponseFuture breaks composability here because ResponseFuture.then returns a Future and not ResponseFuture, which means you can't write

  return invoker(...).then(transformResponse);

I think I would be happy to take a PR that fixes ResponseFuture.then to return ResponseFuture to enable this sort of interception.

This is working for me

rockerhieu commented 3 years ago

@len8657 You need to listen to ResponseFuture/ResponseStream which invoker returns to intercept the response. e.g. instead of return invoker(method, request, _inject(options)); you can do

  final response = invoker(method, request, _inject(options));
  response.then((r) {
    print('got response: $r');
  });
  return response;

Though I see that ResponseFuture breaks composability here because ResponseFuture.then returns a Future and not ResponseFuture, which means you can't write

  return invoker(...).then(transformResponse);

I think I would be happy to take a PR that fixes ResponseFuture.then to return ResponseFuture to enable this sort of interception.

@mraleph I just submitted a PR for this at #419. Let me know what you think.

mraleph commented 2 years ago

Now that I look at this with fresh eyes I think the instead of trying to allow ResponseFuture chaining we could just split interceptors into interceptors which handle calling transformation and interceptors that handle response transformation, e.g. something along the lines of:

abstract class ClientInterceptor {
  // Intercept unary call.
  // This method is called when client sends single request and receives single response.
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker);

  Future<R> interceptUnaryResponse<R>(ClientMethod<dynamic, R> method, Future<R> response);

  // Intercept streaming call.
  // This method is called when client sends either request or response stream.
  ResponseStream<R> interceptStreaming<Q, R>(
      ClientMethod<Q, R> method,
      Stream<Q> requests,
      CallOptions options,
      ClientStreamingInvoker<Q, R> invoker);

  Stream<R> interceptStreamingResponse<R>(ClientMethod<dynamic, R> method, Stream<R> response);
}

Then we don't need to deal with ResponseFuture gnarlines at all.

Would that be sufficient for user purposes?

mraleph commented 2 years ago

Something like this might be a solution: https://github.com/grpc/grpc-dart/pull/548

aran commented 2 years ago

Workaround & example code (not thoroughly tested):

import 'dart:async';

import 'package:async/async.dart';

import 'package:grpc/grpc_or_grpcweb.dart';
import 'package:grpc/service_api.dart';

class LogoutInterceptor implements ClientInterceptor {
  final void Function() onLoggedOutDetected;

  LogoutInterceptor({required this.onLoggedOutDetected});

  @override
  ResponseStream<R> interceptStreaming<Q, R>(
      ClientMethod<Q, R> method,
      Stream<Q> requests,
      CallOptions options,
      ClientStreamingInvoker<Q, R> invoker) {
    return DelegatingResponseStream<R>(invoker(method, requests, options))
        .handleError((Object error) {
      onLoggedOutDetected();
    }, test: (e) => (e is GrpcError) && e.code == StatusCode.unauthenticated);
  }

  @override
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
    return DelegatingResponseFuture<R>(invoker(method, request, options))
        .catchError((Object error) {
      onLoggedOutDetected();
    }, test: (e) => e is GrpcError && e.code == StatusCode.unauthenticated);
  }
}

class DelegatingResponseFuture<R> extends DelegatingFuture<R>
    implements ResponseFuture<R> {
  Response responseDelegate;

  DelegatingResponseFuture.split(
      this.responseDelegate, Future<R> futureDelegate)
      : super(futureDelegate);

  DelegatingResponseFuture(ResponseFuture<R> delegate)
      : this.split(delegate, delegate);

  @override
  ResponseStream<R> asStream() =>
      DelegatingResponseStream.split(responseDelegate, super.asStream());

  @override
  ResponseFuture<R> catchError(Function onError,
          {bool Function(Object error)? test}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.catchError(onError, test: test));

  @override
  ResponseFuture<S> then<S>(FutureOr<S> Function(R) onValue,
          {Function? onError}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.then(onValue, onError: onError));

  @override
  ResponseFuture<R> whenComplete(FutureOr Function() action) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.whenComplete(action));

  @override
  ResponseFuture<R> timeout(Duration timeLimit,
          {FutureOr<R> Function()? onTimeout}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.timeout(timeLimit, onTimeout: onTimeout));

  @override
  Future<void> cancel() {
    return responseDelegate.cancel();
  }

  @override
  Future<Map<String, String>> get headers => responseDelegate.headers;

  @override
  Future<Map<String, String>> get trailers => responseDelegate.trailers;
}

class DelegatingResponseStream<R> extends DelegatingStream<R>
    implements ResponseStream<R> {
  Response responseDelegate;

  DelegatingResponseStream.split(
      this.responseDelegate, Stream<R> streamDelegate)
      : super(streamDelegate);

  DelegatingResponseStream(ResponseStream<R> delegate)
      : this.split(delegate, delegate);

  @override
  ResponseFuture<R> get single =>
      DelegatingResponseFuture.split(responseDelegate, super.single);

  @override
  ResponseStream<R> handleError(Function onError,
      {bool Function(dynamic error)? test}) {
    return DelegatingResponseStream.split(
        responseDelegate, super.handleError(onError, test: test));
  }

  @override
  Future<void> cancel() async {
    await responseDelegate.cancel();
  }

  @override
  Future<Map<String, String>> get headers => responseDelegate.headers;

  @override
  Future<Map<String, String>> get trailers => responseDelegate.trailers;
}

(Should be OK to add any other Stream methods as needed.)

tihrasguinho commented 1 year ago

Workaround & example code (not thoroughly tested):

import 'dart:async';

import 'package:async/async.dart';

import 'package:grpc/grpc_or_grpcweb.dart';
import 'package:grpc/service_api.dart';

class LogoutInterceptor implements ClientInterceptor {
  final void Function() onLoggedOutDetected;

  LogoutInterceptor({required this.onLoggedOutDetected});

  @override
  ResponseStream<R> interceptStreaming<Q, R>(
      ClientMethod<Q, R> method,
      Stream<Q> requests,
      CallOptions options,
      ClientStreamingInvoker<Q, R> invoker) {
    return DelegatingResponseStream<R>(invoker(method, requests, options))
        .handleError((Object error) {
      onLoggedOutDetected();
    }, test: (e) => (e is GrpcError) && e.code == StatusCode.unauthenticated);
  }

  @override
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
    return DelegatingResponseFuture<R>(invoker(method, request, options))
        .catchError((Object error) {
      onLoggedOutDetected();
    }, test: (e) => e is GrpcError && e.code == StatusCode.unauthenticated);
  }
}

class DelegatingResponseFuture<R> extends DelegatingFuture<R>
    implements ResponseFuture<R> {
  Response responseDelegate;

  DelegatingResponseFuture.split(
      this.responseDelegate, Future<R> futureDelegate)
      : super(futureDelegate);

  DelegatingResponseFuture(ResponseFuture<R> delegate)
      : this.split(delegate, delegate);

  @override
  ResponseStream<R> asStream() =>
      DelegatingResponseStream.split(responseDelegate, super.asStream());

  @override
  ResponseFuture<R> catchError(Function onError,
          {bool Function(Object error)? test}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.catchError(onError, test: test));

  @override
  ResponseFuture<S> then<S>(FutureOr<S> Function(R) onValue,
          {Function? onError}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.then(onValue, onError: onError));

  @override
  ResponseFuture<R> whenComplete(FutureOr Function() action) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.whenComplete(action));

  @override
  ResponseFuture<R> timeout(Duration timeLimit,
          {FutureOr<R> Function()? onTimeout}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.timeout(timeLimit, onTimeout: onTimeout));

  @override
  Future<void> cancel() {
    return responseDelegate.cancel();
  }

  @override
  Future<Map<String, String>> get headers => responseDelegate.headers;

  @override
  Future<Map<String, String>> get trailers => responseDelegate.trailers;
}

class DelegatingResponseStream<R> extends DelegatingStream<R>
    implements ResponseStream<R> {
  Response responseDelegate;

  DelegatingResponseStream.split(
      this.responseDelegate, Stream<R> streamDelegate)
      : super(streamDelegate);

  DelegatingResponseStream(ResponseStream<R> delegate)
      : this.split(delegate, delegate);

  @override
  ResponseFuture<R> get single =>
      DelegatingResponseFuture.split(responseDelegate, super.single);

  @override
  ResponseStream<R> handleError(Function onError,
      {bool Function(dynamic error)? test}) {
    return DelegatingResponseStream.split(
        responseDelegate, super.handleError(onError, test: test));
  }

  @override
  Future<void> cancel() async {
    await responseDelegate.cancel();
  }

  @override
  Future<Map<String, String>> get headers => responseDelegate.headers;

  @override
  Future<Map<String, String>> get trailers => responseDelegate.trailers;
}

(Should be OK to add any other Stream methods as needed.)

in my initial tests it works, but in Streams, if the token is expired the listeners are cancelled, the refreshToken request is called normally, but the listeners don't restart, in Futures it works fine ... .

aran commented 1 year ago

Summarizing this issue and related issues a bit.

  1. There are a couple use cases discussed. Notably: Amending calls (e.g. adding options), amending responses (no concrete example above), adding additional behaviors on certain calls and responses, including error responses (e.g. informing central application code on any authentication error to allow for recovery from a session expiry).
  2. There are two potential API changes discussed: Modifying ResponseFuture classes for smoother composability, or a bigger change to update the API to split request and response handling.
  3. There are a couple workarounds for different use cases: https://github.com/grpc/grpc-dart/issues/413#issuecomment-742405324, https://github.com/grpc/grpc-dart/pull/489#issuecomment-1117204933, and https://github.com/grpc/grpc-dart/issues/413#issuecomment-1277849117, with the latter currently broken in the latest release (https://github.com/grpc/grpc-dart/commit/d9553ca73f66116f7ad14fff5d0e4814253311a4#r118779797)
  4. Chaining creates API issues. (https://github.com/grpc/grpc-dart/pull/419#issuecomment-854542925)
  5. There's an uncommitted fix: https://github.com/grpc/grpc-dart/pull/548

@mraleph @kevmoo - any ideas on anything on the community side or otherwise that could unblock this issue overall?