nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0
16k stars 1.41k forks source link

NATS consumers can't pull any messages from Jetstream #6099

Closed tong3jie closed 1 week ago

tong3jie commented 1 week ago

Observed behavior

NATS consumers can't pull any messages from Jetstream

Expected behavior

example: the name of stream is ABC the topic of message is ABC.ORDER the topic of sub Messages is ABC.ORDER

I can't get any message.

Server and client version

server: v2.10.22 client: v1.37.0

Host environment

No response

Steps to reproduce

type Nats struct {
    js     nats.JetStreamContext
    stream string
    mu     sync.Mutex

    readers map[string]*reader
}

type reader struct {
    topic      string
    js         nats.JetStreamContext
    speed      uint32
    bucket     *rate.Limiter
    callback   func(msg []byte, topic string, header ...[2]string)
    sub        *nats.Subscription
    closedChan chan struct{}
    pool       *ants.PoolWithFunc
    consumer   *nats.ConsumerInfo
    closed     *atomic.Bool
}

func InitNats(conf *config.NatsConf) *Nats {
    opts := []nats.Option{
        nats.Name("NATS"),
        nats.UserInfo(conf.User, conf.Pwd),
        nats.ReconnectWait(time.Second * 2),
        nats.ReconnectJitter(time.Second*2, time.Second*5),
        nats.MaxReconnects(-1),
        nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
            log.Printf("Disconnected from NATS: %v", err)
        }),
    }
    nc, err := nats.Connect(conf.Host, opts...)
    if err != nil {
        panic(fmt.Errorf("connect nats failed: %s ! host: %s", err, conf.Host))
    }
    js, err := nc.JetStream()
    if err != nil {
        panic(fmt.Errorf("connect nats failed: %s ! host: %s", err, conf.Host))
    }

    jsconfig := &nats.StreamConfig{
        Name:              conf.Topic,
        Subjects:          []string{conf.Topic + ".*"},
        Retention:         nats.WorkQueuePolicy,
        Storage:           nats.FileStorage,
        MaxAge:            time.Hour * 24 * 7,
        MaxMsgs:           100_000_000,
        MaxBytes:          1024 * 1024 * 1024 * int64(conf.MaxStorage),
        MaxMsgsPerSubject: 100_000_000,
        Replicas:          1,
    }
    stream, err := js.StreamInfo(conf.Topic)
    if err != nil && err != nats.ErrStreamNotFound {
        fmt.Println(err)
    }

    if stream == nil {
        _, err = js.AddStream(jsconfig)
        if err != nil {
            panic(err)
        }
    } else {
        js.UpdateStream(jsconfig)
    }

    return &Nats{
        js:      js,
        stream:  conf.Topic,
        mu:      sync.Mutex{},
        readers: make(map[string]*reader),
    }
}

func (ns *Nats) Write(topic string, msg []byte, headers ...[2]string) error {
    if ns.js == nil {
        panic("NatsConn is nil")
    }
    nheaders := nats.Header{}
    for _, header := range headers {
        nheaders.Add(header[0], header[1])
    }
    message := &nats.Msg{
        Subject: ns.stream + "." + topic,
        Data:    msg,
        Header:  nheaders,
    }
    _, err := ns.js.PublishMsg(message)
    return err
}

func (ns *Nats) newReader(topic, consumerGroup string, speed uint32, callback func(msg []byte, topic string, header ...[2]string)) (*reader, error) {

    consumer, err := ns.js.ConsumerInfo(ns.stream, consumerGroup)
    if err != nil && err != nats.ErrConsumerNotFound {
        return nil, err
    }

    consumerConfig := &nats.ConsumerConfig{
        Durable:         consumerGroup,
        AckPolicy:       nats.AckExplicitPolicy,
        DeliverPolicy:   nats.DeliverAllPolicy,
        AckWait:         time.Second * 10,
        MaxDeliver:      5,
        MaxAckPending:   100,
        FilterSubject:   ns.stream + "." + topic,
        MaxRequestBatch: 100,
        // RateLimit:         uint64(speed),
    }
    if consumer == nil {
        _, err = ns.js.AddConsumer(ns.stream, consumerConfig)
        if err != nil {
            return nil, err
        }
    } else {
        _, err := ns.js.UpdateConsumer(ns.stream, consumerConfig)
        if err != nil {
            return nil, err
        }
    }

    pool, err := ants.NewPoolWithFunc(1000, func(i interface{}) {
        if msg, ok := i.(*nats.Msg); ok {
            header := make([][2]string, 0, len(msg.Header))
            if msg.Header != nil {
                for k, v := range msg.Header {
                    header = append(header, [2]string{k, v[0]})
                }
            }
            callback(msg.Data, msg.Subject[len(ns.stream)+1:], header...)
        }
    })

    if err != nil {
        return nil, err
    }

    sub, err := ns.js.PullSubscribe(ns.stream+"."+topic, consumerGroup, nats.AckExplicit(), nats.Bind(ns.stream, consumerGroup))
    if err != nil {
        return nil, err
    }

    return &reader{
        topic:      topic,
        js:         ns.js,
        speed:      speed,
        bucket:     rate.NewLimiter(rate.Limit(speed), int(speed)),
        callback:   callback,
        sub:        sub,
        closedChan: make(chan struct{}),
        pool:       pool,
        consumer:   consumer,
        closed:     &atomic.Bool{},
    }, nil
}

func (ns *Nats) Read(topics []string, group string, speed uint32, handle func(msg []byte, topic string, header ...[2]string)) error {
    if ns.js == nil {
        return errors.New("nats conn is nil")
    }

    for _, topic := range topics {

        ns.mu.Lock()

        rd, ok := ns.readers[topic]
        if !ok || rd == nil {
            newrd, err := ns.newReader(topic, group, speed, handle)
            if err != nil {
                return err
            }
            ns.readers[topic] = newrd
            rd = newrd
        }

        ns.mu.Unlock()

        go func(rd *reader) {
            for {
                select {
                case <-rd.closedChan:
                    return
                default:
                    msgBatch, err := rd.sub.FetchBatch(10, nats.PullMaxBytes(1024*1024*1024))
                    if err != nil {
                        time.Sleep(time.Second)
                        continue
                    }

                    if msgBatch == nil {
                        continue
                    }
                    if len(msgBatch.Messages()) == 0 {
                        time.Sleep(time.Second)
                        continue
                    }
                    for msg := range msgBatch.Messages() {

                        rd.bucket.Wait(context.TODO())
                        rd.pool.Invoke(msg)
                        msg.Ack()

                    }
                }

            }
        }(rd)
    }
    return nil
}
Jarema commented 1 week ago

Please provide stream info and consumer info so we can be sure about the subject filters.

tong3jie commented 1 week ago

@Jarema thanks

the config of stream

 jsconfig := &nats.StreamConfig{
        Name:              "ABC",
        Subjects:          []string{ "ABC.*"},
        }

the config of consumer

consumerConfig := &nats.ConsumerConfig{
        Durable:        "test",
        AckPolicy:       nats.AckExplicitPolicy,
        DeliverPolicy:   nats.DeliverAllPolicy,
        AckWait:         time.Second * 10,
        MaxDeliver:      5,
        MaxAckPending:   100,
        FilterSubject:   "ABC.test",
        MaxRequestBatch: 100,
    }

the data of message

message := &nats.Msg{
        Subject: "ABC.test",
        Data:    msg,
        Header:  nheaders,
    }
    _, err := ns.js.PublishMsg(message)
    return err
}

thanks

astmix commented 1 week ago

@tong3jie it is worth checking if there is a "Slow consumer detected" in the logs. This may be due to the large values of maxBatch and maxPullBytes. It is worth trying to set these values lower. Judging by the internal size of the queue in nats (64 MB), maxPullBytes should be no more than 64 MB

Jarema commented 1 week ago

@tong3jie Please check the actual config of stream and consumer in the server (by checking the returned info or using the CLI nats stream info and nats consumer info). It is really easy to make a mistake. Server won't complain if consumer does not match subjects in stream.

Also, Checking len of Messages(), which returns a channel probably will always return 0 if called immediately after FetchBatch - messages might not yet start flowing in, as the FetchBatch returns immediately after sending the request to the server and do not wait for anything. You do not need to check the channel length either - channel is closed when fetch completes.

Seeing your logic, I think you should migrate to the new API and use Consume or Messages from its API - it will handle all the logic of sending you Fetches after previous finished etc.

https://github.com/nats-io/nats.go/blob/main/jetstream/README.md

tong3jie commented 1 week ago

I checked the consumer and fond that the number of ack_pending was so slow .

thanks ,I'll try new api .

Jarema commented 1 week ago

If I understood you correctly, this issue can be closed. If I'm mistaken, feel free to reopen it with a follow up.