storj / drpc

drpc is a lightweight, drop-in replacement for gRPC
MIT License
1.49k stars 50 forks source link

Context cancellation closes the connection #37

Open tomyl opened 2 years ago

tomyl commented 2 years ago

Hi,

I'm looking into replacing grpc with drpc in one of my projects and initial tests show some nice performance improvements!

Switching to drpc was fairly painless. The only snag was that context cancelling seemingly closes the connection. This behavior is different from grpc, see my tweaked drpc/grpc examples in https://github.com/tomyl/drpc/commit/acb08bd89225fe5db68ae4dd9fa82805d28b7364.

Is this behavior intentional? I can't see that the documentation mentions it.

Thanks, Tommy

zeebo commented 2 years ago

Ah, yeah. Since gRPC internally maintains a connection pool as well as does rpc multiplexing, I imagine it can do something other than closing the connection on a context cancel. DRPC will close the connection if the stream state is not in a finished state when the context is canceled.

Though, thinking out loud, it seems like it could wait to close the connection until the next rpc is started if the remote side has not acknowledged that the stream has been canceled. That grace period could help with connection reuse.

tomyl commented 2 years ago

Thanks for your reply! I'm trying to figure out how to deal with this in the best way in my code base. My code looks something like

ctx, cancel := context.WithTimeout(ctx, someTImeout)
defer cancel()

// much later, in a stack frame far far away

conn := mypool.get(ctx)
defer conn.Close() // put connection back into pool

crumbs, err := conn.EatCookie(ctx, ...)

So the first observation is that the idiomatic defer cancel() isn't safe, even if the rpc request returns successfully. Now I also worry that hitting the context deadline (without explicit cancel()) may also be unsafe, even when using a connection pool. I use a custom connection pool here but is it possible for the pool to know if the connection is safe to hand out again?

zeebo commented 2 years ago

Hmm. If the rpc is finished, cancel should not cause the connection to close. There should be a bunch of test cases around that behavior. Am I misunderstanding what you mean by your first observation?

Every conn has a Closed method that the pool can inspect to see if the connection is closed or not at that moment.

tomyl commented 2 years ago

Hmm. If the rpc is finished, cancel should not cause the connection to close.

OK! I might have been mistaken, it's possible that the rpc call wasn't finished in the case I'm troubleshooting. I can't reproduce it in a minimal example. Sorry for confusion.

Every conn has a Closed method that the pool can inspect to see if the connection is closed or not at that moment.

OK! My pool is checking Closed before putting back the connection but I get the impression it still returns connections that are closed. I should probably give storj.io/common/rpc/rpcpool a try to rule out bugs in my pool. The pool provided by go.bryk.io/pkg/net/drpc didn't behave the way I wanted (it returns an error on max capacity instead of blocking) so I whipped up my own simple pool.

zeebo commented 2 years ago

Cool. The rpcpool checks Closed on the way out because that's the latest possible time, so it has the smallest chance of being stale.

I think this is something many people have to deal with, and so the library might want to provide a basic "good enough" answer. And with generics now, it may be easier to handle more use cases (the key type can be generic, for example).

Would you mind listing some of your requirements for a pool? There are so many possible features here, and having a narrow but still useful set of requirements might help in coming up with an answer for everyone.

tomyl commented 2 years ago

Initial tests with rpcpool look promising! Will do some more testing tomorrow.

I think just having a pool example in a prominent place would go a long way. rpcpool is a bit hidden. The reason I picked go.bryk.io/pkg/net/drpc in the beginning was because it was linked from https://github.com/storj/drpc/blob/main/README.md (rpcpool isnt!) and the only rpcpool example I found was the unit test and I didn't immediately grok the API (why do I need to provide a key when getting a connection? when do I need to care about tls.ConnectionState?).

As for requirements, what I care mostly about is high throughput. My system is sending lots of blobs of data (a few MB at most) over a 10Gbps network. Right now my main concern is bad connection reuse because of context cancellation (seeing a lot of them in this system). Possibly it's fine in environments where TLS isn't a requirement, but with TLS I imagine I need some mechanism that ensures I have a minimum number of idle connections available in the pool.

zeebo commented 2 years ago

Haha, yeah the API of rpcpool being weird is why it's not linked. The API is tailored to some pretty specific and weird requirements that Storj has.

Thanks for the info!

zeebo commented 2 years ago

I pushed up a change to our gerrit code review instance that includes a connection pool based on the design in the storj rpcpool: https://review.dev.storj.io/c/storj/drpc/+/8599

I think it's easier to understand because it hides the storj weirdness by the fact that it uses generics for the cache key, and doesn't require any internal stuff, so you could copy/paste the files into your own repo if you want to try it out.

I'm going to be working on figuring out how to delay closing connections in the presence of cancellations unless necessary. The following is a mostly unstructured brain dump of ideas:

  1. We should only need to close the underlying transport to unblock potentially blocking methods on the stream. It may be possible to add a "cancel" type message that we try to async send..
  2. It interacts weirdly with the connection pool because the pool wants to know if a connection is "ready", and if we add a new state to the stream that's like "cancelling" during this async cancel, then it needs to say it's not closed so that the pool will try to reuse it, but we have to close the connection if it's not finished flushing the cancel by the time the next Invoke or NewStream comes along, but by then it's too late: the pool has already taken the connection out.
  3. Maybe that means we have a sentinel error that the pool can look at and know to retry? That maybe works.
  4. I have to double check, but I think the pool hands back connections like a stack so that the most recently put is the one taken, but that's the opposite of what you want with async cancels, because you want to give the most time possible before closing. In fact, you kinda want to know what the state is in the set {ready, canceling, closed}, and use ready connections whenever possible to give canceling connections as much time as possible.

Ok, that's all.

tomyl commented 1 year ago

A little update here. I gave drpcpool (looks nice!) a try and I'm still making the same observation as in https://github.com/storj/drpc/issues/37#issuecomment-1262260366. I dug a bit more this time. So my code looks something like

func foo(ctx context.Context) {
    conn := pool.Get(ctx, ...)
    defer conn.Close()

    stream, err := pb.NewFoo(conn).WriteStuff(ctx)
    // a bunch of stream.Send()
    return stream.CloseAndRecv()
}

The other side is reading from the stream and calling SendAndClose().

After foo() has returned, the context is cancelled, and foo() is called again with a new context and it happens to get the same connection from the pool. This time WriteStuff(ctx) immediately returns context.Canceled even though ctx has been not been cancelled yet.

I get the impression that the stream is closed in an asynchronous manner and if the stream isn't closed already by the time manageStream gets the cancellation for the old context, m.terminte(ctx.Err()) is called, which means attempts to create new streams for the same connection will always fails (acquireSemaphore returns m.sigs.term.Err())).

Does my stream usage above seem correct? I suppose I can re-run the body of foo if err is context.Canceled but ctx.Err() == nil as a workaround.

zeebo commented 1 year ago

That usage does seem correct. I'm writing some tests to see if I can reproduce.

zeebo commented 1 year ago

So here's a test I've been running and have been unable to reproduce the error:

func TestCancelRepeatedPooled(t *testing.T) {
    tctx := drpctest.NewTracker(t)
    defer tctx.Close()

    server := impl{
        Method2Fn: func(stream DRPCService_Method2Stream) error {
            var total int64
            for {
                in, err := stream.Recv()
                if err == io.EOF {
                    break
                } else if err != nil {
                    return err
                }
                total += in.In
            }
            return stream.SendAndClose(out(total))
        },
    }

    foo := func(ctx context.Context, p *drpcpool.Pool) {
        conn := p.Get(ctx, "foo", func(ctx context.Context, key interface{}) (drpcpool.Conn, error) {
            return createRawConnection(t, server, tctx), nil
        })
        defer func() { _ = conn.Close() }()

        stream, err := NewDRPCServiceClient(conn).Method2(ctx)
        assert.NoError(t, err)

        assert.NoError(t, stream.Send(in(1)))
        assert.NoError(t, stream.Send(in(2)))
        assert.NoError(t, stream.Send(in(3)))

        out, err := stream.CloseAndRecv()
        assert.NoError(t, err)
        assert.Equal(t, out.Out, 6)
    }

    p := drpcpool.New(drpcpool.Options{
        Capacity: 1,
    })

    for i := 0; i < 10000; i++ {
        ctx, cancel := context.WithCancel(tctx)
        foo(ctx, p)
        cancel()
    }
}

Does that look like it matches your usage?

tomyl commented 1 year ago

Thank you for looking into this again!

The unit test appears to do what I described but I realize now I omitted one detail that might matter. I see the context problem in the foo function above but there's another function running in parallel. foo is writing stuff while the other function is reading. Something like

func bar(ctx context.Context) {
    conn := pool.Get(ctx, ...)
    defer conn.Close()

    stream, err := pb.NewBar(conn).ReadStuff(ctx)
    // a bunch of stream.Recv()
}

Probably I can get the context error here too but when I'm able to reproduce this there happens to be a lot more writes.

One key difference between these two functions is that the context is frequently cancelled during reading, that is, before bar has returned.

Is that unit test committed somewhere? I could try to play around with it.

zeebo commented 1 year ago

The test isn't committed, but you can put it in internal/integration/common_test.go and run the tests from inside that folder.

tomyl commented 1 year ago

The test code above doesn't seem complete. I get

$ go test
# storj.io/drpc/internal/integration [storj.io/drpc/internal/integration.test]
./common_test.go:245:31: undefined: out
./common_test.go:250:82: undefined: drpcpool.Conn
./common_test.go:251:11: undefined: createRawConnection
FAIL    storj.io/drpc/internal/integration [build failed]

with latest main.

zeebo commented 1 year ago

Whoops! I forgot that I included other changes to some supporting files. I have pushed a commit with the test up.

Also, it now fails for me locally sometimes. I don't have any time to figure this out right now, but I'll look again first thing tomorrow morning.

egonelbre commented 1 year ago

TestCancelRepeatedPooled ended up failing in https://build.dev.storj.io/blue/organizations/jenkins/drpc-gerrit/detail/drpc-gerrit/452/pipeline/.

--- FAIL: TestCancelRepeatedPooled (0.98s)
    cancel_test.go:248: context canceled
FAIL
exit status 1
FAIL    storj.io/drpc/internal/integration  1.594s