nsqio / go-nsq

The official Go package for NSQ
MIT License
2.59k stars 444 forks source link

consumer: optimize memory allocation in DecodeMessage and ReadResponse #174

Closed Dieterbe closed 7 years ago

Dieterbe commented 8 years ago

here's a profile from a production service (if you're interested, the code lives at https://github.com/raintank/raintank-metric/tree/master/metric_tank)

> go tool pprof --alloc_space bin-pointslicepool pointslicepool-5min
Entering interactive mode (type "help" for commands)
(pprof) top30
4054.86MB of 4386.44MB total (92.44%)
Dropped 423 nodes (cum <= 21.93MB)
Showing top 30 nodes out of 87 (cum >= 34.32MB)
      flat  flat%   sum%        cum   cum%
 1596.64MB 36.40% 36.40%  1596.64MB 36.40%  bytes.makeSlice
  856.81MB 19.53% 55.93%   856.81MB 19.53%  main.fix
  537.95MB 12.26% 68.20%   537.98MB 12.26%  github.com/nsqio/go-nsq.ReadResponse
  384.03MB  8.76% 76.95%   384.03MB  8.76%  github.com/tinylib/msgp/msgp.ReadStringBytes
  103.44MB  2.36% 79.31%   103.44MB  2.36%  github.com/dgryski/go-tsz.(*bstream).writeBits
   88.87MB  2.03% 81.34%   491.88MB 11.21%  github.com/raintank/raintank-metric/schema.(*MetricDataArray).UnmarshalMsg
   64.34MB  1.47% 82.80%   250.20MB  5.70%  github.com/gocql/gocql.(*Conn).executeQuery
   53.96MB  1.23% 84.03%    53.96MB  1.23%  github.com/gocql/gocql.glob.func2
   49.77MB  1.13% 85.17%   133.50MB  3.04%  main.(*AggMetric).Add
   36.58MB  0.83% 86.00%    36.58MB  0.83%  github.com/gocql/gocql.(*Session).Query
   33.98MB  0.77% 86.78%    34.10MB  0.78%  time.NewTimer
   26.94MB  0.61% 87.39%    26.94MB  0.61%  strconv.formatBits
   26.92MB  0.61% 88.00%   156.57MB  3.57%  main.getSeries
   25.97MB  0.59% 88.60%    25.97MB  0.59%  github.com/gocql/gocql.(*framer).writeQueryParams
   23.85MB  0.54% 89.14%    30.40MB  0.69%  fmt.Sprintf
   19.82MB  0.45% 89.59%   304.76MB  6.95%  main.(*cassandraStore).processWriteQueue
   18.97MB  0.43% 90.02%   403.01MB  9.19%  github.com/raintank/raintank-metric/schema.(*MetricData).UnmarshalMsg
   17.36MB   0.4% 90.42%   131.93MB  3.01%  github.com/gocql/gocql.(*Conn).exec
   17.07MB  0.39% 90.81%    67.65MB  1.54%  github.com/dgryski/go-tsz.(*Series).Iter
   15.69MB  0.36% 91.17%   247.20MB  5.64%  main.(*cassandraStore).insertChunk
   14.93MB  0.34% 91.51%    41.35MB  0.94%  main.(*cassandraStore).Search
   11.72MB  0.27% 91.77%    34.17MB  0.78%  net/http.readRequest
    6.59MB  0.15% 91.92%    42.23MB  0.96%  net/http.(*conn).readRequest
    6.27MB  0.14% 92.07%   100.34MB  2.29%  main.Get
    6.08MB  0.14% 92.21%    35.11MB   0.8%  main.graphiteRaintankJSON
    3.93MB  0.09% 92.30%    25.15MB  0.57%  github.com/gocql/gocql.(*framer).parseResultFrame
    1.98MB 0.045% 92.34%    57.19MB  1.30%  github.com/dgryski/go-tsz.(*Series).Push
    1.73MB 0.039% 92.38%    36.01MB  0.82%  encoding/json.Marshal
    1.60MB 0.036% 92.42%  1511.41MB 34.46%  io/ioutil.readAll
    1.06MB 0.024% 92.44%    34.32MB  0.78%  main.(*PersistMessageBatch).flush

we can probably address the first and 3rd item pretty easily by using a sync.Pool the first one comes from msg.Body, err = ioutil.ReadAll(buf) in go-nsq.DecodeMessage

(pprof) list DecodeMessage
Total: 14226.66GB
ROUTINE ======================== github.com/nsqio/go-nsq.DecodeMessage in /home/ubuntu/.go_workspace/src/github.com/nsqio/go-nsq/message.go
   11.93GB  6575.64GB (flat, cum) 46.22% of Total
         .          .    139:   return total, nil
         .          .    140:}
         .          .    141:
         .          .    142:// DecodeMessage deserializes data (as []byte) and creates a new Message
         .          .    143:func DecodeMessage(b []byte) (*Message, error) {
    5.56GB     5.56GB    144:   var msg Message
         .          .    145:
         .          .    146:   msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
         .          .    147:   msg.Attempts = binary.BigEndian.Uint16(b[8:10])
         .          .    148:
    6.37GB     6.37GB    149:   buf := bytes.NewBuffer(b[10:])
         .          .    150:
         .          .    151:   _, err := io.ReadFull(buf, msg.ID[:])
         .          .    152:   if err != nil {
         .          .    153:       return nil, err
         .          .    154:   }
         .          .    155:
         .  6563.71GB    156:   msg.Body, err = ioutil.ReadAll(buf) // DIETER here's the other big alloc in DecodeMessage
         .          .    157:   if err != nil {
         .          .    158:       return nil, err
         .          .    159:   }
         .          .    160:
         .          .    161:   return &msg, nil
(pprof)
(pprof) list ReadResponse
Total: 14226.66GB
ROUTINE ======================== github.com/nsqio/go-nsq.ReadResponse in /home/ubuntu/.go_workspace/src/github.com/nsqio/go-nsq/protocol.go
 1923.76GB  1924.20GB (flat, cum) 13.53% of Total
         .          .     46://    |  (int32) || (binary)
         .          .     47://    |  4-byte  || N-byte
         .          .     48://    ------------------------...
         .          .     49://        size       data
         .          .     50:func ReadResponse(r io.Reader) ([]byte, error) {
  478.01MB   478.01MB     51:   var msgSize int32
         .          .     52:
         .          .     53:   // message size
         .   450.01MB     54:   err := binary.Read(r, binary.BigEndian, &msgSize)
         .          .     55:   if err != nil {
         .          .     56:       return nil, err
         .          .     57:   }
         .          .     58:
         .          .     59:   // message binary data
 1923.30GB  1923.30GB     60:   buf := make([]byte, msgSize)
         .          .     61:   _, err = io.ReadFull(r, buf)
         .          .     62:   if err != nil {
         .          .     63:       return nil, err
         .          .     64:   }
twmb commented 8 years ago

The message must be drained from the conn before go-nsq can read the next message. Eliminating ioutil.ReadAll and returning the reader (the conn) must mean there must be one message handler per conn, and it must always be processing.

sync.Pool would be decent if you have a relatively stable size for incoming responses. If body size varies significantly, half the Get's from the pool will need resized on reading, resulting in no benefit.

I think a nice approach would be to provide an interface for custom message decoding. This eliminates the need to ReadAll in the first place by decoding directly into what you'll be using the body for.

Dieterbe commented 8 years ago

If body size varies significantly, half the Get's from the pool will need resized on reading, resulting in no benefit.

remember that you could put the bigger buffers back into the pool for reuse. so if you have high sizing variety it just means you'll be using more memory (until the next GC run) and there will definitely be benefit in terms of allocations and amount of GC runs.

i like your idea of having a special api for it though, and leave allocations up to the user of the library!

mreiferson commented 8 years ago

Thanks for looking into this @Dieterbe and apologies for the delayed response.

go-nsq was designed around the idea of making it really easy to get started and not having to write too much boilerplate for consumers. We haven't spent much time at all optimizing it (compared to e.g. nsqd).

I'm guessing there are lots of opportunities like this throughout the library.

With respect to this specific issue, why don't we just experiment with some of the proposed solutions and see how far we get? I would love some help contributing patches for these!

Dieterbe commented 8 years ago

I just tried out this patch:

diff --git a/message.go b/message.go
index a5a7fa4..67ba41d 100644
--- a/message.go
+++ b/message.go
@@ -1,10 +1,9 @@
 package nsq

 import (
-       "bytes"
        "encoding/binary"
+       "errors"
        "io"
-       "io/ioutil"
        "sync/atomic"
        "time"
 )
@@ -139,24 +138,19 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) {
        return total, nil
 }

-// DecodeMessage deseralizes data (as []byte) and creates a new Message
+// DecodeMessage deserializes data (as []byte) and creates a new Message
 func DecodeMessage(b []byte) (*Message, error) {
-       var msg Message
+       var msg Message // DIETER 0.1% of alloc in this fn

-       msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
-       msg.Attempts = binary.BigEndian.Uint16(b[8:10])
-
-       buf := bytes.NewBuffer(b[10:])
-
-       _, err := io.ReadFull(buf, msg.ID[:])
-       if err != nil {
-               return nil, err
+       if len(b) < 11+MsgIDLength {
+               return nil, errors.New("not enough data to decode valid message")
        }

-       msg.Body, err = ioutil.ReadAll(buf)
-       if err != nil {
-               return nil, err
-       }
+       msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
+       msg.Attempts = binary.BigEndian.Uint16(b[8:10])
+       //msg.ID = MessageID(b[10 : 10+MsgIDLength][:])
+       copy(msg.ID[:], b[10:10+MsgIDLength])
+       msg.Body = b[10+MsgIDLength:]

        return &msg, nil
 }

i haven't had much time to work on this yet or to look at the other spots, but this looked like low hanging fruit. in screenshot below is 2 runs of our service with the same input load coming from NSQ. first half of the graphs is with patch applied, second half is without the patch. note the difference in how fast "alloc not freed" rises and the resulting difference in frequency of GC runs go-nsq-after-befone i'm also seeing lower cpu usage with the patch applied.

mreiferson commented 8 years ago

@Dieterbe those results look promising and the diff looks simple and safe to me, want to open up a PR so we can get that in?

(we can leave this issue open to track overall optimization progress)