grpc / grpc-go

The Go language implementation of gRPC. HTTP/2 based RPC
https://grpc.io
Apache License 2.0

transport: handler server transport continuously reads from streams without waiting for application #7261

Open wmxhw opened 4 months ago

wmxhw commented 4 months ago

What version of gRPC are you using?

grpc v1.51.0

What version of Go are you using (go version)?

go version go1.21.7 windows/amd64

What operating system (Linux, Windows, …) and version?

Windows 11

What did you do?

My client has a large amount of data to send, but the server may not be able to process it as quickly (for example, when writing to disk). The test code below reproduces the issue.

test proto

service HelloService {
    rpc SayHelloStream(stream SayHelloRequest) returns (google.protobuf.Empty){}
}

message SayHelloRequest {
    string hello = 1;
}

server code

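func (s helloServer) SayHelloStream(stream hello.HelloService_SayHelloStreamServer) error {
    for {
        r, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        fmt.Println(r.Hello)
        time.Sleep(10 * time.Second) // Print once every 10 seconds.
    }
    // The client has finished sending: reply with the Empty response.
    // emptypb is google.golang.org/protobuf/types/known/emptypb.
    return stream.SendAndClose(&emptypb.Empty{})
}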

client code

func main() {
    // cc is an already established *grpc.ClientConn (dial code omitted).
    clt := hello.NewHelloServiceClient(cc)
    stream, err := clt.SayHelloStream(context.Background())
    if err != nil {
        panic(err)
    }

    body := bytes.NewBuffer(nil) // Simulate a large amount of data (1 MB).
    for i := 0; i < 1e6; i++ {
        body.WriteString("n")
    }
    for {
        err = stream.Send(&hello.SayHelloRequest{
            Hello: body.String(),
        })
        if err != nil {
            panic(err)
        }
        time.Sleep(10 * time.Millisecond) // Send once every 10 milliseconds.
    }
}

What did you expect to see?

Even if the client sends a large amount of traffic, the server's memory usage should stay bounded. gRPC should apply flow control, so memory should not grow without limit.

What did you see instead?

There is a memory leak occurring in the HandleStreams function at internal/transport/handler_server.go.

func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*Stream)) {
    ....
    go func() {
        defer close(readerDone)

        // TODO: minimize garbage, optimize recvBuffer code/ownership
        const readSize = 8196
        for buf := make([]byte, readSize); ; {
            n, err := req.Body.Read(buf)
            if n > 0 {
                s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
                buf = buf[n:]
            }
            if err != nil {
                s.buf.put(recvMsg{err: mapRecvMsgError(err)})
                return
            }
            if len(buf) == 0 {
                buf = make([]byte, readSize)     // Does this piece of code have flow control?
            }
        }
    }()
    ....
}

wmxhw commented 4 months ago

[Image: leak]

arjan-bal commented 4 months ago

Hi wmxhw@, can you try the same repro and share the profiler's output with a more recent grpc-go version (v1.61+)?

The server transport code has changed significantly since 1.51 was released in 2022.

wmxhw commented 4 months ago

@arjan-bal Thank you for your response. Even after switching to gRPC v1.64.0, I am still encountering the same problem.

arjan-bal commented 4 months ago

Can you share the graph generated by the profiler for 1.64?

wmxhw commented 4 months ago

This is the test code and a screenshot of its pprof results

grpc_client_stream_example.zip

[Images: three pprof screenshots]

wmxhw commented 4 months ago

@arjan-bal

arjan-bal commented 4 months ago

wmxhw@, the server code sleeps for 10 seconds after reading one message, while the client code sleeps for 10 milliseconds after sending each message. This means that the client can send 1000 messages before the server reads one. Since each message is 1 MB, gRPC will need to buffer 1 GB of messages until the application calls stream.Recv(). The buffering would explain why the server's memory consumption is increasing by more than 100 MB each second. After removing the sleep on the server side, the memory usage stayed around 50 MB.

If your aim is to only restrict the logging interval, you can do something like this:

{
    logInterval := 10 * time.Second
    lastLogTime := time.Time{}
    for {
        recv, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            panic(err)
        }
        now := time.Now()
        if now.Sub(lastLogTime) >= logInterval {
            lastLogTime = now
            fmt.Printf("Len: %d\n", len(recv.GetHello()))
        }
    }
    return nil
}

wmxhw commented 4 months ago

Perhaps I didn't make it clear earlier. The example code above was just for testing. The actual scenario is that the client sends a large amount of data using gRPC client streaming, and the server needs to write the received data to the hard disk. Because the disk writes are slow, the memory growth described earlier occurs. Is it correct to say that gRPC streaming does not limit the number of buffered messages, and that traffic can only be controlled by the client and server negotiating at the application layer? For example, with bidirectional streaming the server could send an acknowledgment to the client once it finishes writing to disk, and the client could then send more data. Do you have any suggestions for this? @arjan-bal

arjan-bal commented 4 months ago

Can you try letting grpc manage the listener?

l, err := net.Listen("tcp", ":12345")
if err != nil {
    panic(err)
}
defer l.Close()

server := grpc.NewServer()

hello.RegisterHelloServiceServer(server, helloServer{})
if err := server.Serve(l); err != nil {
    // handle the error
}

This allows grpc to implement flow control. I tried this and the server memory remained low even when the server was slow to read (due to the 10 sec delay).

If you need an HTTP server for other purposes, such as pprof, you can run it on a different port.

wmxhw commented 4 months ago

Thanks, I now know that ServerTransport has two implementations.

Why is serverHandlerTransport asynchronous and what are the restrictions?

arjan-bal commented 4 months ago

I discussed this issue with the team. The serverHandlerTransport is an experimental adaptor to make gRPC work with the std library's http server. It is missing many features including flow control. Users are encouraged to use the http2server transport instead. This is the transport used when calling server.Serve(listener).

Having said that, both transports buffer HTTP/2 frames to ensure that the server doesn't waste time waiting for more packets to arrive. In the case of serverHandlerTransport, there is no bound on the size of the buffer: it keeps reading frames until the connection is closed. This is a bug.

As serverHandlerTransport is an experimental transport and isn't a high priority feature, we will not be fixing it immediately, but PRs are welcome from contributors.

wmxhw commented 4 months ago

Thank you very much for your answer. I have now switched to letting gRPC manage the listener.

arvindbr8 commented 3 months ago

We can pick this up during a fixit in one of the coming quarters. We should definitely be able to come up with a plan to resolve this.