nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0

Proper timeout handling in `func (*pullConsumer) fetch()` #1482

Open yz89122 opened 9 months ago

yz89122 commented 9 months ago

Proposed change

IMHO, there are several things we can improve in these lines of code:

https://github.com/nats-io/nats.go/blob/8712190da1d17ab0c4719bffa7c0174214c56e6c/jetstream/pull.go#L770-L772

  1. In a batch request, the code creates many time.Timer values. According to the docs, a timer is not GC-able until its timeout is reached. For clients with higher throughput and longer Expires, there will be more concurrent timers running, which is unnecessary.
  2. Currently, the client-side timeout is derived from Expires and a constant 1 * time.Second, and Expires is the same field that is sent to the server in the pull request. When the client or server is under stress, this timeout can easily be reached, especially with FetchNoWait().
    • I think we could split Expires into two options: one sent to the server in the pull request, and another that serves as the client-side receive timeout.
    • Alternatively, we could use context.Context.Done() for the receive timeout.
  3. Messages delivered after the timeout is reached AND before the server receives the UNSUB should be explicitly negatively acknowledged (nak'd) instead of waiting for the server-side ack timeout. The current implementation just drops these messages. For consumers using AckPolicy: AckAll, this is dangerous: we normally call Fetch() in a loop, so if we later receive newer messages and ack them, the dropped messages are acked along with them and can be lost permanently.

Use case

for {
    batch, err := consumer.Fetch(batchSize)
    if err != nil {
        // ...
        continue
    }
    var lastMessage jetstream.Msg
    for message := range batch.Messages() {
        lastMessage = message
        // If the client or server is under stress, this inner loop may never run.
    }
    if err := batch.Error(); err != nil {
        // ...
    }
    if lastMessage == nil {
        // Empty batch.
        continue
    }
    // Ack the entire batch with `AckPolicy: AckAll`.
    if err := lastMessage.Ack(); err != nil {
        // ...
    }
}
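To make the AckAll risk in item 3 concrete, here is a toy model of the server-side bookkeeping. It is plain Go, not the nats.go API; ackAllConsumer and its methods are hypothetical. With AckPolicy: AckAll, acking sequence n also acks every earlier pending message. So if message 1 arrives after the client-side timeout and is dropped, and the next Fetch() in the loop above acks message 2, message 1 is never redelivered.

```go
package main

// ackAllConsumer is a hypothetical toy model (not the nats.go API) of a
// consumer with AckPolicy: AckAll. Acking sequence n implicitly acks every
// pending message with a sequence <= n, so those are never redelivered.
type ackAllConsumer struct {
	pending map[uint64]bool // delivered to the client but not yet acked
}

func newAckAllConsumer() *ackAllConsumer {
	return &ackAllConsumer{pending: make(map[uint64]bool)}
}

// deliver records that the server sent message seq to the client.
func (c *ackAllConsumer) deliver(seq uint64) { c.pending[seq] = true }

// ackAll acks seq and, implicitly, every pending message before it.
// Deleting map entries while ranging over the map is allowed in Go.
func (c *ackAllConsumer) ackAll(seq uint64) {
	for s := range c.pending {
		if s <= seq {
			delete(c.pending, s)
		}
	}
}

// redeliverable reports whether seq is still pending, i.e. whether the
// server would redeliver it once the ack wait expires.
func (c *ackAllConsumer) redeliverable(seq uint64) bool { return c.pending[seq] }
```

In the scenario above, message 1 is delivered late and silently dropped by the client, message 2 is processed and acked, and message 1 is no longer pending afterwards.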

Contribution

Yes

YarBor commented 9 months ago

For 1), the nats package already has a timerPool that reuses time.Timer values. I think we could use the same design, or reuse the timerPool from the nats package: https://github.com/nats-io/nats.go/blob/8712190da1d17ab0c4719bffa7c0174214c56e6c/timer.go#L26

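package nats

import (
    "sync"
    "time"
)

var globalTimerPool timerPool

// timerPool provides GC-able pooling of *time.Timer values.
// It can be used by multiple goroutines concurrently.
type timerPool struct {
    p sync.Pool
}

// Get returns a timer that fires after d, reusing a pooled timer when one is available.
func (tp *timerPool) Get(d time.Duration) *time.Timer {
    if t, _ := tp.p.Get().(*time.Timer); t != nil {
        t.Reset(d)
        return t
    }

    return time.NewTimer(d)
}

// Put stops t, drains a pending expiration from its channel if there is one,
// and returns it to the pool so that a later Get can Reset it safely.
func (tp *timerPool) Put(t *time.Timer) {
    if !t.Stop() {
        select {
        case <-t.C:
        default:
        }
    }

    tp.p.Put(t)
}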