eclipse-vertx / vertx-grpc

Development of the gRPC component for Eclipse Vert.x
Eclipse Public License 2.0

`GrpcServiceBridgeImpl` should set exception handling callback #101

Open hu-chia opened 3 months ago

hu-chia commented 3 months ago

Questions

When I use GrpcServiceBridgeImpl, vertx-grpc does not behave as expected when the network connection fails.

Expected Behavior

https://github.com/eclipse-vertx/vertx-grpc/blob/fca746c4f3f6bc4cf4e372fe8dd08ad3c9a2dc4a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServiceBridgeImpl.java#L148-L157

If the network connection between the client and the server is interrupted, listener.onCancel() should be invoked.

Actual Behavior

Nothing happens: listener.onCancel() is never invoked.

Version

4.5.7


The errorHandler is a method of GrpcReadStream. It is invoked only when the httpStream fails with a StreamResetException, but when the client was killed, an IOException occurred instead.

https://github.com/eclipse-vertx/vertx-grpc/blob/fca746c4f3f6bc4cf4e372fe8dd08ad3c9a2dc4a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcReadStreamBase.java#L71-L81

java.io.IOException: Broken pipe
    at java.base/sun.nio.ch.SocketDispatcher.writev0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:66)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:227)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:158)
    at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:574)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:430)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:359)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:935)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907)
    at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:197)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:941)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907)
    at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:967)
    at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:254)
    at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.checkFlush(VertxHttp2ConnectionHandler.java:247)
    at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.writeData(VertxHttp2ConnectionHandler.java:242)
    at io.vertx.core.http.impl.VertxHttp2Stream.doWriteData(VertxHttp2Stream.java:244)
    at io.vertx.core.http.impl.VertxHttp2Stream.writeData(VertxHttp2Stream.java:216)
    at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:469)
    at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:347)
    at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:48)
    at io.vertx.grpc.server.impl.GrpcServerResponseImpl.writeMessage(GrpcServerResponseImpl.java:242)
    at io.vertx.grpc.server.impl.GrpcServerResponseImpl.writeMessage(GrpcServerResponseImpl.java:112)
    at io.vertx.grpc.server.impl.GrpcServerResponseImpl.write(GrpcServerResponseImpl.java:102)
    at io.vertx.grpc.server.impl.GrpcServerResponseImpl.write(GrpcServerResponseImpl.java:248)
    at io.vertx.core.streams.impl.PipeImpl.lambda$to$1(PipeImpl.java:81)
    at io.vertx.grpc.server.impl.GrpcServerRequestImpl.lambda$handler$0(GrpcServerRequestImpl.java:79)
    at io.vertx.grpc.common.impl.GrpcReadStreamBase.handleMessage(GrpcReadStreamBase.java:207)
    at io.vertx.grpc.common.impl.GrpcReadStreamBase.lambda$init$3(GrpcReadStreamBase.java:89)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:134)
    at io.vertx.grpc.common.impl.GrpcReadStreamBase.handle(GrpcReadStreamBase.java:125)
    at io.vertx.grpc.common.impl.GrpcReadStreamBase.handle(GrpcReadStreamBase.java:39)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
    at io.vertx.core.http.impl.HttpEventHandler.handleChunk(HttpEventHandler.java:51)
    at io.vertx.core.http.impl.Http2ServerRequest.handleData(Http2ServerRequest.java:148)
    at io.vertx.core.http.impl.Http2ServerStream.handleData(Http2ServerStream.java:206)
    at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:75)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
    at io.vertx.core.streams.impl.InboundBuffer.drain(InboundBuffer.java:242)
    at io.vertx.core.streams.impl.InboundBuffer.lambda$fetch$0(InboundBuffer.java:295)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:261)
    at io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:59)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:1583)
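
The routing described above can be modelled without any Vert.x dependency. The following is a minimal sketch, not the vertx-grpc source: `ModelReadStream`, `ResetException`, and `run` are hypothetical names, and the either/or dispatch in `fail` is an assumption based on the description in this comment. It only illustrates why a callback registered through errorHandler alone would miss a "Broken pipe" IOException.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ErrorDispatchSketch {

    /** Stand-in for a stream reset; NOT the real StreamResetException. */
    static class ResetException extends RuntimeException {
        ResetException(String msg) { super(msg); }
    }

    /** Hypothetical, simplified read stream with two failure callbacks. */
    static class ModelReadStream {
        private Consumer<ResetException> errorHandler = e -> {};
        private Consumer<Throwable> exceptionHandler = t -> {};

        void errorHandler(Consumer<ResetException> h) { errorHandler = h; }
        void exceptionHandler(Consumer<Throwable> h) { exceptionHandler = h; }

        /** Assumed routing: resets go to errorHandler, everything else to exceptionHandler. */
        void fail(Throwable failure) {
            if (failure instanceof ResetException) {
                errorHandler.accept((ResetException) failure);
            } else {
                exceptionHandler.accept(failure);
            }
        }
    }

    /** Returns the callbacks that fired for the given failure, in order. */
    static List<String> run(Throwable failure, boolean alsoSetExceptionHandler) {
        List<String> events = new ArrayList<>();
        ModelReadStream stream = new ModelReadStream();
        // Models a bridge that only wires the cancel callback to errorHandler.
        stream.errorHandler(e -> events.add("onCancel via errorHandler"));
        if (alsoSetExceptionHandler) {
            // Models the change the issue title asks for.
            stream.exceptionHandler(t -> events.add("onCancel via exceptionHandler"));
        }
        stream.fail(failure);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(new ResetException("RST_STREAM"), false)); // [onCancel via errorHandler]
        System.out.println(run(new IOException("Broken pipe"), false));   // []
        System.out.println(run(new IOException("Broken pipe"), true));    // [onCancel via exceptionHandler]
    }
}
```

In this model the second call returns an empty list, which corresponds to the "nothing happened" behavior reported above; registering a general exception callback is what makes the third call produce a cancel event.
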

hu-chia commented 3 months ago

I believe this is the same problem as #28, which also provides a workaround.

hu-chia commented 3 months ago

If you use the Vert.x gRPC server, the keep-alive workaround is unavailable; it depends on #74.

vietj commented 3 months ago

Can someone provide a reproducer for this?

hu-chia commented 3 months ago

@vietj here you are.

Step 1: define a bidirectional streaming gRPC API

// filename: echo.proto
syntax = "proto3";

service EchoGrpc {
  rpc Bidirectional(stream Str) returns (stream Str) {}
}

message Str {
  string val = 1;
}

Step 2: generate the gRPC code

follow this

Step 3: implement the service and build a server

import Echo.Str;
import EchoGrpcGrpc.EchoGrpcImplBase;
import io.grpc.stub.StreamObserver;
import io.vertx.core.Vertx;
import io.vertx.grpc.server.GrpcServer;
import io.vertx.grpc.server.GrpcServiceBridge;

public class Main {

    public static void main(String... args) throws InterruptedException {
        var vertx = Vertx.vertx();

        var grpcServer = GrpcServer.server(vertx);
        var bridge = GrpcServiceBridge.bridge(new EchoGrpcImpl());
        bridge.bind(grpcServer);

        var httpServer = vertx.createHttpServer();
        httpServer.requestHandler(grpcServer);
        httpServer.listen(8080);

        Thread.currentThread().join();
    }

    static class EchoGrpcImpl extends EchoGrpcImplBase {

        @Override
        public StreamObserver<Str> bidirectional(StreamObserver<Str> responseObserver) {
            return new StreamObserver<Str>() {
                @Override public void onNext(Str value) {
                    System.out.println("on next " + value);
                    responseObserver.onNext(value);
                }

                @Override public void onError(Throwable t) {
                    System.out.println("on error");
                    t.printStackTrace(System.out);
                    responseObserver.onError(t);
                }

                @Override public void onCompleted() {
                    System.out.println("on completed");
                    responseObserver.onCompleted();
                }
            };
        }
    }
}

Step 4: disconnect while calling the API

Install grpcurl:
brew install grpcurl
Start the call, then press Ctrl + C.

On the client side you might see:

$ while true; do echo '{"val":"hello,world"}'; sleep 1; done | grpcurl -proto echo.proto -plaintext -d @ localhost:8080 EchoGrpc/Bidirectional
{
  "val": "hello,world"
}
{
  "val": "hello,world"
}
{
  "val": "hello,world"
}
^C

And on the server side:

on next val: "hello,world"

on next val: "hello,world"

on next val: "hello,world"

No completion or error message is printed after the client disconnects.
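
If a fix ends up routing more than one failure signal (stream reset, I/O exception, connection close) to listener.onCancel(), the listener should probably still be notified only once. The following is a minimal sketch of such a guard using only the JDK; `CancelOnce` and `cancelCount` are hypothetical names for illustration and are not part of vertx-grpc or grpc-java.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class CancelOnceSketch {

    /** Hypothetical guard: runs the wrapped callback at most once. */
    static class CancelOnce {
        private final AtomicBoolean fired = new AtomicBoolean();
        private final Runnable onCancel;

        CancelOnce(Runnable onCancel) { this.onCancel = onCancel; }

        /** Called from any failure path; only the first call runs the callback. */
        boolean signal() {
            if (fired.compareAndSet(false, true)) {
                onCancel.run();
                return true;
            }
            return false;
        }
    }

    /** Simulates the given number of failure signals and returns how often the callback ran. */
    static int cancelCount(int signals) {
        AtomicInteger calls = new AtomicInteger();
        CancelOnce guard = new CancelOnce(calls::incrementAndGet);
        for (int i = 0; i < signals; i++) {
            guard.signal();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(cancelCount(0)); // prints 0
        System.out.println(cancelCount(3)); // prints 1
    }
}
```

With a guard like this, the service in the reproducer would see a single cancel notification even if the disconnect surfaces through several callbacks.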