nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0
5.55k stars 696 forks source link

Why is a message redelivered to the consumer even though msg.DoubleAck returned no error? #1717

Open alifpay opened 1 month ago

alifpay commented 1 month ago

Observed behavior

streamBatch, err := s.js.NewStream(ctx, jetstream.StreamConfig{
        Name:      "PUSHBATCH",
        Subjects:  []string{"push.batch", "push.batch.retry"},
        MaxAge:    3 * 24 * time.Hour,
        Storage:   jetstream.FileStorage,
        Retention: jetstream.WorkQueuePolicy,
    })
    if err != nil {
        log.Fatalln("failed to create streamBatch: ", err)
    }

consBatch, err := s.js.NewConsumer(ctx, streamBatch, jetstream.ConsumerConfig{
        Name:          "pushbatch",
        Durable:       "pushbatch",
        FilterSubject: "push.batch",
        AckWait:       50 * time.Second,
        AckPolicy:     jetstream.AckExplicitPolicy,
    })
    if err != nil {
        err = fmt.Errorf("failed to create consumer, %w", err)
        return err
    }
    _, err = consBatch.Consume(s.processBatch,
        jetstream.PullMaxMessages(1),
        jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
            fmt.Println(err)
        }))
    if err != nil {
        err = fmt.Errorf("failed to consume, %w", err)
        return err
    }
    _, err = consBatch.Consume(s.processBatch,
        jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
            fmt.Println(err)
        }))
    if err != nil {
        err = fmt.Errorf("failed to consume, %w", err)
        return err
    }
    _, err = consBatch.Consume(s.processBatch,
        jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
            fmt.Println(err)
        }))
    if err != nil {
        err = fmt.Errorf("failed to consume, %w", err)
        return err
    }
    _, err = consBatch.Consume(s.processBatch,
        jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
            fmt.Println(err)
        }))
    if err != nil {
        err = fmt.Errorf("failed to consume, %w", err)
        return err
    }
    _, err = consBatch.Consume(s.processBatch,
        jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
            fmt.Println(err)
        }))
    if err != nil {
        err = fmt.Errorf("failed to consume, %w", err)
        return err
    }

Expected behavior

I need each message to be delivered only once.

Server and client version

github.com/nats-io/nats.go v1.37.0

server:Version: 2.10.20

Host environment

in docker

Steps to reproduce

No response

alifpay commented 1 month ago

log 2024/09/22 14:12:26 batch message 641500 6 0 pushbatch 4501 1283 NumDelivered: 6 Sequence.Consumer: 4501 Sequence.Stream: 1283

Total messages: 2000

alifpay commented 1 month ago

Checking the database every time to see whether a message was already processed would be expensive. How can I get each message delivered only once?

Jarema commented 1 month ago

Hey!

Your example does not show how you actually process the acks. Additionally, Consume does not stop after a single batch is processed; under the hood it keeps polling with the configured batch settings. This means you have created several parallel processing callbacks. That is fine if you wanted to simulate many parallel apps consuming messages from one consumer. Just pointing that out :).

Regarding the double ack: there is a known limitation (https://github.com/nats-io/nats-server/issues/4786). If an ack is sent back after the ack_wait window has expired, the server will accept it even though a redelivery is already in flight.

alifpay commented 1 month ago

// processBatch handles one message from the "pushbatch" consumer. It does the
// following, in order:
//   - decodes the payload into a models.Notification;
//   - sends it with sendBatchFCM;
//   - stores one row per receiver in the "pushes" table via pkg.BulkInsert;
//   - republishes receivers whose delivery failed to "sms.batch";
//   - acknowledges the message with DoubleAck.
//
// Acknowledgement outcome per case:
//   - The payload cannot be decoded: the message is terminated (Term), so the
//     server stops redelivering a message that can never be processed.
//   - code == 406 or code > 0: results are recorded and the message is
//     DoubleAck'ed. Insert and publish failures are only logged.
//   - Any other code: no ack is sent, so the server redelivers the message
//     once AckWait expires.
func (s *PushService) processBatch(msg jetstream.Msg) {
	var nt models.Notification
	if err := pkg.JsonUnmarshal(msg.Data(), &nt); err != nil {
		log.Println("failed to unmarshal message:", err, string(msg.Data()))
		// Returning without any ack would make the server redeliver this
		// undecodable message after every AckWait.
		if termErr := msg.Term(); termErr != nil {
			log.Println("failed to terminate message:", termErr)
		}
		return
	}

	// Metadata may return a nil pointer alongside a non-nil error, so md is
	// only dereferenced on success.
	if md, err := msg.Metadata(); err != nil {
		log.Println("batch message", nt.BatchId, "metadata error:", err)
	} else {
		log.Println("batch message", nt.BatchId, md.NumDelivered, md.NumPending, md.Consumer, md.Sequence.Consumer, md.Sequence.Stream)
	}

	// The server started the AckWait clock when it delivered this message.
	// That can be well before this callback runs if the message waited in the
	// client-side buffer. Reset the clock before the potentially slow FCM call.
	if err := msg.InProgress(); err != nil {
		log.Println("failed to mark message in progress:", err)
	}

	columns := []string{"batch_id", "user_id", "token", "error", "status"}
	vals := make([][]any, 0, len(nt.Receivers))

	code, resp, err := sendBatchFCM(nt)
	log.Println(nt.BatchId, code, err)

	switch {
	case code == 406:
		errStr := ""
		if err != nil {
			errStr = err.Error()
		}
		for _, r := range nt.Receivers {
			vals = append(vals, []any{nt.BatchId, r.Id, r.Token, errStr, code})
		}
		if insertErr := pkg.BulkInsert(s.ctx, s.db, "pushes", columns, vals); insertErr != nil {
			log.Println("failed pkg.BulkInsert:", insertErr)
		}
	case code > 0:
		sms := models.Notification{BatchId: nt.BatchId, Title: nt.Title, Message: nt.Message, Receivers: make([]models.Receiver, 0)}
		//todo retry for timeout and failed for 5xx
		for k, rs := range resp.Responses {
			// Responses are matched to receivers by position. Guard against a
			// response list longer than the receiver list.
			if k >= len(nt.Receivers) {
				log.Println("more FCM responses than receivers:", nt.BatchId, len(resp.Responses), len(nt.Receivers))
				break
			}
			if rs.Success {
				vals = append(vals, []any{nt.BatchId, nt.Receivers[k].Id, nt.Receivers[k].Token, rs.MessageID, 200})
				continue
			}
			// errStr is scoped to this response, so a failure without an Error
			// does not inherit the message of an earlier failure.
			errStr := ""
			if rs.Error != nil {
				errStr = fmt.Sprintf("%s %s", rs.Error, rs.MessageID)
			}
			sms.Receivers = append(sms.Receivers, nt.Receivers[k])
			vals = append(vals, []any{nt.BatchId, nt.Receivers[k].Id, nt.Receivers[k].Token, errStr, 417})
		}
		if insertErr := pkg.BulkInsert(s.ctx, s.db, "pushes", columns, vals); insertErr != nil {
			log.Println("failed pkg.BulkInsert:", insertErr)
		}
		if len(sms.Receivers) > 0 {
			bt, marshalErr := pkg.JsonMarshal(sms)
			if marshalErr != nil {
				log.Println("failed to marshal message:", marshalErr)
			} else {
				publishErr := s.js.Publish(s.ctx, "sms.batch", bt)
				if publishErr != nil {
					log.Println("failed to publish message:", publishErr)
				}
			}
		}
	default:
		// Deliberately not acked: the server redelivers after AckWait, which
		// acts as the retry for failed sends.
		return
	}

	if ackErr := msg.DoubleAck(s.ctx); ackErr != nil {
		log.Println("failed to ack message:", ackErr, string(msg.Data()))
	}
}
alifpay commented 1 month ago

Regarding AckWait: it is larger than the HTTP timeout (AckWait is 50 seconds, the HTTP timeout is 30 seconds).

I logged the response codes, and all of them are greater than 0.

alifpay commented 1 month ago

With `Retention: jetstream.WorkQueuePolicy`, a message should be deleted once it is acknowledged,

but some messages are still delivered more than once.