containerd / ttrpc

GRPC for low-memory environments
Apache License 2.0
555 stars 80 forks source link

ttrpc client send hangs even when the ctx has timed out. #174

Open ningmingxiao opened 1 month ago

ningmingxiao commented 1 month ago

ttrpc send hangs even when the ctx has timed out

goroutine 6132118 [IO wait, 1818 minutes]:

internal/poll.runtime_pollWait(0x7f9e8013bbd0, 0x77)

        /usr/local/go/src/runtime/netpoll.go:306 +0x89

internal/poll.(*pollDesc).wait(0xc000f2b900?, 0xc0021e6000?, 0x0)

        /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0x32

internal/poll.(*pollDesc).waitWrite(...)

        /usr/local/go/src/internal/poll/fd_poll_runtime.go:93

internal/poll.(*FD).Write(0xc000f2b900, {0xc0021e6000, 0x74, 0x1000})

        /usr/local/go/src/internal/poll/fd_unix.go:391 +0x2f6

net.(*netFD).Write(0xc000f2b900, {0xc0021e6000?, 0xc000abf260?, 0x41c4f1?})

        /usr/local/go/src/net/fd_posix.go:96 +0x29

net.(*conn).Write(0xc00128a238, {0xc0021e6000?, 0x5222f5?, 0x7f9e8008bbd0?})

        /usr/local/go/src/net/net.go:195 +0x45

bufio.(*Writer).Flush(0xc0021de800)

        /usr/local/go/src/bufio/bufio.go:628 +0x62

github.com/containerd/ttrpc.(*channel).send(0xc0021de840, 0x21a7680?, 0xc0?, 0x0?, {0xc002fa2070, 0x6a, 0x6a})

        github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/channel.go:161 +0x98

github.com/containerd/ttrpc.(*Client).createStream(0xc001b59290, 0x0?, {0xc002fa2070, 0x6a, 0x6a})

        github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:370 +0x245

github.com/containerd/ttrpc.(*Client).dispatch(0xc001b59290, {0x1cba358, 0xc005081d70}, 0xc000462800?, 0xc0019feda0?)

        github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:480 +0x95

github.com/containerd/ttrpc.defaultClientInterceptor({0x1cba358?, 0xc005081d70?}, 0x2b5d3c0?, 0xc0056ecc40?, 0xc000abf5e0?, 0x45a7b1?)

        github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/interceptor.go:56 +0x26

github.com/containerd/ttrpc.(*Client).Call(0xc001b59290, {0x1cba358, 0xc005081d70}, {0x1a4b86d, 0x17}, {0x1a2ad25, 0x5}, {0x18ddd60?, 0xc003a836d0?}, {0x1993ba0, ...})

        github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:134 +0x353

github.com/containerd/containerd/api/runtime/task/v2.(*taskClient).State(0xc00128b950, {0x1cba358, 0xc005081d70}, 0x2b5d3c0?)

        github.com/containerd/containerd/api/runtime/task/v2/shim_ttrpc.pb.go:169 +0x98

github.com/containerd/containerd/runtime/v2.(*shimTask).State(0xc000f2f960, {0x1cba358, 0xc005081d70})

        github.com/containerd/containerd/runtime/v2/shim.go:698 +0xd4

github.com/containerd/containerd/services/tasks.getProcessState({0x1cba390?, 0xc005081aa0?}, {0x7f9e800dcf10, 0xc000f2f960})

        github.com/containerd/containerd/services/tasks/local.go:340 +0xef

github.com/containerd/containerd/services/tasks.(*local).Get(0xc003a83540?, {0x1cba390, 0xc005081aa0}, 0xc003a83540, {0x194a040?, 0x19be760?, 0x40f000?})

        github.com/containerd/containerd/services/tasks/local.go:386 +0xe5

github.com/containerd/containerd/services/tasks.(*service).Get(0x18e0160?, {0x1cba390?, 0xc005081aa0?}, 0x0?)

        github.com/containerd/containerd/services/tasks/service.go:86 +0x33

github.com/containerd/containerd/api/services/tasks/v1._Tasks_Get_Handler({0x19be760?, 0xc000014fc0}, {0x1cba390, 0xc005081aa0}, 0xc0036e90a0, 0x0)

        github.com/containerd/containerd/api/services/tasks/v1/tasks_grpc.pb.go:384 +0x170

google.golang.org/grpc.(*Server).processUnaryRPC(0xc0003e4000, {0x1cc2a40, 0xc003e6e000}, 0xc002ae5e60, 0xc00047de90, 0x2abe980, 0x0)

        github.com/containerd/containerd/vendor/google.golang.org/grpc/server.go:1336 +0xd33

google.golang.org/grpc.(*Server).handleStream(0xc0003e4000, {0x1cc2a40, 0xc003e6e000}, 0xc002ae5e60, 0x0)

        github.com/containerd/containerd/vendor/google.golang.org/grpc/server.go:1704 +0xa36

google.golang.org/grpc.(*Server).serveStreams.func1.2()

        github.com/containerd/containerd/vendor/google.golang.org/grpc/server.go:965 +0x98

created by google.golang.org/grpc.(*Server).serveStreams.func1

        github.com/containerd/containerd/vendor/google.golang.org/grpc/server.go:963 +0x28a
kevpar commented 4 weeks ago

What version of ttrpc are you using on the client? From your line numbers, it appears you may be using an old version.

We fixed a deadlock in the client in v1.2.4, it could be what you are hitting: https://github.com/containerd/ttrpc/commit/1b4f6f8edba5f374f1afbf10d7666136286806e7

ningmingxiao commented 2 weeks ago

Some stack traces:

goroutine 4796 [IO wait]:
internal/poll.runtime_pollWait(0x7f9e8013bea0, 0x72)
        /usr/local/go/src/runtime/netpoll.go:306 +0x89
internal/poll.(*pollDesc).wait(0xc000138e80?, 0xc00075f000?, 0x0)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0x32
internal/poll.(*pollDesc).waitRead(...)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0xc000138e80, {0xc00075f000, 0x1000, 0x1000})
        /usr/local/go/src/internal/poll/fd_unix.go:167 +0x299
net.(*netFD).Read(0xc000138e80, {0xc00075f000?, 0xc00336dcf8?, 0x2?})
        /usr/local/go/src/net/fd_posix.go:55 +0x29
net.(*conn).Read(0xc000126858, {0xc00075f000?, 0x418313?, 0xc000905918?})
        /usr/local/go/src/net/net.go:183 +0x45
bufio.(*Reader).Read(0xc000d60060, {0xc000731220, 0xa, 0xc000bb14a0?})
        /usr/local/go/src/bufio/bufio.go:237 +0x1bb
io.ReadAtLeast({0x1ca1b00, 0xc000d60060}, {0xc000731220, 0xa, 0xa}, 0xa)
        /usr/local/go/src/io/io.go:332 +0x9a
io.ReadFull(...)
        /usr/local/go/src/io/io.go:351
github.com/containerd/ttrpc.readMessageHeader({0xc000731220?, 0xa?, 0x30?}, {0x1ca1b00?, 0xc000d60060?})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/channel.go:73 +0x55
github.com/containerd/ttrpc.(*channel).recv(0xc000731200)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/channel.go:121 +0x4d
github.com/containerd/ttrpc.(*Client).receiveLoop(0xc0008047e0)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:320 +0x85
github.com/containerd/ttrpc.(*Client).run(0xc0008047e0)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:301 +0x1e
created by github.com/containerd/ttrpc.NewClient
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:95 +0x1f6

goroutine 6132118 [IO wait, 1818 minutes]:
internal/poll.runtime_pollWait(0x7f9e8013bbd0, 0x77)
        /usr/local/go/src/runtime/netpoll.go:306 +0x89
internal/poll.(*pollDesc).wait(0xc000f2b900?, 0xc0021e6000?, 0x0)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0x32
internal/poll.(*pollDesc).waitWrite(...)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:93
internal/poll.(*FD).Write(0xc000f2b900, {0xc0021e6000, 0x74, 0x1000})
        /usr/local/go/src/internal/poll/fd_unix.go:391 +0x2f6
net.(*netFD).Write(0xc000f2b900, {0xc0021e6000?, 0xc000abf260?, 0x41c4f1?})
        /usr/local/go/src/net/fd_posix.go:96 +0x29
net.(*conn).Write(0xc00128a238, {0xc0021e6000?, 0x5222f5?, 0x7f9e8008bbd0?})
        /usr/local/go/src/net/net.go:195 +0x45
bufio.(*Writer).Flush(0xc0021de800)
        /usr/local/go/src/bufio/bufio.go:628 +0x62
github.com/containerd/ttrpc.(*channel).send(0xc0021de840, 0x21a7680?, 0xc0?, 0x0?, {0xc002fa2070, 0x6a, 0x6a})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/channel.go:161 +0x98
github.com/containerd/ttrpc.(*Client).createStream(0xc001b59290, 0x0?, {0xc002fa2070, 0x6a, 0x6a})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:370 +0x245
github.com/containerd/ttrpc.(*Client).dispatch(0xc001b59290, {0x1cba358, 0xc005081d70}, 0xc000462800?, 0xc0019feda0?)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:480 +0x95
github.com/containerd/ttrpc.defaultClientInterceptor({0x1cba358?, 0xc005081d70?}, 0x2b5d3c0?, 0xc0056ecc40?, 0xc000abf5e0?, 0x45a7b1?)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/interceptor.go:56 +0x26
github.com/containerd/ttrpc.(*Client).Call(0xc001b59290, {0x1cba358, 0xc005081d70}, {0x1a4b86d, 0x17}, {0x1a2ad25, 0x5}, {0x18ddd60?, 0xc003a836d0?}, {0x1993ba0, ...})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:134 +0x353
github.com/containerd/containerd/api/runtime/task/v2.(*taskClient).State(0xc00128b950, {0x1cba358, 0xc005081d70}, 0x2b5d3c0?)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/api/runtime/task/v2/shim_ttrpc.pb.go:169 +0x98

sync.runtime_SemacquireMutex(0xc00124b800?, 0x4b?, 0x0?)
        /usr/local/go/src/runtime/sema.go:77 +0x26
sync.(*Mutex).lockSlow(0xc001b592cc)
        /usr/local/go/src/sync/mutex.go:171 +0x165
sync.(*Mutex).Lock(...)
        /usr/local/go/src/sync/mutex.go:90
github.com/containerd/ttrpc.(*Client).createStream(0xc001b59290, 0x0?, {0xc000f390a0, 0x6a, 0x6a})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:366 +0x1ce
github.com/containerd/ttrpc.(*Client).dispatch(0xc001b59290, {0x1cba358, 0xc0035bfdd0}, 0xc00124b800?, 0xc003836d30?)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:480 +0x95
github.com/containerd/ttrpc.defaultClientInterceptor({0x1cba358?, 0xc0035bfdd0?}, 0x2b5d3c0?, 0xc00190ff80?, 0xc0053675e0?, 0x45a7b1?)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/interceptor.go:56 +0x26
github.com/containerd/ttrpc.(*Client).Call(0xc001b59290, {0x1cba358, 0xc0035bfdd0}, {0x1a4b86d, 0x17}, {0x1a2ad25, 0x5}, {0x18ddd60?, 0xc003e172c0?}, {0x1993ba0, ...})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:134 +0x353

This will hit https://github.com/containerd/ttrpc/commit/1b4f6f8edba5f374f1afbf10d7666136286806e7. I don't see that it was blocked at `s := c.getStream(sid)` (it would block at `c.getStream(sid)`).

func (c *Client) receiveLoop() error {
    for {
        select {
        case <-c.ctx.Done():
            return ErrClosed
        default:
            var (
                msg = &streamMessage{}
                err error
            )

            msg.header, msg.payload, err = c.channel.recv()
            if err != nil {
                _, ok := status.FromError(err)
                if !ok {
                    // treat all errors that are not an rpc status as terminal.
                    // all others poison the connection.
                    return filterCloseErr(err)
                }
            }
            sid := streamID(msg.header.StreamID)
                        s := c.getStream(sid)

@kevpar i use https://github.com/containerd/ttrpc-rust as ttrpc server.

ningmingxiao commented 2 weeks ago

I reviewed this PR: https://github.com/containerd/ttrpc/commit/1b4f6f8edba5f374f1afbf10d7666136286806e7

// createStream allocates the next client-side stream ID, registers the
// stream in c.streams, and writes the initial request frame to the
// connection.
//
// NOTE(review): this is the post-fix version from commit 1b4f6f8.
// sendLock is acquired while streamLock is still held so that stream-ID
// allocation order matches the order frames hit the wire; streamLock is
// then released before the write. The write itself has no deadline, so it
// can still block indefinitely if the peer stops reading and the socket
// buffer fills — see goroutine 6132118 above, parked in channel.send for
// 1818 minutes. Any other caller then parks on sendLock (or on streamLock
// inside this function), which is the hang being reported.
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
    c.streamLock.Lock()

    // Check if closed since lock acquired to prevent adding
    // anything after cleanup completes
    select {
    case <-c.ctx.Done():
        c.streamLock.Unlock()
        return nil, ErrClosed
    default:
    }

    // Stream ID should be allocated at same time
    s := newStream(c.nextStreamID, c)
    c.streams[s.id] = s
    // Client streams use every other ID (odd/even split with the server).
    c.nextStreamID = c.nextStreamID + 2

    // Take sendLock before dropping streamLock so registration and the
    // first send cannot be reordered against another createStream call.
    c.sendLock.Lock()
    defer c.sendLock.Unlock()
    c.streamLock.Unlock()

    // Blocking write of the request frame; no ctx/deadline applies here.
    if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
        return s, filterCloseErr(err)
    }

    return s, nil
}

If c.channel.send returns, there will not be a deadlock.

If the reader doesn't get a chance to read data (hangs at c.getStream), will c.channel.send hang?

// receiveLoop reads frames from the connection until the client context is
// cancelled or a non-status error poisons the connection, dispatching each
// message to its registered stream.
//
// NOTE(review): the ctx.Done() case is only checked between messages; while
// parked inside channel.recv (see goroutine 4796 above) cancellation is not
// observed until the read returns.
func (c *Client) receiveLoop() error {
    for {
        select {
        case <-c.ctx.Done():
            return ErrClosed
        default:
            var (
                msg = &streamMessage{}
                err error
            )

            // Blocks until a full header+payload is read off the wire.
            msg.header, msg.payload, err = c.channel.recv()
            if err != nil {
                _, ok := status.FromError(err)
                if !ok {
                    // treat all errors that are not an rpc status as terminal.
                    // all others poison the connection.
                    return filterCloseErr(err)
                }
            }
            sid := streamID(msg.header.StreamID)
            s := c.getStream(sid)
            if s == nil {
                // Message for an unknown/already-removed stream: drop it.
                logrus.WithField("stream", sid).Errorf("ttrpc: received message on inactive stream")
                continue
            }

            if err != nil {
                // rpc-status error: fail only this stream, keep the loop alive.
                s.closeWithError(err)
            } else {
                // Hand the message to the stream; a delivery failure is
                // logged but does not terminate the receive loop.
                if err := s.receive(c.ctx, msg); err != nil {
                    logrus.WithError(err).WithField("stream", sid).Errorf("ttrpc: failed to handle message")
                }
            }
        }
    }
}

Can you show me your test case? @kevpar Thank you.