Open nuttert opened 5 months ago
What is the stream info for the stream?
@wallyqs 👋
Information for Stream addresses created 2024-06-14 06:41:44

    Subjects: addresses, snapshot.addresses, meta.addresses
    Replicas: 1
    Storage: Memory

Options:

    Retention: Limits
    Acknowledgments: true
    Discard Policy: Old
    Duplicate Window: 1m0s
    Allows Msg Delete: true
    Allows Purge: true
    Allows Rollups: false

Limits:

    Maximum Messages: unlimited
    Maximum Per Subject: unlimited
    Maximum Bytes: unlimited
    Maximum Age: 1m0s
    Maximum Message Size: unlimited
    Maximum Consumers: unlimited

State:

    Messages: 12
    Bytes: 46 KiB
    First Sequence: 25,887 @ 2024-06-14 09:19:13 UTC
    Last Sequence: 25,898 @ 2024-06-14 09:20:03 UTC
    Active Consumers: 12,859
    Number of Subjects: 2
Can you please provide some information about the hardware you are running on?
Are you creating/deleting consumers at a high rate?
By the way, I do not know why it shows Active Consumers: 12,859. I stopped the consumers, and I can see that in the connection info:
Brief hardware details for the NATS server:
Memory: 64 GB
CPU: 8 cores
Are you creating/deleting consumers at a high rate?
Yes. When I run my system, thousands of consumers try to connect to the server at the same time (my server holds one connection to NATS and creates subscriptions on behalf of the clients).
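One way to keep thousands of consumer-creation requests from hitting the server at the same moment is to cap how many run concurrently on the client side. The sketch below is only an illustration under assumptions: `createAll`, `maxInFlight`, and the `create` callback are hypothetical names that do not come from nats.go, and `create` stands in for whatever call actually creates the subscription.

```go
package main

import (
	"fmt"
	"sync"
)

// createAll calls create once per channel, with at most maxInFlight calls
// running at the same time, and returns the failures keyed by channel.
// create is a hypothetical stand-in for the real subscription call.
func createAll(channels []string, maxInFlight int, create func(channel string) error) map[string]error {
	sem := make(chan struct{}, maxInFlight) // counting semaphore
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		errs = make(map[string]error)
	)
	for _, ch := range channels {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxInFlight calls are already running
		go func(ch string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this call finishes
			if err := create(ch); err != nil {
				mu.Lock()
				errs[ch] = err
				mu.Unlock()
			}
		}(ch)
	}
	wg.Wait()
	return errs
}

func main() {
	channels := []string{"addresses", "snapshot.addresses", "meta.addresses"}
	errs := createAll(channels, 2, func(channel string) error {
		fmt.Println("subscribing to", channel) // replace with the real call
		return nil
	})
	fmt.Println("failed:", len(errs))
}
```

A suitable value for `maxInFlight` would have to be found by experiment; the point is only that the burst is spread out instead of arriving all at once.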
How are you creating the consumers?
Do your clients re-bind to existing named consumers to pick up where they left off, or are they creating new consumers each time they connect? (If so, you might want to set an inactive threshold to clean up old consumers when they go idle.)
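To illustrate the re-binding suggestion: a client can only pick up where it left off if it asks for the same consumer name on every connect. Here is a minimal sketch, assuming consumers are named per client and channel. `durableName` and its `clientID` parameter are hypothetical, and the character replacement reflects my understanding that consumer names must not contain `.`, `*`, `>`, path separators, or whitespace.

```go
package main

import (
	"fmt"
	"strings"
)

// durableName derives a deterministic consumer name from a client ID and a
// channel, so a reconnecting client asks for the same consumer again instead
// of creating a new one. Both parameters are hypothetical inputs.
func durableName(clientID, channel string) string {
	// Assumption: '.', '*', '>', '/', '\' and whitespace are not allowed
	// in consumer names, so they are replaced with '_'.
	clean := func(r rune) rune {
		switch r {
		case '.', '*', '>', '/', '\\', ' ', '\t', '\r', '\n':
			return '_'
		}
		return r
	}
	return strings.Map(clean, clientID) + "-" + strings.Map(clean, channel)
}

func main() {
	name := durableName("client 42", "snapshot.addresses")
	fmt.Println(name) // client_42-snapshot_addresses
	// The name would then be passed to the subscribe call, for example:
	//   jetStream.Subscribe(channel, handler, nats.Durable(name))
}
```

Note that the replacement can map two different inputs to the same name (for example `a.b` and `a_b`), so a real implementation would need IDs that stay unique after cleaning.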
    natsCfg := &nats.StreamConfig{
        Name:     name,
        Subjects: config.Subjects,
        Storage:  nats.MemoryStorage,
        ConsumerLimits: nats.StreamConsumerLimits{
            InactiveThreshold: time.Hour * 24,
            MaxAckPending:     100000000, // Max messages in-flight before consumer is considered slow
        },
    }
    _, err := c.jetStream.AddStream(natsCfg)
I tried to use InactiveThreshold to fix the initial problem with the timeout error and context deadline exceeded when I used the file storage option. It probably does not help, and I can remove it.
However, I explicitly close all subscriptions:
    sub.WaitForCancellationAsync(func() {
        log.Info().Msgf("Unsubscribed %s", channel)
        jetSub.Unsubscribe()
    })
Here is how I create a consumer:
    jetSub, err = c.jetStream.Subscribe(channel, func(msg *nats.Msg) {
        go c.HandleMessage(ctx, msg, sub)
    }, opt...)
(I already wrote it above in the description)
Hey, I got this error in my Go app:
At the same time, I see this in the nats-server logs:
Configuration:
I have thousands of subscriptions at the same time, and I don't mind if processing takes a few seconds; in this context that is normal. I just want a guarantee that messages will be delivered without errors, even if delivery takes 30 seconds.
I tried different settings, even something abnormal like this:
or added this to the NATS config:
But it didn't help.
Here is how I create the stream:
(By the way, FileStorage fails completely on this setup: it crashes after a few seconds under production load with context deadline exceeded.)
And here is how I consume the stream:
What I expect
I expect there to be a parameter for the deadline, so that after, for example, 10-40 seconds the queue is processed and all clients receive their messages.
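Until a suitable setting is identified, the behaviour described here can be approximated on the client side by retrying the failing call until an overall time budget is spent. This is only a sketch under assumptions: `retryFor`, `errTemporary`, and the pause value are hypothetical and not part of nats.go, and the callback stands in for whichever call currently returns context deadline exceeded.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a hypothetical error used to simulate a failing call.
var errTemporary = errors.New("temporary failure")

// retryFor calls op until it succeeds or the total time budget would be
// exceeded, sleeping pause between attempts. It returns nil on success,
// otherwise the last error from op wrapped with the number of attempts.
func retryFor(budget, pause time.Duration, op func() error) error {
	deadline := time.Now().Add(budget)
	for attempt := 1; ; attempt++ {
		err := op()
		if err == nil {
			return nil
		}
		// Give up if the next attempt would start after the deadline.
		if time.Now().Add(pause).After(deadline) {
			return fmt.Errorf("gave up after %d attempts: %w", attempt, err)
		}
		time.Sleep(pause)
	}
}

func main() {
	attempts := 0
	// Budget of 30 seconds, within the 10-40 second range mentioned above.
	err := retryFor(30*time.Second, 10*time.Millisecond, func() error {
		attempts++
		if attempts < 3 {
			return errTemporary // simulated failure
		}
		return nil // replace with the real subscribe or publish call
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```

This does not change anything on the server; it only makes the client tolerate slow responses for up to the chosen budget.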