Open joshdvir opened 7 years ago
First step would probably be to get a heap and allocation profile of your application using pprof.
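For anyone following along, here is a minimal sketch of capturing a heap profile with the standard runtime/pprof package. The helper name heapProfile is hypothetical; a long-running server would more commonly import net/http/pprof and fetch /debug/pprof/heap over HTTP.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"runtime/pprof"
)

// heapProfile is a hypothetical helper: it returns the current heap
// profile in the format that `go tool pprof` reads.
func heapProfile() ([]byte, error) {
	runtime.GC() // collect first so the profile reflects up-to-date statistics
	var buf bytes.Buffer
	if err := pprof.WriteHeapProfile(&buf); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	p, err := heapProfile()
	if err != nil {
		panic(err)
	}
	// In a real program the bytes would be written to a file and opened
	// with `go tool pprof <binary> <file>`.
	fmt.Printf("heap profile: %d bytes\n", len(p))
}
```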
Please generate the profiles with the latest version of the package. The recent change to pool flate readers and writers should help.
Thanks guys,
I'm waiting for the next version of Centrifugo, which includes your latest version; then I will profile the application and upload the results here.
You can close this issue until then, but I hope to have the new version today or tomorrow at the latest, so it's up to you.
Here is the pprof dump
This is the heap profile
@kisielk @garyburd I've updated the gist please check now
https://gist.github.com/joshdvir/091229e3d3e4ade8d73b8cffe86c602b
I asked @joshdvir to send me CPU and memory profiles from a production node. Here is what we have:
CPU:
(pprof) top 20 --cum
28.99s of 62.03s total (46.74%)
Dropped 523 nodes (cum <= 0.31s)
Showing top 20 nodes out of 155 (cum >= 8.07s)
flat flat% sum% cum cum%
0 0% 0% 58.68s 94.60% runtime.goexit
0.05s 0.081% 0.081% 45.44s 73.25% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
0.16s 0.26% 0.34% 44.23s 71.30% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
0.16s 0.26% 0.6% 44.07s 71.05% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
0.05s 0.081% 0.68% 43.82s 70.64% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
0.01s 0.016% 0.69% 21.67s 34.93% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*flateWriteWrapper).Close
0.03s 0.048% 0.74% 20.19s 32.55% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter
0.07s 0.11% 0.85% 19.79s 31.90% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover
17.56s 28.31% 29.16% 17.56s 28.31% runtime.memclr
0.04s 0.064% 29.23% 15.46s 24.92% compress/flate.(*Writer).Reset
0.03s 0.048% 29.28% 15.42s 24.86% compress/flate.(*compressor).reset
0 0% 29.28% 14.40s 23.21% compress/flate.(*Writer).Flush
0 0% 29.28% 14.40s 23.21% compress/flate.(*compressor).syncFlush
2.62s 4.22% 33.50% 14.01s 22.59% compress/flate.(*compressor).deflate
0.01s 0.016% 33.52% 11.05s 17.81% compress/flate.(*compressor).writeBlock
0.15s 0.24% 33.76% 11.04s 17.80% compress/flate.(*huffmanBitWriter).writeBlock
0.21s 0.34% 34.10% 9.05s 14.59% runtime.systemstack
0.06s 0.097% 34.19% 8.87s 14.30% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*messageWriter).flushFrame
0.07s 0.11% 34.31% 8.81s 14.20% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).write
Most of the CPU time is spent in WriteMessage:
(pprof) list WriteMessage
Total: 1.03mins
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
50ms 43.82s (flat, cum) 70.64% of Total
. . 659:
. . 660:// WriteMessage is a helper method for getting a writer using NextWriter,
. . 661:// writing the message and closing the writer.
. . 662:func (c *Conn) WriteMessage(messageType int, data []byte) error {
. . 663:
50ms 50ms 664: if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression) {
. . 665:
. . 666: // Fast path with no allocations and single frame.
. . 667:
. 20ms 668: if err := c.prepWrite(messageType); err != nil {
. . 669: return err
. . 670: }
. . 671: mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize}
. 10ms 672: n := copy(c.writeBuf[mw.pos:], data)
. . 673: mw.pos += n
. . 674: data = data[n:]
. 1.69s 675: return mw.flushFrame(true, data)
. . 676: }
. . 677:
. 20.19s 678: w, err := c.NextWriter(messageType)
. . 679: if err != nil {
. . 680: return err
. . 681: }
. 190ms 682: if _, err = w.Write(data); err != nil {
. . 683: return err
. . 684: }
. 21.67s 685: return w.Close()
. . 686:}
NextWriter:
(pprof) list NextWriter
Total: 1.03mins
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
30ms 20.19s (flat, cum) 32.55% of Total
. . 437:// method flushes the complete message to the network.
. . 438://
. . 439:// There can be at most one open writer on a connection. NextWriter closes the
. . 440:// previous writer if the application has not already done so.
. . 441:func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) {
. 90ms 442: if err := c.prepWrite(messageType); err != nil {
. . 443: return nil, err
. . 444: }
. . 445:
. . 446: mw := &messageWriter{
. . 447: c: c,
. . 448: frameType: messageType,
10ms 280ms 449: pos: maxFrameHeaderSize,
. . 450: }
. 10ms 451: c.writer = mw
. . 452: if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) {
10ms 19.80s 453: w := c.newCompressionWriter(c.writer)
. . 454: mw.compress = true
10ms 10ms 455: c.writer = w
. . 456: }
. . 457: return c.writer, nil
. . 458:}
. . 459:
. . 460:type messageWriter struct {
compressNoContextTakeover:
(pprof) list compressNoContextTakeover
Total: 1.03mins
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/compression.go
70ms 19.79s (flat, cum) 31.90% of Total
. . 33: fr.(flate.Resetter).Reset(io.MultiReader(r, strings.NewReader(tail)), nil)
. . 34: return &flateReadWrapper{fr}
. . 35:}
. . 36:
. . 37:func compressNoContextTakeover(w io.WriteCloser) io.WriteCloser {
. 130ms 38: tw := &truncWriter{w: w}
40ms 3.93s 39: fw, _ := flateWriterPool.Get().(*flate.Writer)
10ms 15.47s 40: fw.Reset(tw)
20ms 260ms 41: return &flateWriteWrapper{fw: fw, tw: tw}
. . 42:}
And now heap profile:
(pprof) top 30 --cum
4794.23MB of 5414.45MB total (88.55%)
Dropped 238 nodes (cum <= 27.07MB)
Showing top 30 nodes out of 46 (cum >= 113.64MB)
flat flat% sum% cum cum%
0 0% 0% 5385.39MB 99.46% runtime.goexit
0 0% 0% 4277.82MB 79.01% sync.(*Pool).Get
0 0% 0% 4277.82MB 79.01% sync.(*Pool).getSlow
0 0% 0% 4182.80MB 77.25% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
0 0% 0% 4181.80MB 77.23% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
0 0% 0% 4181.80MB 77.23% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
0 0% 0% 4181.80MB 77.23% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
8MB 0.15% 0.15% 4168.27MB 76.98% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter
12MB 0.22% 0.37% 4160.27MB 76.84% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover
3792.80MB 70.05% 70.42% 4148.27MB 76.61% compress/flate.NewWriter
0 0% 70.42% 4148.27MB 76.61% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.glob..func1
0.50MB 0.0092% 70.43% 1156.29MB 21.36% net/http.(*conn).serve
0 0% 70.43% 873.42MB 16.13% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).Logged.func1
0 0% 70.43% 873.42MB 16.13% net/http.HandlerFunc.ServeHTTP
0 0% 70.43% 872.92MB 16.12% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).WrapShutdown.func1
0 0% 70.43% 872.92MB 16.12% net/http.(*ServeMux).ServeHTTP
0 0% 70.43% 872.92MB 16.12% net/http.serverHandler.ServeHTTP
0 0% 70.43% 866.91MB 16.01% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).RawWebsocketHandler
0 0% 70.43% 866.91MB 16.01% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).RawWebsocketHandler-fm
0 0% 70.43% 404.78MB 7.48% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).ReadMessage
355.47MB 6.57% 76.99% 355.47MB 6.57% compress/flate.(*compressor).init
0 0% 76.99% 320.19MB 5.91% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Upgrader).Upgrade
0.50MB 0.0092% 77.00% 292.64MB 5.40% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextReader
1.50MB 0.028% 77.03% 291.64MB 5.39% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.decompressNoContextTakeover
215.85MB 3.99% 81.02% 216.35MB 4.00% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.newConn
159.10MB 2.94% 83.96% 159.10MB 2.94% compress/flate.(*decompressor).Reset
129.04MB 2.38% 86.34% 129.04MB 2.38% compress/flate.NewReader
0 0% 86.34% 129.04MB 2.38% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.glob..func2
119.46MB 2.21% 88.55% 119.46MB 2.21% net/http.newBufioWriterSize
0 0% 88.55% 113.64MB 2.10% io/ioutil.ReadAll
WriteMessage:
(pprof) list WriteMessage
Total: 5.29GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
0 4.08GB (flat, cum) 77.23% of Total
. . 673: mw.pos += n
. . 674: data = data[n:]
. . 675: return mw.flushFrame(true, data)
. . 676: }
. . 677:
. 4.07GB 678: w, err := c.NextWriter(messageType)
. . 679: if err != nil {
. . 680: return err
. . 681: }
. . 682: if _, err = w.Write(data); err != nil {
. . 683: return err
. . 684: }
. 13.53MB 685: return w.Close()
. . 686:}
compressNoContextTakeover:
(pprof) list compressNoContextTakeover
Total: 5.29GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/compression.go
12MB 4.06GB (flat, cum) 76.84% of Total
. . 33: fr.(flate.Resetter).Reset(io.MultiReader(r, strings.NewReader(tail)), nil)
. . 34: return &flateReadWrapper{fr}
. . 35:}
. . 36:
. . 37:func compressNoContextTakeover(w io.WriteCloser) io.WriteCloser {
10MB 10MB 38: tw := &truncWriter{w: w}
. 4.05GB 39: fw, _ := flateWriterPool.Get().(*flate.Writer)
. . 40: fw.Reset(tw)
2MB 2MB 41: return &flateWriteWrapper{fw: fw, tw: tw}
. . 42:}
Possibly related: https://github.com/golang/go/issues/18625
@y3llowcake thanks for pointing on this issue.
I've written test case for Gorilla Websocket:
type testConn struct {
conn *Conn
messages chan []byte
}
func newTestConn(c *Conn, bufferSize int) *testConn {
return &testConn{
conn: c,
messages: make(chan []byte, bufferSize),
}
}
func printss() {
m := runtime.MemStats{}
runtime.ReadMemStats(&m)
fmt.Printf("inuse: %d sys: %d\n", m.StackInuse, m.StackSys)
}
func TestWriteWithCompression(t *testing.T) {
w := ioutil.Discard
done := make(chan struct{})
numConns := 1000
numMessages := 1000
conns := make([]*testConn, numConns)
var wg sync.WaitGroup
for i := 0; i < numConns; i++ {
c := newConn(fakeNetConn{Reader: nil, Writer: w}, false, 1024, 1024)
c.enableWriteCompression = true
c.newCompressionWriter = compressNoContextTakeover
conns[i] = newTestConn(c, 256)
wg.Add(1)
go func(c *testConn) {
defer wg.Done()
i := 0
for i < numMessages {
select {
case <-done:
return
case msg := <-c.messages:
c.conn.WriteMessage(TextMessage, msg)
i++
}
}
}(conns[i])
}
messages := textMessages(100)
for i := 0; i < numMessages; i++ {
if i%100 == 0 {
printss()
}
msg := messages[i%len(messages)]
for _, c := range conns {
c.messages <- msg
}
}
wg.Wait()
}
func textMessages(num int) [][]byte {
messages := make([][]byte, num)
for i := 0; i < num; i++ {
msg := fmt.Sprintf("planet: %d, country: %d, city: %d, street: %d", i, i, i, i)
messages[i] = []byte(msg)
}
return messages
}
It creates 1000 connections with compression enabled, each with buffered message channel. Then in a loop we write message into each connection.
Here is how it behaves with go1.7.4
fz@websocket: go test -test.run=TestWriteWithCompression
inuse: 4259840 sys: 4259840
inuse: 27394048 sys: 27394048
inuse: 246251520 sys: 246251520
inuse: 1048510464 sys: 1048510464
inuse: 1048510464 sys: 1048510464
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
PASS
ok github.com/gorilla/websocket 11.053s
Using Go with commit https://github.com/golang/go/commit/9c3630f578db1d4331b367c3c7d284db299be3a6
fz@websocket: go1.8 test -test.run=TestWriteWithCompression
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
PASS
ok github.com/gorilla/websocket 12.023s
Though it's hard to say at the moment whether this fix will solve the original problem in this issue.
I also tried the same with flate from https://github.com/klauspost/compress by @klauspost, which already contains that array copy fix in master:
fz@websocket: go test -test.run=TestWriteWithCompression
inuse: 4358144 sys: 4358144
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
PASS
ok github.com/gorilla/websocket 3.426s
But actually, even without that fix, the https://github.com/klauspost/compress library behaves without memory growth... I can't explain this.
Also here is benchmark result using https://github.com/klauspost/compress library:
BenchmarkWriteWithCompression-4 200000 5676 ns/op 149 B/op 3 allocs/op
That's a 4x speedup compared to the standard library compress/flate results:
BenchmarkWriteWithCompression-4 50000 25362 ns/op 128 B/op 3 allocs/op
@garyburd I understand that having non-standard lib package in core could be a wrong step, but maybe we can consider mechanism to let it be plugged somehow by user's code?
even without that fix [it] behaves without memory growth... I can't explain this.
AFAICT, this package uses "level 3" compression (which is a good choice). In my package, levels 1-4 are specialized and do not use the "generic" code, which has the issue.
In Go 1.7 level 1 (Best speed) has a similar specialized function. I would think that if you use that, you will not experience the issue. That might be a solution you can use, so you do not have to import a specialized package (even if I wouldn't mind to give users the option). Performance for level 1 should be very close to my package.
@klauspost thanks for explaining, just tried what you said - yes, with compression level 1 performance is comparable to your library and has no memory problems in go1.7 (in test case above)
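To make the level comparison easy to reproduce outside the websocket package, here is a rough sketch that times the same Reset/Write/Flush pattern seen in the profiles at two compression levels, using only compress/flate. The helper names compressMessage and timeLevel are made up for this example, and the numbers will differ by Go version and machine.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"time"
)

// compressMessage mimics the per-message pattern from the profiles:
// Reset the writer onto the destination, write one message, then Flush.
func compressMessage(fw *flate.Writer, dst *bytes.Buffer, msg []byte) error {
	dst.Reset()
	fw.Reset(dst)
	if _, err := fw.Write(msg); err != nil {
		return err
	}
	return fw.Flush()
}

// timeLevel (hypothetical helper) compresses msg n times at the given level
// and returns the elapsed time and the size of one compressed message.
func timeLevel(level, n int, msg []byte) (time.Duration, int, error) {
	var dst bytes.Buffer
	fw, err := flate.NewWriter(&dst, level)
	if err != nil {
		return 0, 0, err // flate rejects levels outside [-2, 9]
	}
	start := time.Now()
	for i := 0; i < n; i++ {
		if err := compressMessage(fw, &dst, msg); err != nil {
			return 0, 0, err
		}
	}
	return time.Since(start), dst.Len(), nil
}

func main() {
	msg := []byte("planet: 42, country: 42, city: 42, street: 42")
	for _, level := range []int{flate.BestSpeed, 3} {
		d, size, err := timeLevel(level, 2000, msg)
		if err != nil {
			panic(err)
		}
		fmt.Printf("level %d: %v for 2000 messages, %d bytes each\n", level, d, size)
	}
}
```

This only measures the compressor itself, so it says nothing about pooling or network writes.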
@garyburd what do you think about this? I see two solutions that can help us: make the compression level an exported variable, or allow plugging in a custom flate implementation. Of course we can also wait for Go 1.8, but a way to improve compression performance is still very important. Do you want us to try creating a custom build with the advice you gave and the fixed array copy bug, and see how it behaves in production?
@FZambia How about setting compression level to one for now? It's probably the best option for most applications at this time and avoids exposing more API surface area.
@garyburd I agree with @FZambia, having the option to set the compression level would help greatly. In my specific use case I have a fan-out of 80K messages per second to 200K users, which causes a lot of outbound traffic. If I am able to set the compression level, I can find the sweet spot between the number of servers and outbound traffic. Usually servers cost much less than traffic when the traffic is this high, so having this option configurable would be awesome. Thanks
OK, let's add the following:
type CompressionOptions struct {
// Level specifies the compression level for the flate compressor. Valid levels range from
// -2 to 9. Level -2 will use Huffman compression only. Level -1 uses the default compression
// level. Level 0 does not attempt any compression. Levels 1 through 9 range from best
// speed to best compression.
//
// Applications should set this field. The default value of 0 does not attempt any compression.
Level int
}
type Dialer struct {
// CompressionOptions specifies options the client should use for
// per message compression (RFC 7692). If CompressionOptions is nil and
// EnableCompression is false, then the client does not attempt to negotiate
// compression with the server.
CompressionOptions *CompressionOptions
// EnableCompression specifies if the client should attempt to negotiate
// per message compression (RFC 7692). Setting this value to true does not
// guarantee that compression will be supported. Currently only "no context
// takeover" modes are supported.
//
// Deprecated: Set CompressionOptions to non-nil value to enable compression
// negotiation.
EnableCompression bool
}
Modify Upgrader to match Dialer.
@garyburd I agree that level 1 is a better default value at the moment, because it fixes the memory growth on Go 1.7 and compression is so costly. But it looks like at the fan-out level of apps as big as the one @joshdvir has, saving bandwidth saves a lot of money, so having the compression level configurable makes sense.
We made a custom build with compression level 1 and counters you suggested and put it into production. Counter values are:
Node 1:
"gorilla_websocket_flate_writer_from_pool": 1453147,
"gorilla_websocket_new_flate_writer": 6702
Node 2:
"gorilla_websocket_flate_writer_from_pool": 1820919,
"gorilla_websocket_new_flate_writer": 3676,
Node 3:
"gorilla_websocket_flate_writer_from_pool": 574187,
"gorilla_websocket_new_flate_writer": 321
...
It's aggregation over 1 minute. So pool looks pretty effective but...
...Compression is still the leader in allocs and CPU profiles and getting from sync.Pool is the most allocation-expensive operation for some reason.
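For context, the two counters above can be gathered with something like the following sketch: one counter is bumped on every Get, the other only inside the pool's New function, so the second counts pool misses. The countingPool type and simulate helper are hypothetical names for this example, not the code used in the custom build.

```go
package main

import (
	"compress/flate"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
)

// countingPool is a hypothetical wrapper around sync.Pool that records
// how many writers were requested and how many had to be newly allocated.
type countingPool struct {
	pool   sync.Pool
	gets   int64 // every get, hit or miss
	misses int64 // only calls that fell through to New
}

func newCountingPool() *countingPool {
	p := &countingPool{}
	p.pool.New = func() interface{} {
		atomic.AddInt64(&p.misses, 1)
		fw, _ := flate.NewWriter(io.Discard, flate.BestSpeed) // error is nil for a valid level
		return fw
	}
	return p
}

func (p *countingPool) get(w io.Writer) *flate.Writer {
	atomic.AddInt64(&p.gets, 1)
	fw := p.pool.Get().(*flate.Writer)
	fw.Reset(w)
	return fw
}

func (p *countingPool) put(fw *flate.Writer) { p.pool.Put(fw) }

// simulate compresses n small messages sequentially through one pool.
func simulate(n int) (gets, misses int64) {
	p := newCountingPool()
	for i := 0; i < n; i++ {
		fw := p.get(io.Discard)
		fw.Write([]byte("hello"))
		fw.Flush()
		p.put(fw)
	}
	return atomic.LoadInt64(&p.gets), atomic.LoadInt64(&p.misses)
}

func main() {
	gets, misses := simulate(1000)
	fmt.Printf("gets: %d, new writers: %d\n", gets, misses)
}
```

Note that sync.Pool may drop idle items during garbage collection, so a low miss count in a sequential loop does not guarantee the same ratio under production load.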
Here is CPU profile now:
(pprof) top 30 --cum
27.28s of 52.42s total (52.04%)
Dropped 414 nodes (cum <= 0.26s)
Showing top 30 nodes out of 137 (cum >= 1.89s)
flat flat% sum% cum cum%
0 0% 0% 50.21s 95.78% runtime.goexit
0.16s 0.31% 0.31% 43.93s 83.80% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
0.21s 0.4% 0.71% 42.52s 81.11% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
0.19s 0.36% 1.07% 42.31s 80.71% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
0.21s 0.4% 1.47% 41.87s 79.87% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
0.01s 0.019% 1.49% 35.43s 67.59% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*flateWriteWrapper).Close
0.03s 0.057% 1.55% 24.69s 47.10% compress/flate.(*Writer).Flush
0 0% 1.55% 24.66s 47.04% compress/flate.(*compressor).syncFlush
0.04s 0.076% 1.62% 24.03s 45.84% compress/flate.(*compressor).encSpeed
0.08s 0.15% 1.77% 18.16s 34.64% compress/flate.(*huffmanBitWriter).writeBlockDynamic
0.12s 0.23% 2.00% 15.03s 28.67% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*messageWriter).flushFrame
0.11s 0.21% 2.21% 14.90s 28.42% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).write
0.91s 1.74% 3.95% 13.21s 25.20% compress/flate.(*huffmanEncoder).generate
0 0% 3.95% 12.72s 24.27% net.(*conn).Write
0.06s 0.11% 4.06% 12.72s 24.27% net.(*netFD).Write
11.78s 22.47% 26.54% 12.16s 23.20% syscall.Syscall
0.05s 0.095% 26.63% 12.09s 23.06% syscall.Write
0.02s 0.038% 26.67% 12.04s 22.97% syscall.write
0.61s 1.16% 27.83% 11.98s 22.85% compress/flate.(*huffmanBitWriter).indexTokens
0 0% 27.83% 10.61s 20.24% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*messageWriter).Close
5.20s 9.92% 37.75% 6.62s 12.63% compress/flate.(*huffmanEncoder).bitCounts
4.77s 9.10% 46.85% 5.44s 10.38% compress/flate.encodeBestSpeed
list WriteMessage
(pprof) list WriteMessage
Total: 52.42s
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
210ms 41.87s (flat, cum) 79.87% of Total
. . 659:
. . 660:// WriteMessage is a helper method for getting a writer using NextWriter,
. . 661:// writing the message and closing the writer.
. . 662:func (c *Conn) WriteMessage(messageType int, data []byte) error {
. . 663:
160ms 160ms 664: if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression) {
. . 665:
. . 666: // Fast path with no allocations and single frame.
. . 667:
10ms 40ms 668: if err := c.prepWrite(messageType); err != nil {
. . 669: return err
. . 670: }
. . 671: mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize}
. 80ms 672: n := copy(c.writeBuf[mw.pos:], data)
. . 673: mw.pos += n
. . 674: data = data[n:]
10ms 4.43s 675: return mw.flushFrame(true, data)
. . 676: }
. . 677:
. 1.63s 678: w, err := c.NextWriter(messageType)
. . 679: if err != nil {
. . 680: return err
. . 681: }
30ms 100ms 682: if _, err = w.Write(data); err != nil {
. . 683: return err
. . 684: }
. 35.43s 685: return w.Close()
list Close
. . 104:func (w *flateWriteWrapper) Close() error {
. . 105: if w.fw == nil {
. . 106: return errWriteClosed
. . 107: }
. 24.69s 108: err1 := w.fw.Flush()
10ms 130ms 109: flateWriterPool.Put(w.fw)
. . 110: w.fw = nil
. . 111: if w.tw.p != [4]byte{0, 0, 0xff, 0xff} {
. . 112: return errors.New("websocket: internal error, unexpected bytes at end of flate stream")
. . 113: }
. 10.61s 114: err2 := w.tw.w.Close()
. . 115: if err1 != nil {
. . 116: return err1
. . 117: }
. . 118: return err2
. . 119:}
list Flush
. . 711:// In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.
. . 712:func (w *Writer) Flush() error {
. . 713: // For more about flushing:
. . 714: // http://www.bolet.org/~pornin/deflate-flush.html
30ms 24.69s 715: return w.d.syncFlush()
. . 716:}
. . 717:
. . 718:// Close flushes and closes the writer.
. . 719:func (w *Writer) Close() error {
. . 720: return w.d.close()
ROUTINE ======================== compress/flate.(*compressor).syncFlush in /Users/fz/go1.7/src/compress/flate/deflate.go
0 24.66s (flat, cum) 47.04% of Total
. . 555:func (d *compressor) syncFlush() error {
. . 556: if d.err != nil {
. . 557: return d.err
. . 558: }
. . 559: d.sync = true
. 24.03s 560: d.step(d)
. . 561: if d.err == nil {
. 490ms 562: d.w.writeStoredHeader(0, false)
. 140ms 563: d.w.flush()
. . 564: d.err = d.w.err
. . 565: }
. . 566: d.sync = false
. . 567: return d.err
. . 568:}
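As a side note on the Close listing above: the check against [4]byte{0, 0, 0xff, 0xff} relies on Flush ending its output with an empty stored block. A small sketch with compress/flate alone shows that tail; the helper name flushTail is made up for this example.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
)

// flushTail (hypothetical helper) compresses msg, calls Flush, and returns
// the last four bytes of the output produced so far.
func flushTail(msg []byte) ([]byte, error) {
	var buf bytes.Buffer
	fw, err := flate.NewWriter(&buf, flate.BestSpeed)
	if err != nil {
		return nil, err
	}
	if _, err := fw.Write(msg); err != nil {
		return nil, err
	}
	if err := fw.Flush(); err != nil { // sync flush, like Z_SYNC_FLUSH in zlib
		return nil, err
	}
	b := buf.Bytes()
	return b[len(b)-4:], nil
}

func main() {
	tail, err := flushTail([]byte("planet: 1, country: 1, city: 1, street: 1"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", tail) // prints "00 00 ff ff"
}
```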
Memory usage is much better now, but it is still very high; compression allocates a lot:
fz@centrifugo: go tool pprof --alloc_space centrifugo heap_profile_extra
Entering interactive mode (type "help" for commands)
(pprof) top 30 --cum
518.97GB of 541.65GB total (95.81%)
Dropped 314 nodes (cum <= 2.71GB)
Showing top 30 nodes out of 35 (cum >= 3.33GB)
flat flat% sum% cum cum%
0 0% 0% 541.53GB 100% runtime.goexit
0 0% 0% 505.54GB 93.33% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
0 0% 0% 504.45GB 93.13% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
0 0% 0% 504.45GB 93.13% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
0 0% 0% 504.45GB 93.13% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
6.63GB 1.22% 1.22% 501.75GB 92.63% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter
6.56GB 1.21% 2.44% 495.11GB 91.41% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover
0 0% 2.44% 491.89GB 90.81% sync.(*Pool).Get
0 0% 2.44% 491.89GB 90.81% sync.(*Pool).getSlow
359.74GB 66.42% 68.85% 488.55GB 90.20% compress/flate.NewWriter
0 0% 68.85% 488.55GB 90.20% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.glob..func1
128.81GB 23.78% 92.63% 128.81GB 23.78% compress/flate.(*compressor).init
0.01GB 0.0019% 92.64% 28.97GB 5.35% net/http.(*conn).serve
0 0% 92.64% 25.18GB 4.65% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).Logged.func1
0 0% 92.64% 25.18GB 4.65% net/http.(*ServeMux).ServeHTTP
0 0% 92.64% 25.18GB 4.65% net/http.HandlerFunc.ServeHTTP
0 0% 92.64% 25.18GB 4.65% net/http.serverHandler.ServeHTTP
list NextWriter
(pprof) list NextWriter
Total: 541.65GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
6.63GB 501.75GB (flat, cum) 92.63% of Total
. . 444: }
. . 445:
. . 446: mw := &messageWriter{
. . 447: c: c,
. . 448: frameType: messageType,
6.63GB 6.63GB 449: pos: maxFrameHeaderSize,
. . 450: }
. . 451: c.writer = mw
. . 452: if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) {
. 495.11GB 453: w := c.newCompressionWriter(c.writer)
. . 454: mw.compress = true
. . 455: c.writer = w
. . 456: }
. . 457: return c.writer, nil
. . 458:}
list compressNoContextTakeover
(pprof) list compressNoContextTakeover
Total: 541.65GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/compression.go
6.56GB 495.11GB (flat, cum) 91.41% of Total
. . 44: fr.(flate.Resetter).Reset(io.MultiReader(r, strings.NewReader(tail)), nil)
. . 45: return &flateReadWrapper{fr}
. . 46:}
. . 47:
. . 48:func compressNoContextTakeover(w io.WriteCloser) io.WriteCloser {
4.41GB 4.41GB 49: tw := &truncWriter{w: w}
. 488.55GB 50: fw, _ := flateWriterPool.Get().(*flate.Writer)
. . 51: plugin.Metrics.Counters.Inc("gorilla_websocket_flate_writer_from_pool")
. . 52: fw.Reset(tw)
2.15GB 2.15GB 53: return &flateWriteWrapper{fw: fw, tw: tw}
. . 54:}
--inuse_space shows a similar picture, with values two orders of magnitude lower (2.69GB for the flateWriterPool.Get().(*flate.Writer) line, which is the leader).
It's hard to say what we can do about such a big compression overhead...
@garyburd you suggest adding CompressionOptions - I can do a pull request with it, but maybe just a global exported variable like DefaultFlateCompressionLevel that we can set on application start, or a setter method, will do the work? We don't need a per-connection compression level, and we can eventually do what you suggested if there is a need later. And no deprecation will be required this way for now.
@FZambia Thank you for testing the pool effectiveness and reporting the profiles.
I don't want to add an API now that might be replaced with another API later.
As I think about it more, it's better to add one method:
// SetCompressionLevel sets the flate compression level for the next message.
// Valid levels range from -2 to 9. Level -2 will use Huffman compression only.
// Level -1 uses the default compression level. Level 0 does not attempt any
// compression. Levels 1 through 9 range from best speed to best compression.
func (c *Conn) SetCompressionLevel(n int) error {
}
This is more flexible than other options. The implementation will replace flateWriterPool with flateWriterPools[12]. The flateWriteWrapper will need to store the level so the writer can be returned to the correct pool.
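A minimal sketch of the per-level pool idea, assuming one sync.Pool per flate level (12 levels, from -2 to 9) and a wrapper that remembers its level. The names writerPools, pooledWriter, acquire, release and roundTrip are hypothetical and this is not the package's actual implementation.

```go
package main

import (
	"compress/flate"
	"errors"
	"fmt"
	"io"
	"sync"
)

const (
	minLevel = flate.HuffmanOnly     // -2
	maxLevel = flate.BestCompression // 9
)

// writerPools holds one pool per level; index 0 corresponds to level -2.
var writerPools [maxLevel - minLevel + 1]sync.Pool

// pooledWriter remembers its level so release can pick the right pool.
type pooledWriter struct {
	fw    *flate.Writer
	level int
}

func acquire(w io.Writer, level int) (*pooledWriter, error) {
	if level < minLevel || level > maxLevel {
		return nil, errors.New("invalid compression level")
	}
	// Get returns nil when the pool is empty, so the assertion reports ok == false.
	if fw, ok := writerPools[level-minLevel].Get().(*flate.Writer); ok {
		fw.Reset(w)
		return &pooledWriter{fw: fw, level: level}, nil
	}
	fw, err := flate.NewWriter(w, level)
	if err != nil {
		return nil, err
	}
	return &pooledWriter{fw: fw, level: level}, nil
}

func release(pw *pooledWriter) {
	writerPools[pw.level-minLevel].Put(pw.fw)
	pw.fw = nil
}

// roundTrip compresses one small message at the given level and
// returns the writer to its pool.
func roundTrip(level int) error {
	pw, err := acquire(io.Discard, level)
	if err != nil {
		return err
	}
	defer release(pw)
	if _, err := pw.fw.Write([]byte("hello")); err != nil {
		return err
	}
	return pw.fw.Flush()
}

func main() {
	fmt.Println(len(writerPools), roundTrip(1), roundTrip(10))
}
```

Keeping the level on the wrapper avoids returning a level 9 writer to the level 1 pool, which would silently change the compression used for later messages.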
I think that having one method that changes default compression level still makes sense:
var defaultCompressionLevel int = 1
// SetDefaultCompressionLevel sets the flate compression level which will be used by
// default to compress messages when compression negotiated. This function must be
// called once before application starts.
//
// Valid levels range from -2 to 9. Level -2 will use Huffman compression only.
// Level -1 uses the default compression level. Level 0 does not attempt any
// compression. Levels 1 through 9 range from best speed to best compression.
func (c *Conn) SetDefaultCompressionLevel(n int) error {
defaultCompressionLevel = n
return nil
}
In most situations I suppose users that need custom compression need to set this default value once.
Then if someone needs a compression level per connection/message we can add SetCompressionLevel and a [12]flateWriterPool:
// SetCompressionLevel sets the flate compression level for the next message.
// Valid levels range from -2 to 9. Level -2 will use Huffman compression only.
// Level -1 uses the default compression level. Level 0 does not attempt any
// compression. Levels 1 through 9 range from best speed to best compression.
// If not set default compression level will be used.
func (c *Conn) SetCompressionLevel(n int) error {
}
If it's not called but compression is negotiated, defaultCompressionLevel will be used. The only caveat I see is that SetDefaultCompressionLevel should be called once before the application starts in the current implementation, but it seems pretty opaque.
Just looked at the size of each flate.Writer instance:
package main
import "unsafe"
import "compress/flate"
func main() {
var w flate.Writer
println(unsafe.Sizeof(w))
}
600KB! This size surprises me - I did not expect it to be such a big thing :)
Yeah - there is quite a number of tables needed to maintain state, produce huffman tables and buffer output. There isn't much that can be done about it, except for level -2(HuffmanOnly), 0 (No Compression) and 1(Best Speed) in the standard library.
For my own library I have been looking at reducing the memory requirements for encoding options that do not require as many buffers, see https://github.com/klauspost/compress/pull/70 (it still has issues, as can be seen by the crash I logged in the issue)
@FZambia I do not want to use package level variables for settings. The setting should be included with the other settings in Dialer and Upgrader or it should be part of the connection so it can be changed from message to message.
See b0dc45572b148296ddf01989da00a2923d213a56.
@garyburd many thanks for your time helping us with this. Looking at all these profiles do you see any way to reduce compression memory usage and allocations?
If the application sends the same message to multiple clients, then #182 is the next thing to try.
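The idea behind that suggestion can be sketched with the standard library alone: compress the payload once, then write the same bytes to every client instead of running the compressor per connection. This is only an illustration of the principle under that assumption, not the websocket package's implementation; prepare, broadcast and fanOut are hypothetical names.

```go
package main

import (
	"bytes"
	"compress/flate"
	"errors"
	"fmt"
	"io"
)

// prepare compresses msg a single time and returns the compressed bytes.
func prepare(msg []byte, level int) ([]byte, error) {
	var buf bytes.Buffer
	fw, err := flate.NewWriter(&buf, level)
	if err != nil {
		return nil, err
	}
	if _, err := fw.Write(msg); err != nil {
		return nil, err
	}
	if err := fw.Flush(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// broadcast writes the same prepared bytes to every destination,
// so the compressor runs once regardless of the number of clients.
func broadcast(prepared []byte, dsts []io.Writer) error {
	for _, w := range dsts {
		if _, err := w.Write(prepared); err != nil {
			return err
		}
	}
	return nil
}

// fanOut is a demo helper: it sends msg to in-memory "clients" and
// returns how many bytes each one received.
func fanOut(msg []byte, clients int) (int, error) {
	prepared, err := prepare(msg, flate.BestSpeed)
	if err != nil {
		return 0, err
	}
	bufs := make([]*bytes.Buffer, clients)
	dsts := make([]io.Writer, clients)
	for i := range bufs {
		bufs[i] = &bytes.Buffer{}
		dsts[i] = bufs[i]
	}
	if err := broadcast(prepared, dsts); err != nil {
		return 0, err
	}
	for _, b := range bufs {
		if !bytes.Equal(b.Bytes(), prepared) {
			return 0, errors.New("client received different bytes")
		}
	}
	return len(prepared), nil
}

func main() {
	n, err := fanOut([]byte("planet: 1, country: 1, city: 1, street: 1"), 1000)
	if err != nil {
		panic(err)
	}
	fmt.Printf("each of 1000 clients received %d compressed bytes\n", n)
}
```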
@garyburd #182 could do magic in my use case, where my fan out is between 50K-90K per sec where all messages are the same.
Thank you for your help.
Is this issue fixed by b0dc45572b148296ddf01989da00a2923d213a56 and 804cb600d06b10672f2fbc0a336a7bee507a428e?
@garyburd heap before changes:
After changes (compression level 1 and using PreparedMessage):
The same picture for CPU. So compression is no longer in first place in the heap and CPU profiles.
Though memory usage is reduced, it's still pretty high when using compression, but this could be an application problem; we will try to investigate.
@garyburd just analyzed the inuse_space profile @joshdvir gave me.
The leaders are gorilla/websocket.newConnBRW (36%), http.newBufioWriterSize (17%), and http.newBufioReader (16%). I thought we were using the recent improvement you added in #223, but looking at the profiles I noticed that the read and write buffers from hijack are not reused for some reason. Then I found a bug in the Centrifugo code: ReadBufferSize and WriteBufferSize were set to the SockJS-go default values (4096) when those sizes were set to 0 in the configuration. I will build with the fix and come back with results.
There are some other things in profiles that might interest you - I'll try to upload svg graph visualizations from build with fix.
Here are some graphs using Centrifugo with the latest Gorilla Websocket. This is from a Centrifugo node run by @joshdvir with many connections (about 100k) and websocket compression enabled (level 1), go1.7.5
CPU:
Here we see that most of cpu spent on syscalls - read, write - I don't think we can do a lot here, because we send a lot of fanout messages to different sockets. So this looks normal.
Alloc space:
Here we see a big impact of compress/flate.(*decompressor).Reset. I thought a bit about how to improve this, but have not found a way.
Inuse space:
@garyburd do you see a way to improve things further? If not then we can live with this I suppose.
P.S. I am going to create many ws connections on local machine with compression enabled/disabled and compare profiles.
I found some tweaks that could be done to the decoder to avoid allocations, and postpone allocation of about 4KB to when it is actually needed.
I don't see anything obvious to improve in the websocket package.
Just tried on localhost - looks like memory consumption comes from different places in more or less equal proportions so it's hard to diagnose bottleneck to work on. I think we are done here for the moment then.
@klauspost I tried your compress lib on a local setup too (1000 clients connected to a node). Memory consumption was 50 MB without compression, 90 MB with std lib compression, 90 MB with your lib without the patch, and 85 MB with your lib with the patch https://github.com/klauspost/compress/pull/76. I can't say how it would affect a production instance, though: I only created many connections, with no broadcast messages and no client messages other than those needed to connect and subscribe to a channel. In a real scenario the memory benefit may be more significant, especially since the alloc_space graph showed a lot of memory allocated while reading compressed data.
Updating from Go 1.7.5 to Go 1.8.1 improved memory usage by about 15% (90 MB -> 75 MB) on my artificial local setup, but I can't say exactly where this gain came from.
On the production instance with compression enabled, memory usage also dropped by a similar 15-20% with Go 1.8.
@joshdvir @FZambia (and others), do we feel that this issue has been addressed with fixes in the Go standard library or is there still something to track down here?
@theckman hi, I have no idea how we can improve compression performance further. I don't think the problem is fully solved, because compression still affects performance a lot. But if you plan to close this issue, feel free to do so, as there is no obvious solution on the horizon. Depending on the situation, PreparedMessage can be a life saver, but I personally don't like the concept much because it's too specific and breaks the semantics of writing bytes into a connection.
If you wrote a fairly standalone benchmark that uses only the stdlib and deflate and represents real-world scenarios, I could use it to test whether a leaner encoder would actually help.
The PR itself would have to be rewritten, but having a good benchmark would be a starting point.
We could also add a []byte -> []byte API, where the complete frame is supplied. That would eliminate the need for an internal buffer (and the penalty of copying to it).
Given these performance issues, is compression worth enabling?
We could also add a []byte -> []byte API, where the complete frame is supplied that would eliminate the need to have an internal buffer (and the penalty of copying to it).
How would this API work? Would we allow writing directly to the net.Conn?
On the server without compression, the WriteMessage API mostly copies the data directly from the application supplied []byte to the underlying network connection. The prepared message API also copies a []byte directly to the underlying network connection.
I believe this issue is due to compress/flate not exposing a way to adjust the sliding window. See https://golang.org/issue/3155
Thus, every flate writer allocates a lot of memory, and even though they are reused via a pool, this ends up causing the large memory usage spike with compression on.
Doesn't seem like there is anything this library can do further until that issue is closed.
In fact, according to this benchmark
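package scratch
import (
	"compress/flate"
	"testing"
)
// BenchmarkFlate measures the allocations of a single flate.NewWriter call.
func BenchmarkFlate(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		flate.NewWriter(nil, flate.BestSpeed)
	}
}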
$ go test -bench=BenchmarkFlate -run=^$
goos: darwin
goarch: amd64
pkg: scratch
BenchmarkFlate-8 10000 131504 ns/op 1200008 B/op 15 allocs/op
PASS
ok scratch 1.345s
flate.NewWriter allocates 1.2 MB! That's a lot of memory to allocate for compression when you're writing 600 byte WebSocket messages.
With BestCompression or DefaultCompression, the allocation drops to about 800 KB, but that's still massive.
# DefaultCompression
$ go test -bench=BenchmarkFlate -run=^$
goos: darwin
goarch: amd64
pkg: nhooyr.io/websocket
BenchmarkFlate-8 20000 93332 ns/op 806784 B/op 13 allocs/op
PASS
ok nhooyr.io/websocket 2.848s
# BestCompression
$ go test -bench=BenchmarkFlate -run=^$
goos: darwin
goarch: amd64
pkg: nhooyr.io/websocket
BenchmarkFlate-8 20000 95197 ns/op 806784 B/op 13 allocs/op
PASS
ok nhooyr.io/websocket 2.879s
I've filed golang/go#32371
Have a look at https://github.com/klauspost/compress/pull/176
@klauspost that's really cool. What sort of memory overhead would be involved for each Write?
It will require a few hundred kilobytes, but unlike the other alternatives, the memory is freed once the data is compressed.
For web sockets, that is what you need. Meaning no live allocations for inactive sockets. Just be sure all is written in a single write.
@klauspost thanks, I'll try to check it out as soon as I have time. I have a gist that allows using a custom flate with the Gorilla WebSocket library. I need to revisit it and experiment with your stateless compression branch.
https://github.com/klauspost/compress/pull/185 brings down heap allocation to a few hundred bytes during a Write by having an internal sync.Pool.
My testing of @klauspost's library with stateless compression has shown great results.
See https://github.com/klauspost/compress/pull/216#issuecomment-586660128
About 9.5 KB allocated per message written for the simple 512 byte message []byte(strings.Repeat("1234", 128)).
edit: It's now available for testing in master on my library.
To clarify, that's with a dictionary in context takeover mode, it's 50 B a message allocated if there is no dictionary.
Hi,
I started using Centrifugo in the past week. I'm using the raw WebSocket endpoint, which uses this library under the hood.
When per-message-deflate is enabled, memory grows massively, up to the point where the Docker container crashes for using too much memory.
I'm running inside a Docker container with an average of 150K-200K concurrent users. My average message rate is between 30K-50K messages per sec, with an average message size of 600 bytes.
Without per-message-deflate there is no memory growth at all and performance is awesome, but the data transfer is very high.
Can anyone help me investigate it?
Thank you.