grpc / grpc-java

The Java gRPC implementation. HTTP/2 based RPC
https://grpc.io/docs/languages/java/
Apache License 2.0

Proper handling of server stubs that throw? #2429

Closed: jhspaybar closed this issue 6 years ago

jhspaybar commented 7 years ago

Please answer these questions before submitting your issue.

What version of gRPC are you using?

1.0.1

What JVM are you using (java -version)?

1.8

What did you do?

If possible, provide a recipe for reproducing the error.

Throw new RuntimeException() from any server implementation of an RPC defined in your proto.

What did you expect to see?

The client gets UNKNOWN, and close() or onCancel() is eventually called in the server's interceptor chain.

What did you see instead?

The client gets UNKNOWN, but my interceptor chain stops executing in onHalfClose() (for unary requests/responses).

I'm wondering: if some interceptors need to do cleanup, such as emitting metrics or modifying the logging MDC in thread locals, what should I expect here and what should I be doing? Is it reasonable to have an exception-mapping interceptor that is called last, right before the actual implementation, and that translates any exception it catches into close(Status.UNKNOWN, new Metadata())? Or should every interceptor catch exceptions in onHalfClose and onMessage and rethrow them after doing its own cleanup? Up to this point we had kept our cleanup logic isolated to close(...) and onCancel().
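A minimal sketch of the exception-mapping idea, assuming the mapping layer is the innermost interceptor. SimpleCall and SimpleListener are hypothetical, simplified stand-ins for io.grpc.ServerCall and ServerCall.Listener so the example runs without gRPC on the classpath; in grpc-java the equivalent would presumably be a ForwardingServerCallListener that calls call.close(Status.UNKNOWN.withCause(e), new Metadata()). Because each outer interceptor passes its own wrapped call inward, a close() issued from the innermost layer travels back out through every outer close() override.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for io.grpc.ServerCall.
interface SimpleCall {
    void close(String statusCode);
}

// Hypothetical, simplified stand-in for io.grpc.ServerCall.Listener.
interface SimpleListener {
    void onMessage(String message);
    void onHalfClose();
}

// Innermost layer: turns an exception thrown by the service implementation
// into an explicit close(), so outer wrappers that override close() see it.
final class ExceptionMappingListener implements SimpleListener {
    private final SimpleListener delegate;
    private final SimpleCall call;

    ExceptionMappingListener(SimpleListener delegate, SimpleCall call) {
        this.delegate = delegate;
        this.call = call;
    }

    @Override
    public void onMessage(String message) {
        try {
            delegate.onMessage(message);
        } catch (RuntimeException e) {
            call.close("UNKNOWN");
        }
    }

    @Override
    public void onHalfClose() {
        try {
            delegate.onHalfClose();
        } catch (RuntimeException e) {
            call.close("UNKNOWN");
        }
    }
}

final class ExceptionMappingDemo {
    // Returns the statuses observed by an outer interceptor's close() override
    // when the service throws from onHalfClose().
    static List<String> run() {
        List<String> observed = new ArrayList<>();
        SimpleCall transport = statusCode -> { /* would write the status to the wire */ };
        SimpleCall outerWrapped = statusCode -> {
            observed.add(statusCode); // e.g. access logging or metrics
            transport.close(statusCode);
        };
        SimpleListener service = new SimpleListener() {
            @Override public void onMessage(String message) { }
            @Override public void onHalfClose() { throw new RuntimeException("boom"); }
        };
        SimpleListener innermost = new ExceptionMappingListener(service, outerWrapped);
        innermost.onMessage("request");
        innermost.onHalfClose();
        return observed;
    }
}
```

This sketch swallows the exception after closing, on the assumption that the call should be closed exactly once; whether that interacts safely with gRPC's own failure handling is not settled in this thread.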

carl-mastrangelo commented 7 years ago

Not sure how much cleanup you want to do, but if you want to change the error returned, you can throw an SRE (StatusRuntimeException) with the actual status you want. The reason the code catches the exception and closes the call is that it is running on the executor thread you provided. Otherwise the exception would bubble up and fall off the edge of the stack. (In theory it would reach the UncaughtExceptionHandler, but by that point the code that threw it is far out of scope.)

ServerInterceptor receives a ServerCallHandler, whose startCall() returns a listener. I think you could hook that. I'm not sure it is possible to continue processing the RPC by that point, though.

jhspaybar commented 7 years ago

I'm actually okay with the error returned; I'm mostly concerned with making sure close() or onCancel() always gets called in my interceptor. It looks like it's intentional that this doesn't happen, and I'm fine with that; I'm just brainstorming the right way to recover here. It sounds like catching exceptions in onHalfClose and onMessage may be needed?

carl-mastrangelo commented 7 years ago

Does this work?

.addService(ServerInterceptors.intercept(new BlockingServer(), new ServerInterceptor() {

  @Override
  public <ReqT, RespT> Listener<ReqT> interceptCall(
      ServerCall<ReqT, RespT> call,
      Metadata headers,
      ServerCallHandler<ReqT, RespT> next) {

    Listener<ReqT> delegate = next.startCall(call, headers);
    return new ForwardingServerCallListener<ReqT>() {
      @Override
      public void onHalfClose() {
        try {
          super.onHalfClose();
        } catch (RuntimeException e) {
          // Do cleanup here, then rethrow so that gRPC still fails the call.
          throw e;
        }
      }

      @Override
      protected Listener<ReqT> delegate() {
        return delegate;
      }
    };
  }
}))

jhspaybar commented 7 years ago

Absolutely, and this is what I'm doing. But I'm having to do it in every interceptor. I'm wondering if I could have a single interceptor that, on catching the exception, calls super.close() instead. It sounds like switching over to the other chain like this may not be advised, and that I should stick with the per-interceptor approach shown here.

carl-mastrangelo commented 7 years ago

Is the error handling the same in every interceptor? Can you make your interceptors extend an abstract interceptor? That would cut down on the boilerplate:

public abstract class ErrorHandlingInterceptor extends  ForwardingServerCallListener {
    ...
}

.addService(ServerInterceptors.intercept(new BlockingServer(), new ErrorHandlingInterceptor(actualInterceptor)))

I think you are describing some kind of hierarchical interception, which I don't think exists in the model.
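Along the lines of the abstract-base-class suggestion, a sketch that keeps the try/catch boilerplate in one place. MessageListener and ErrorHandlingListener are hypothetical names, and MessageListener is a simplified stand-in for ServerCall.Listener so the example runs on its own; each interceptor would subclass the base and implement only onFailure(). The exception is always rethrown, matching the rethrow pattern used in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for io.grpc.ServerCall.Listener.
interface MessageListener {
    void onMessage(String message);
    void onHalfClose();
}

// Base class holding the try/catch once. Subclasses only decide what cleanup
// to run; the exception is rethrown so the framework can still fail the call.
abstract class ErrorHandlingListener implements MessageListener {
    private final MessageListener delegate;

    protected ErrorHandlingListener(MessageListener delegate) {
        this.delegate = delegate;
    }

    // Per-interceptor cleanup hook (metrics, access logging, MDC, ...).
    protected abstract void onFailure(RuntimeException e);

    @Override
    public void onMessage(String message) {
        try {
            delegate.onMessage(message);
        } catch (RuntimeException e) {
            onFailure(e);
            throw e;
        }
    }

    @Override
    public void onHalfClose() {
        try {
            delegate.onHalfClose();
        } catch (RuntimeException e) {
            onFailure(e);
            throw e;
        }
    }
}

final class ErrorHandlingDemo {
    // Returns the event log for a call whose onHalfClose() throws; the demo
    // catches the rethrown exception only so it can inspect the log.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MessageListener service = new MessageListener() {
            @Override public void onMessage(String message) { log.add("message:" + message); }
            @Override public void onHalfClose() { throw new IllegalStateException("boom"); }
        };
        MessageListener wrapped = new ErrorHandlingListener(service) {
            @Override protected void onFailure(RuntimeException e) {
                log.add("cleanup:" + e.getMessage());
            }
        };
        wrapped.onMessage("req");
        try {
            wrapped.onHalfClose();
        } catch (IllegalStateException expected) {
            log.add("rethrown");
        }
        return log;
    }
}
```

ErrorHandlingDemo.run() yields message:req, cleanup:boom, rethrown, showing that cleanup runs before the exception continues up the stack.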

jhspaybar commented 7 years ago

Okay, here are two interceptors where I've added onHalfClose and onMessage handling, to show what this looks like as it becomes more complete. To your point, in both cases there is a lambda I could call through an abstract base class, so I think I'll pursue that route.

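import com.google.common.base.Stopwatch;
import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Interceptor for logging all clients' accesses, similar to Apache's access.log.
 * The logging format is as follows:
 *
 * <pre>
 * {@code
 * [service/method] [client ip] [user agent] [status] [elapsed time] [headers if trace logging is enabled]
 * }
 * </pre>
 *
 * @see ServerInterceptors for instructions on how to add interceptors to a server
 */
public final class AccessLogServerInterceptor implements ServerInterceptor {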
    private static final Logger LOG = LoggerFactory.getLogger("AccessLog");

    private static final String UNKNOWN_IP = "unknown-ip";
    private static final String UNKNOWN_USER_AGENT = "unknown-user-agent";

    public static final String GRPC_RPC_NAME_KEY = "grpcRpcName";
    public static final String GRPC_CLIENT_IP_KEY = "grpcClientIp";
    public static final String GRPC_USER_AGENT_KEY = "grpcUserAgent";
    public static final String GRPC_STATUS_KEY = "grpcStatus";
    public static final String GRPC_ELAPSED_MS_KEY = "grpcElapsedMs";
    public static final String GRPC_HEADERS_KEY = "grpcHeaders";

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call,
                                                                 final Metadata headers,
                                                                 final ServerCallHandler<ReqT, RespT> next) {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        final String clientIp = clientIp(call.attributes().get(ServerCall.REMOTE_ADDR_KEY));
        final String userAgent = Optional.ofNullable(headers.get(GrpcUtil.USER_AGENT_KEY)).orElse(UNKNOWN_USER_AGENT);
        final String serviceRpcName = call.getMethodDescriptor().getFullMethodName();

        final ServerCall<ReqT, RespT> wrappedCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void close(Status status, Metadata trailers) {
                logCallEnded(serviceRpcName, clientIp, userAgent, status, stopwatch, headers);
                super.close(status, trailers);
            }
        };
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(wrappedCall, headers)){
            @Override
            public void onCancel() {
                logCallEnded(serviceRpcName, clientIp, userAgent, Status.CANCELLED, stopwatch, headers);
                super.onCancel();
            }

            @Override
            public void onHalfClose() {
                try {
                    super.onHalfClose();
                } catch (Exception e) {
                    logCallEnded(serviceRpcName, clientIp, userAgent, Status.UNKNOWN, stopwatch, headers);
                    throw e;
                }
            }

            @Override
            public void onMessage(ReqT message) {
                try {
                    super.onMessage(message);
                } catch (Exception e) {
                    logCallEnded(serviceRpcName, clientIp, userAgent, Status.UNKNOWN, stopwatch, headers);
                    throw e;
                }
            }
        };
    }

    private String clientIp(@Nullable final SocketAddress socketAddress) {
        if (socketAddress == null) {
            return UNKNOWN_IP;
        }

        if (!(socketAddress instanceof InetSocketAddress)) {
            return socketAddress.toString();
        }

        final InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
        final String hostString = inetSocketAddress.getHostString();
        return hostString == null ? UNKNOWN_IP : hostString;
    }

    private void logCallEnded(String serviceRpcName, String clientIp, String userAgent, Status status, Stopwatch stopwatch, Metadata headers) {
        MDC.put(GRPC_RPC_NAME_KEY, serviceRpcName);
        MDC.put(GRPC_CLIENT_IP_KEY, clientIp);
        MDC.put(GRPC_USER_AGENT_KEY, userAgent);
        String statusString = status.getCode().name();
        MDC.put(GRPC_STATUS_KEY, statusString);
        String elapsedString = Long.toString(stopwatch.elapsed(TimeUnit.MILLISECONDS));
        MDC.put(GRPC_ELAPSED_MS_KEY, elapsedString);
        if (LOG.isTraceEnabled()) {
            String headerString = headers.toString();
            MDC.put(GRPC_HEADERS_KEY, headerString);
            LOG.trace("[{}] [{}] [{}] [{}] [{} ms] [{}]", serviceRpcName, clientIp, userAgent, statusString, elapsedString, headerString);
        } else {
            LOG.info("[{}] [{}] [{}] [{}] [{} ms]", serviceRpcName, clientIp, userAgent, statusString, elapsedString);
        }
    }
}

And another example:

public final class SpectatorMetricsServerInterceptor implements ServerInterceptor {
    private static final String METRIC_NAME = "grpc.server.call";

    private final Function<MethodDescriptor<?, ?>, MethodMetrics> metricsResolver;

    public SpectatorMetricsServerInterceptor(ServerServiceDefinition serviceDefinition, Registry registry) {
        this(serviceDefinition.getServiceDescriptor(), registry);
    }

    public SpectatorMetricsServerInterceptor(ServiceDescriptor serviceDefinition, Registry registry) {
        metricsResolver = MethodMetrics.createMetricResolver(
                registry, 
                METRIC_NAME, 
                serviceDefinition);
    }

    /**
     * @deprecated 2016-07-18 gRPC generated code now provides proper access to the ServiceDescriptor.
     *  Use {@link SpectatorMetricsServerInterceptor#SpectatorMetricsServerInterceptor(ServiceDescriptor, Registry)} instead
     */
    @Deprecated
    public SpectatorMetricsServerInterceptor(Class<?> serviceGrpcClass, Registry registry) {
        try {
            this.metricsResolver = MethodMetrics.createMetricResolver(
                    registry, 
                    METRIC_NAME, 
                    (ServiceDescriptor) serviceGrpcClass.getDeclaredMethod("getServiceDescriptor").invoke(null));
        } catch (Exception e) {
            throw new RuntimeException("Error getting service descriptor for " + serviceGrpcClass.getName(), e);
        }
    }

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call,
                                                                 final Metadata headers,
                                                                 final ServerCallHandler<ReqT, RespT> next) {
        final MethodMetrics metrics = metricsResolver.apply(call.getMethodDescriptor());
        final MetricsContextImpl metricsContext = new MetricsContextImpl();
        final Context context = Context.current().withValue(MetricsContext.KEY, metricsContext);

        final Stopwatch sw = Stopwatch.createStarted();
        final AtomicBoolean done = new AtomicBoolean();

        metrics.startCall();

        final ServerCall<ReqT, RespT> nextCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void close(Status status, Metadata metadata) {
                if (done.compareAndSet(false, true)) {
                    metrics.finishCall(PercentileTimer::get, status, sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS, metricsContext.getTags());
                }
                super.close(status, metadata);
            }
        };

        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(Contexts.interceptCall(context, nextCall, headers, next)) {
            @Override
            public void onCancel() {
                // We need to handle onCancel() separately as it does not result in close() being called
                // The done flag is here just in case so we don't double log if the behavior ever changed
                if (done.compareAndSet(false, true)) {
                    metrics.finishCall(PercentileTimer::get, Status.CANCELLED, sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS, metricsContext.getTags());
                }
                super.onCancel();
            }

            @Override
            public void onHalfClose() {
                try {
                    super.onHalfClose();
                } catch (Exception e) {
                    if (done.compareAndSet(false, true)) {
                        metrics.finishCall(PercentileTimer::get, Status.UNKNOWN, sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS, metricsContext.getTags());
                    }
                    throw e;
                }
            }

            @Override
            public void onMessage(ReqT message) {
                try {
                    super.onMessage(message);
                } catch (Exception e) {
                    if (done.compareAndSet(false, true)) {
                        metrics.finishCall(PercentileTimer::get, Status.UNKNOWN, sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS, metricsContext.getTags());
                    }
                    throw e;
                }
            }
        };
    }
}

Maybe what I'm asking for here is a single, well-defined place in an interceptor through which a "done" call always passes. As it stands right now, there are a lot of possible exit points.
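One way to get a single "done" location inside an interceptor is to route every exit point (the wrapped close(), onCancel(), and the catch blocks in onMessage()/onHalfClose()) into one exactly-once callback, generalizing the AtomicBoolean done flag from SpectatorMetricsServerInterceptor. CallCompletion is a hypothetical helper, not part of grpc-java, and status codes are plain strings here to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper (not a grpc-java type): every exit point of one call
// reports here, and only the first report reaches the "done" callback.
final class CallCompletion {
    private final AtomicBoolean done = new AtomicBoolean();
    private final Consumer<String> onDone;

    CallCompletion(Consumer<String> onDone) {
        this.onDone = onDone;
    }

    // Returns true if this report was the one that completed the call.
    boolean complete(String statusCode) {
        if (done.compareAndSet(false, true)) {
            onDone.accept(statusCode);
            return true;
        }
        return false;
    }
}

final class CallCompletionDemo {
    // Simulates a call that fails in onHalfClose, is then closed, and is
    // finally cancelled: three exit points, one recorded outcome.
    static List<String> run() {
        List<String> recorded = new ArrayList<>();
        CallCompletion completion = new CallCompletion(recorded::add);
        completion.complete("UNKNOWN");   // from the catch block in onHalfClose()
        completion.complete("OK");        // from the wrapped close()
        completion.complete("CANCELLED"); // from onCancel()
        return recorded;
    }
}
```

Each interceptor would create one CallCompletion per call in interceptCall() and put its logging or metrics code in the onDone callback, so that code exists in exactly one place.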

ejona86 commented 6 years ago

So the interceptor should be able to listen to just onCancel() and onComplete(). We do our best to guarantee that one of those two methods will be called, and that it will be the last thing called. (I say "do our best" because interceptors can do anything, and if something throws in those methods there's not much we can do.) In this case, though, onComplete() is probably being called unexpectedly. While it's probably a good idea to handle onComplete() anyway (because another interceptor could cause the same confusion), the fact that onComplete() is called in this case should be fixed as part of #3746.
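A sketch of the approach described here: the wrapped close() only remembers the status, and the outcome is reported from the listener's terminal callbacks, onComplete() or onCancel(). TerminalCall, TerminalListener, and TerminalMetrics are hypothetical, simplified stand-ins rather than gRPC types. Falling back to UNKNOWN when onComplete() arrives without a prior close() is an assumption, meant to cover the unexpected onComplete() described above, where the call ends without passing through the interceptor's close().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for io.grpc.ServerCall and
// ServerCall.Listener, reduced to the methods that matter here.
interface TerminalCall {
    void close(String statusCode);
}

interface TerminalListener {
    void onHalfClose();
    void onCancel();
    void onComplete();
}

// Per-call state: close() only records the status; the outcome is emitted
// from the terminal listener callbacks.
final class TerminalMetrics {
    private final List<String> emitted = new ArrayList<>();
    private volatile String closedWith; // null until the wrapped close() runs

    TerminalCall wrapCall(TerminalCall call) {
        return statusCode -> {
            closedWith = statusCode;
            call.close(statusCode);
        };
    }

    TerminalListener wrapListener(TerminalListener listener) {
        return new TerminalListener() {
            @Override public void onHalfClose() {
                listener.onHalfClose();
            }

            @Override public void onCancel() {
                emitted.add("CANCELLED");
                listener.onCancel();
            }

            @Override public void onComplete() {
                // Assumption: completion without a prior close() is an error path.
                emitted.add(closedWith != null ? closedWith : "UNKNOWN");
                listener.onComplete();
            }
        };
    }

    List<String> emitted() {
        return emitted;
    }
}

final class TerminalMetricsDemo {
    // Simulates one unary call. With closeViaWrapper == false the call ends
    // without the wrapped close() running, as in the case discussed here.
    static List<String> simulate(boolean closeViaWrapper) {
        TerminalMetrics metrics = new TerminalMetrics();
        TerminalCall call = metrics.wrapCall(statusCode -> { /* transport */ });
        TerminalListener listener = metrics.wrapListener(new TerminalListener() {
            @Override public void onHalfClose() { }
            @Override public void onCancel() { }
            @Override public void onComplete() { }
        });
        listener.onHalfClose();
        if (closeViaWrapper) {
            call.close("OK");
        }
        listener.onComplete();
        return metrics.emitted();
    }
}
```

The design choice is to treat close() as a source of information only and the terminal callbacks as the single reporting point, so no catch blocks are needed in onMessage() or onHalfClose().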