nats-io / nats-streaming-server

NATS Streaming System Server
https://nats.io
Apache License 2.0

crazy memory peak, and pod evicted #1047

Open mcuadros opened 4 years ago

mcuadros commented 4 years ago

In GKE, a cluster of 3 nodes, deployed with the operator, persisting to a network SSD disk.

In the middle of processing a queue of 10M messages of 1kb or less from a single machine, and with a workload of 20 machines publishing around 50k messages of 10kb each, all the nodes went to a crazy memory peak and k8s evicted them.

As a result, many of the messages were lost. Where does this memory come from? Are the messages not regularly persisted to disk?

The running image is nats-streaming:0.17.0 and the operator configuration is:

    store_limits: {
      max_channels: 100
      max_msgs: 0
      max_bytes: 100GB
    }

Screenshot_2020-05-15_12-35-09 Screenshot_2020-05-15_12-34-44 Screenshot_2020-05-15_12-33-38

kozlovic commented 4 years ago

@mcuadros Sorry about that. I don't have an explanation for it at this time. Let me ask you some questions. Could you elaborate on:

in the middle of processing a queue of 10M messages of 1kb or less from a single machine

Does that mean that some subscription started receiving messages from a channel with 10M messages of size <=1kb? This totals 10GB (well, if they are close to 1kb). But even if that was the case, only the leader should have to load messages from disk and then send them to the client subscription. Messages that are loaded only stay in the filestore implementation cache for 1 sec. How are your subscription(s) for that channel configured (MaxInflight, etc.)? Were they consuming fast?
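
For reference, this is roughly where those options are set in stan.go; a minimal sketch with placeholder cluster/client/subject names and default-ish values, not taken from this issue:

    // Minimal stan.go queue subscription sketch showing where MaxInflight
    // and the ack settings are configured. All values are placeholders.
    package main

    import (
        "log"
        "time"

        "github.com/nats-io/stan.go"
    )

    func main() {
        sc, err := stan.Connect("test-cluster", "consumer-1")
        if err != nil {
            log.Fatal(err)
        }
        defer sc.Close()

        _, err = sc.QueueSubscribe("jobs", "workers", func(m *stan.Msg) {
            // ... process the message ...
            m.Ack() // acknowledge only once processing is done
        },
            stan.DurableName("workers"),
            stan.SetManualAckMode(),      // do not auto-ack on delivery
            stan.MaxInflight(1024),       // default: max unacked msgs the server may send
            stan.AckWait(30*time.Second), // redelivery kicks in if no ack within this
        )
        if err != nil {
            log.Fatal(err)
        }
        select {}
    }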

a workload of 20 machines publishing around 50k messages of 10kb each

So as the above processing was going on, 50k messages total (or is it per machine, so 1M messages?) were being published. If it is 50k total, that is no more than 500MB; if it is per machine, it would be a total of 10GB.

Messages are not kept in memory (at least no longer than 1 sec for the filestore implementation). But I wonder if this could be the Go GC not kicking in because it assumes that there is a lot of memory on that machine? That is, it may have set its target really high because of the amount of memory available?
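
For what it is worth, the GC target mentioned here is the standard Go GOGC knob; a tiny illustration of how it can be tightened (this is the plain runtime/debug API, not a streaming server option):

    // Illustration of the Go GC target: equivalent to running the process
    // with GOGC=50, i.e. collect when the heap grows 50% over the live set
    // instead of the default 100%.
    package main

    import (
        "fmt"
        "runtime/debug"
    )

    func main() {
        old := debug.SetGCPercent(50)
        fmt.Println("previous GC target percent:", old)
    }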

I assume that those reports are 1 per node and that only the streaming server is running there, not the applications, etc.

mcuadros commented 4 years ago

First, don't worry, this is a staging environment and this was a test run.

The consumers are allocated in another namespace with dedicated nodes; the nats-streaming pods are in a services namespace with other services like NATS.

The consumers were consuming the records (at a slow rate, due to the nature of the job) and producing new messages with the results in another channel. Those results are produced at a high rate and consumed by another kind of consumer.

The messages lost were in the channel where the consumers were dumping the result.

The reports are for the pod itself, not the node. Checking the graphs of the node, I figured out that the 3 nodes of the cluster were scheduled on the same k8s node.

pod/nats-streaming-cluster-1          0/2     Evicted            0          31h    <none>       gke-primary-primary-node-pool-common-008136b3-xnzn   <none>           <none>
pod/nats-streaming-cluster-2          0/2     Evicted            0          31h    <none>       gke-primary-primary-node-pool-common-008136b3-xnzn   <none>           <none>
pod/nats-streaming-cluster-3          2/2     Running            0          25h    10.40.2.13   gke-primary-primary-node-pool-common-008136b3-xnzn  

Screenshot_2020-05-16_01-10-39

As you can see in the graph, the memory explosion was a chain effect on the cluster: one node went down and the effect moved on to the next one.

Also, something very relevant: from the moment I started to publish events, the memory was growing.

The MaxInflight is set to zero.

kozlovic commented 4 years ago

First, don't worry, this is a staging environment and this was a test run.

"Good". So if you can reproduce we could maybe get some metrics. If you are embedding the NAT Server (and not connecting to a remote NATS cluster), then you could add the -profile <port> command line (or prof_port in config outside of streaming{}) so you could run go tool pprof http://<streaming server host showing mem increas>:<port>/debug/pprof/heap to collect heap content. From there, you can do "top" or "web" to generate a .svg file.

The consumers were consuming the records (at a slow rate, due to the nature of the job) and producing new messages with the results in another channel. Those results are produced at a high rate and consumed by another kind of consumer.

That is a bit confusing. If the consumption is at a slow rate (due to the nature of the job) and this produces a new message in another channel (the result of processing), how can that be "producing at a high rate"? Meaning that the producing rate here should be as slow as the consuming rate, no?

The messages lost were in the channel where the consumers were dumping the result.

Mind sharing the server config? By default, in clustering mode, there is no sync on each write because it is unexpected that all nodes crash at the same time. But you can always force sync on each write with the cluster{sync: true} or -cluster_sync option.

Also, something very relevant: from the moment I started to publish events, the memory was growing.

You are using filestore (or SQL), aren't you? There is a defect that allows the MEMORY store to be used in clustering mode although it is not supported. I wonder if that's what is happening here? Because with file/sql, the messages are stored on disk and not kept in memory (except for some small cache), so Go should GC them. Again, it could be that the GC is not kicking in fast enough because of the apparent amount of memory available?

The MaxInflight is set to zero.

This is not a valid value and the server would reject the subscription. I am guessing that you mean that you did not specify it, which means the default of 1024 is used.

mcuadros commented 4 years ago

I am running the nats and stan operators, so it's hard to debug anything. About the jobs: a consumer takes one job but may generate 600k new messages on the other channel.

The server config is empty besides this:

    store_limits: {
      max_channels: 100
      max_msgs: 0
      max_bytes: 100GB
    }

This is the natsstreamingcluster resource: https://gist.github.com/mcuadros/d57a36c44e4191528c83a9dc3e876a17

I am using filestore writing to a remote SSD. (Now that I am reviewing the config, I don't see any file store config in it; maybe the problem is as stupid as me using the memory store?)

kozlovic commented 4 years ago

(Now that I am reviewing the config, I don't see any file store config in it; maybe the problem is as stupid as me using the memory store?)

This is the nats streaming operator, right? What you could do is check the log file for the startup banner, which should say whether it uses the memory store or not. Look for something like: Message store is.

mcuadros commented 4 years ago

[1] 2020/05/15 17:30:50.945303 [INF] STREAM: Message store is RAFT_FILE

kozlovic commented 4 years ago

So yes, it is file based.

If you want to force sync (which will dramatically reduce performance), you would need to add that to the configuration file, because I think not many options can be set with the operator. So the config could be:

streaming {
  cluster {
    sync: true
  }
  store_limits: {
    max_channels: 100
    max_msgs: 0
    max_bytes: 100GB
  }
}

But to explain the memory usage, I am not sure how to go about it without using go tool pprof (and since you are using a dedicated NATS cluster, the profile "server" would not be started in the streaming server). I will think more about this and see if I can come up with something.

mcuadros commented 4 years ago

I was able to replicate it. What I did this time was:

  1. Fill queue A with several million messages and leave it there for several hours; everything was stable.
  2. Start consumer B, which also generates messages, and... boom!

I am deploying an embedded version, and I will try to debug the problem.

Also, during a review of the config I saw that, due to a misconfiguration, all the nodes were using the same disk; that's why they were scheduled on the same node.

mcuadros commented 4 years ago

Screenshot_2020-05-17_20-33-16

This corresponds to the top of the peak; as you can see, the memory in the heap is half of the memory reported by k8s.

Showing nodes accounting for 4552.94MB, 97.62% of 4664.02MB total
Dropped 141 nodes (cum <= 23.32MB)
Showing top 10 nodes out of 18
      flat  flat%   sum%        cum   cum%
 2933.34MB 62.89% 62.89%  2933.34MB 62.89%  github.com/nats-io/nats-streaming-server/server.(*subState).makeSortedPendingMsgs
 1362.35MB 29.21% 92.10%  1362.35MB 29.21%  github.com/nats-io/nats-streaming-server/vendor/github.com/nats-io/stan.go/pb.(*MsgProto).Unmarshal
  159.61MB  3.42% 95.52%   162.12MB  3.48%  github.com/nats-io/nats-streaming-server/server.(*subStore).Remove
   63.51MB  1.36% 96.89%  1481.14MB 31.76%  github.com/nats-io/nats-streaming-server/stores.(*FileMsgStore).readAheadMsgs
   33.13MB  0.71% 97.60%    33.13MB  0.71%  github.com/nats-io/nats-streaming-server/stores.(*msgsCache).add
    0.50MB 0.011% 97.61%  1481.64MB 31.77%  github.com/nats-io/nats-streaming-server/server.(*StanServer).getMsgForRedelivery
    0.50MB 0.011% 97.62%    31.48MB  0.67%  github.com/nats-io/nats-streaming-server/server.(*StanServer).sendMsgToSub
         0     0% 97.62%   162.12MB  3.48%  github.com/nats-io/nats-streaming-server/server.(*StanServer).closeClient
         0     0% 97.62%  4444.45MB 95.29%  github.com/nats-io/nats-streaming-server/server.(*StanServer).performAckExpirationRedelivery
         0     0% 97.62%   161.54MB  3.46%  github.com/nats-io/nats-streaming-server/server.(*StanServer).processConnect

And this is the channelz output at (more or less) the same time as the heap top.

Let me know if you need more info; I have the heap and the channelz output for every node every 30 secs for the whole execution.

kozlovic commented 4 years ago

Start consumer B, which also generates messages, and... boom!

From channelz I see more than 1 consumer. Is that the queue group you are referring to? I see 53 members on the queue "queue_name": "analyzer.analyze_events:analyzer.analyze_events".

Both the channelz output and the top result you provided show that most of the memory is used because of message redelivery. It seems that your queue members are very slow to consume messages, but they have the default MaxInflight of 1024. My guess is that the server is consistently trying to redeliver messages to those queue subscriptions. I also noticed that some members have a pending_count in excess of their MaxInflight, which would be symptomatic of some queue members leaving the group with unacknowledged messages and those messages therefore being transferred to existing members.

A few things to try:

Reducing the MaxInflight to 1 would make the second issue listed above less of a problem, since it would be at most 1 message per member.

Note that the current heap is at about 4GB, but it seems that about 12GB were allocated during the lifetime of this process; some of it will be garbage collected, I presume. I have never had to tweak the Go garbage collection settings, so hopefully we don't have to go there if we solve/understand better why there are so many redeliveries.

mcuadros commented 4 years ago

I already tried reducing the MaxInflight to the number of goroutines taking care of the tasks, and the result was almost the same. Also, just so you know, the tasks may take several minutes, even hours.

About the Close(): we have a memory problem in the analyzer/consumer parts and they are killed for exceeding memory, so sadly we can't close the connections properly. This makes the problem even worse.

kozlovic commented 4 years ago

I already tried reducing the MaxInflight to the number of goroutines taking care of the tasks, and the result was almost the same.

How can that be? The top shows that it is all about the server trying to redeliver messages, and the slice size is based on the number of unacknowledged messages. So the more messages delivered but unacknowledged, the bigger the slice allocation will be.

Also, just so you know, the tasks may take several minutes, even hours.

I understand, but having more than MaxInflight==1 in this case is counterproductive. The server will send messages that the applications have no chance to process for a while. Furthermore, when a message is sent to a member while this member is already spending a long time processing an older message, that message is not available to other members that you may start in order to increase parallel processing. Also, messages that are redelivered could cause even more issues if your application does not detect them as duplicates and generates output messages for an already processed message.
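
If it helps, a hedged sketch of the duplicate check on the consumer side; the Redelivered and Sequence fields come from stan.go, while the package name and in-memory map are illustrative only:

    package consumer

    import "github.com/nats-io/stan.go"

    // newHandler returns a handler that re-acks redelivered messages it has
    // already processed instead of producing their output again. A real
    // implementation would track processed sequences durably.
    func newHandler() stan.MsgHandler {
        processed := map[uint64]bool{}
        return func(m *stan.Msg) {
            if m.Redelivered && processed[m.Sequence] {
                m.Ack() // already handled: just acknowledge, no new output
                return
            }
            // ... process the message and publish the result ...
            processed[m.Sequence] = true
            m.Ack()
        }
    }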

About the Close(): we have a memory problem in the analyzer/consumer parts and they are killed for exceeding memory, so sadly we can't close the connections properly. This makes the problem even worse.

Again, setting MaxInflight to 1 may help reduce the memory usage for each member because they deal with 1 message at a time. If they are killed due to memory constraints, this is a problem, especially if each member had thousands of unacknowledged messages. You may want to reduce the server-to-client heartbeat values so that the server can close those consumers faster (look for hb_interval, hb_timeout and hb_fail_count in the config table: https://docs.nats.io/nats-streaming-server/configuring/cfgfile).
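
As an example of where those options live (values here are placeholders to tune, not recommendations):

    streaming {
      hb_interval: "10s"   # how often the server pings each client
      hb_timeout: "5s"     # how long it waits for the ping reply
      hb_fail_count: 3     # missed pings before the client connection is closed
    }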