grpc / grpc-dotnet

gRPC for .NET
Apache License 2.0

Confusion Surrounding Exception Handling in IServerStreamWriter #2475

Closed a-priestley closed 3 months ago

a-priestley commented 3 months ago

Before asking my primary question, I need to give background and clarify some things:

I have set up a token validation interceptor in my project that can validate JWTs passed with the request mid-stream. Most of the documentation and examples I see for gRPC interceptors in .NET do not cover mid-stream interception: for streaming calls, the interceptor is called once, before any stream reader/writer processing, and the stream is then allowed to proceed as normal. What I am looking for is per-message interception within an ongoing stream. Here is a working example of what I am talking about:

    public override async Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(
        IAsyncStreamReader<TRequest> requestStream,
        ServerCallContext context,
        ClientStreamingServerMethod<TRequest, TResponse> continuation
    )
    {
        ContextTokenPartitionResource resource = new(context, ExtractTokenFromContext(context));
        TokenHandlingStreamReader<TRequest> validationStreamReader = new(requestStream, resource);
        return await continuation(validationStreamReader, context);
    }

    internal sealed class TokenHandlingStreamReader<T> : IAsyncStreamReader<T>
    {
        private readonly IAsyncStreamReader<T> _inner;
        private readonly ContextTokenPartitionResource _resource;
        private static readonly double _bufferRatio = 0.1;

        internal TokenHandlingStreamReader(
            IAsyncStreamReader<T> inner,
            ContextTokenPartitionResource resource
        )
        {
            _inner = inner;
            _resource = resource;
        }

        public T Current => _inner.Current;

        public async Task<bool> MoveNext(CancellationToken cancellationToken)
        {
            return
                _resource.Jwt.ValidTo
                    + TimeSpan.FromSeconds(
                        (_resource.Jwt.ValidTo - _resource.Jwt.IssuedAt).TotalSeconds * _bufferRatio
                    )
                < DateTime.UtcNow
                ? throw new RpcException(
                    new Status(StatusCode.Unauthenticated, "Token is invalid.")
                )
                : await _inner.MoveNext(cancellationToken);
        }
    }

So the custom implementation of IAsyncStreamReader ensures that the token initially passed is still valid for every message the client sends to the server. Given the lack of official documentation on this type of interception, I have to ask: is this something that should not be done? And if so, how do we enforce secure authorization restrictions on long-running streams?

With this out of the way, my primary question is regarding validation going in the other direction (with ServerStreamingServerHandler). Here is my attempt so far:

    public override async Task ServerStreamingServerHandler<TRequest, TResponse>(
        TRequest request,
        IServerStreamWriter<TResponse> responseStream,
        ServerCallContext context,
        ServerStreamingServerMethod<TRequest, TResponse> continuation
    )
    {
        JsonWebToken jwt = ExtractTokenFromContext(context);
        ContextTokenPartitionResource resource = new(context, jwt);
        using RateLimitLease lease = await TryAcquireLimiterLeaseAsync(resource);
        TokenHandlingServerStreamWriter<TResponse> validationStreamWriter =
            new(responseStream, resource.Jwt);
        await continuation(request, validationStreamWriter, context);
    }

    internal sealed class TokenHandlingServerStreamWriter<T> : IServerStreamWriter<T>
    {
        private readonly IServerStreamWriter<T> _inner;
        private readonly JsonWebToken _jwt;
        private static readonly double _bufferRatio = 1;

        internal TokenHandlingServerStreamWriter(IServerStreamWriter<T> inner, JsonWebToken jwt)
        {
            _inner = inner;
            _jwt = jwt;
        }

        public WriteOptions WriteOptions
        {
            get => _inner.WriteOptions;
            set => _inner.WriteOptions = value;
        }

        public async Task WriteAsync(T message)
        {
            TimeSpan buffer = TimeSpan.FromSeconds(
                (_jwt.ValidTo - _jwt.IssuedAt).TotalSeconds * _bufferRatio
            );
            if (_jwt.ValidTo + buffer < DateTime.UtcNow)
            {
                throw new RpcException(new Status(StatusCode.Unauthenticated, "Token is expired."));
            }
            await _inner.WriteAsync(message);
        }
    }

In this example, when the token expires and the RpcException is thrown, the stream does not terminate. It continues as normal, attempting to send messages to the client and throwing the exception again and again, and the client never receives it. Why the behavioral difference between client-sent and server-sent streams? Is it not possible to terminate an ongoing server-sent stream in this way?

JamesNK commented 3 months ago

Throwing an exception by itself doesn't stop a gRPC call on the server. What stops the gRPC call is the method exiting, which could be caused by an exception.

If you want the request to terminate immediately, you could get the HttpContext from the ServerCallContext, call Abort() on it, and then throw the exception. Aborting immediately stops the request and triggers the cancellation token.
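To make the point about the method exiting concrete, here is a minimal sketch. It does not use grpc-dotnet at all: `IMessageWriter<T>`, `ExpiringWriter<T>` and `Handlers` are hypothetical stand-ins, with `IMessageWriter<T>` only imitating the shape of `IServerStreamWriter<T>.WriteAsync`. It shows that a writer which throws only ends the handler if nothing between the writer and the handler catches the exception.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for IServerStreamWriter<T>; only the WriteAsync shape matters here.
public interface IMessageWriter<T>
{
    Task WriteAsync(T message);
}

// Stand-in for a token-checking writer: every write after `allowedWrites` messages fails.
public sealed class ExpiringWriter<T> : IMessageWriter<T>
{
    private readonly int _allowedWrites;

    // Number of times WriteAsync was called, including failed attempts.
    public int Attempts { get; private set; }

    public ExpiringWriter(int allowedWrites) => _allowedWrites = allowedWrites;

    public Task WriteAsync(T message)
    {
        Attempts++;
        if (Attempts > _allowedWrites)
        {
            throw new InvalidOperationException("Token is expired.");
        }
        return Task.CompletedTask;
    }
}

public static class Handlers
{
    // The exception is caught inside the loop, so the handler never exits early
    // and keeps attempting writes that fail again and again.
    public static async Task SwallowingHandler(IMessageWriter<int> writer, int messageCount)
    {
        for (int i = 0; i < messageCount; i++)
        {
            try
            {
                await writer.WriteAsync(i);
            }
            catch (InvalidOperationException)
            {
                // Swallowed, for example by a logging wrapper or an observer callback.
            }
        }
    }

    // Nothing catches the exception, so the handler exits on the first failed write
    // and the caller sees a faulted task.
    public static async Task PropagatingHandler(IMessageWriter<int> writer, int messageCount)
    {
        for (int i = 0; i < messageCount; i++)
        {
            await writer.WriteAsync(i);
        }
    }
}
```

With `new ExpiringWriter<int>(2)` and five messages, `SwallowingHandler` makes all five write attempts and completes normally, while `PropagatingHandler` stops at the third attempt and throws. If a real service method (or something it delegates to) behaves like the first variant, an exception thrown from a custom stream writer would never reach the framework.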

a-priestley commented 3 months ago

Thanks for the swift response. I've modified the code:

    public override async Task ServerStreamingServerHandler<TRequest, TResponse>(
        TRequest request,
        IServerStreamWriter<TResponse> responseStream,
        ServerCallContext context,
        ServerStreamingServerMethod<TRequest, TResponse> continuation
    )
    {
        JsonWebToken jwt = ExtractTokenFromContext(context);
        ContextTokenPartitionResource resource = new(context, jwt);
        await TryValidateToken(resource.Jwt);
        using RateLimitLease lease = await TryAcquireLimiterLeaseAsync(resource);
        TokenHandlingServerStreamWriter<TResponse> validationStreamWriter =
            new(responseStream, resource.Jwt, context.GetHttpContext());
        await continuation(request, validationStreamWriter, context);
    }

    internal sealed class TokenHandlingServerStreamWriter<T> : IServerStreamWriter<T>
    {
        private readonly IServerStreamWriter<T> _inner;
        private readonly JsonWebToken _jwt;
        private static readonly double _bufferRatio = 1;
        // Instance field: a static field would be shared by every writer of the same T.
        private readonly HttpContext _ctx;

        internal TokenHandlingServerStreamWriter(
            IServerStreamWriter<T> inner,
            JsonWebToken jwt,
            HttpContext ctx
        )
        {
            _inner = inner;
            _jwt = jwt;
            _ctx = ctx;
        }

        public WriteOptions WriteOptions
        {
            get => _inner.WriteOptions;
            set => _inner.WriteOptions = value;
        }

        public async Task WriteAsync(T message)
        {
            TimeSpan buffer = TimeSpan.FromSeconds(
                (_jwt.ValidTo - _jwt.IssuedAt).TotalSeconds * _bufferRatio
            );
            if (_jwt.ValidTo + buffer < DateTime.UtcNow)
            {
                _ctx.Abort();
                throw new RpcException(new Status(StatusCode.Unauthenticated, "Token is expired."));
            }
            await _inner.WriteAsync(message);
        }
    }

As you said, the stream is terminated. However, this results in error code 2 (Unknown) being caught in the client. Perhaps I can try setting the status on the context before aborting instead of throwing the exception in the writer; it may be that exceptions are not handled in this type of interceptor the way I would expect. I'll experiment a bit more.

Edit: So the POST response (my client is grpc-web) is ending up as NS_ERROR_NET_PARTIAL_TRANSFER, when doing:

                HttpContext httpContext = _ctx.GetHttpContext();
                _ctx.Status = new(StatusCode.Unauthenticated, "Token is expired.");
                httpContext.Abort();

As far as I am aware, this is an HTTP error, not a gRPC error. I believe the HTTP response should complete successfully (200), and the RPC exception should be handled on the client as an RPC exception, as happens when the exception is thrown before the first message is sent. Maybe grpc-web isn't quite there yet.

a-priestley commented 3 months ago

> Maybe grpc-web isn't quite there yet.

Scratch that. I can catch RPC exceptions properly with grpc-web when sent from unary calls, or streams, if the exception is sent prior to any messages being written. The issue seems to be with the interceptor being unable to send an exception in the midst of streaming.

Instead of Abort(), I can also do await httpContext.Response.CompleteAsync(), which results in a 200 response code on the client, but I still see RPC error code 2, caused by missing trailers.

I may have to go a little deeper to discover what needs to happen under the hood. The documentation instructs me to simply throw the RpcException, which ultimately results in the status being attached to the response trailers automatically before the connection is closed. Since this does not appear to work mid-stream for server-sent streams, how might I create that behavior myself? The two possibilities I can see at a glance are to append a trailer either to the HTTP context response or to the server call context. In either case, we'd complete (or abort) the call afterwards. Is there a specific way to do this conversion with this library (how do you go from an RpcException to key-value pairs for the metadata in the trailers)?

Any tips are greatly appreciated!
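For reference, in the gRPC over HTTP/2 protocol the call status travels in two trailers: `grpc-status` (the numeric code) and `grpc-message` (the percent-encoded detail text). The sketch below only illustrates that wire-level mapping under my reading of the protocol; `StatusTrailers`, `PercentEncode` and `ToTrailers` are hypothetical names, not part of grpc-dotnet, which normally writes these trailers itself when an RpcException escapes the handler.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

public static class StatusTrailers
{
    // Percent-encodes a status message: printable ASCII (0x20-0x7E) other than '%'
    // passes through unchanged, every other UTF-8 byte becomes %XX.
    public static string PercentEncode(string message)
    {
        var sb = new StringBuilder();
        foreach (byte b in Encoding.UTF8.GetBytes(message))
        {
            if (b >= 0x20 && b <= 0x7E && b != (byte)'%')
            {
                sb.Append((char)b);
            }
            else
            {
                sb.Append('%').Append(b.ToString("X2", CultureInfo.InvariantCulture));
            }
        }
        return sb.ToString();
    }

    // Hypothetical helper: statusCode is the numeric gRPC code (16 = Unauthenticated).
    public static IReadOnlyDictionary<string, string> ToTrailers(int statusCode, string detail)
    {
        return new Dictionary<string, string>
        {
            ["grpc-status"] = statusCode.ToString(CultureInfo.InvariantCulture),
            ["grpc-message"] = PercentEncode(detail),
        };
    }
}
```

For the status used in this thread, `StatusTrailers.ToTrailers(16, "Token is expired.")` would yield `grpc-status: 16` and `grpc-message: Token is expired.`. Note that grpc-web clients receive trailers differently from plain HTTP/2 clients, so this is a picture of the data involved rather than a recipe for writing it by hand.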

a-priestley commented 3 months ago

I was able to get this working with a simple throw in IServerStreamWriter by creating mock messages. The issue I'm having seems to stem from my much more complex implementation for propagating messages with the observer pattern. I'll dive further into it. Closing.