huangwei1024 closed this issue 5 years ago.
The NSQ web UI shows a message count of 40,837, which is less than 100,000. Some messages were lost because the connection closed.
If this is normal behavior, how can I make sure messages are published safely? Should I resend a message when its ProducerTransaction has an error?
It looks like you're filling up the producer network socket's send buffer, which results in back pressure on writes, and ends up in a timeout and disconnect. To nsqd, this appears as a client that disconnected.
Looking at your benchmark code, I think this might be happening because doneCh, with its small buffer size equal to the number of CPU cores, is creating an artificial choke point where go-nsq tries to send ProducerTransaction responses to the caller.
If there is indeed an issue, it would likely be in go-nsq, so I'm going to move this issue to that repo.
p.PublishAsync("test", body, doneCh)
From the docs on PublishAsync: "When the Producer eventually receives the response from nsqd, doneChan will receive a ProducerTransaction instance."
So what you are seeing is that as soon as the first message is published, you hit wg.Done() and your script ends. You should use Publish and call wg.Done after you finish the for i := 0; i < b.N/parallel; i++ {... } loop.
I'm not sure that's the case? The benchmark code sets wg.Add(b.N) up front (it's not accumulating wait group additions as each publish happens).
(I should probably just run the code but I'm being lazy)
You might be right; I don't have any issues running this bench code
$ go test -bench=.
2019/02/26 19:16:08 INF 1 (127.0.0.1:4150) connecting to nsqd
goos: linux
goarch: amd64
BenchmarkProducer-2 100000 21220 ns/op
--- BENCH: BenchmarkProducer-2
bench_test.go:20: 2
PASS
ok _/tmp/go-nsq_247 2.130s
And from nsqd:
[nsqd] 2019/02/26 19:16:08.046354 INFO: TCP: new client(127.0.0.1:45114)
[nsqd] 2019/02/26 19:16:08.046383 INFO: CLIENT(127.0.0.1:45114): desired protocol magic ' V2'
[nsqd] 2019/02/26 19:16:08.047072 INFO: [127.0.0.1:45114] IDENTIFY: {ClientID:dev Hostname:dev HeartbeatInterval:30000 OutputBufferSize:16384 OutputBufferTimeout:250 FeatureNegotiation:true TLSv1:false Deflate:false DeflateLevel:6 Snappy:false SampleRate:0 UserAgent:go-nsq/1.0.7.alpha+build.14.a53d495e MsgTimeout:0}
[nsqd] 2019/02/26 19:16:08.051929 INFO: TOPIC(test): created
[nsqd] 2019/02/26 19:16:08.052022 INFO: CI: querying nsqlookupd http://127.0.0.1:4161/channels?topic=test
[nsqd] 2019/02/26 19:16:08.053628 INFO: NSQ: persisting topic/channel metadata to /data/nsqd/nsqd.dat
[nsqd] 2019/02/26 19:16:08.064685 INFO: LOOKUPD(127.0.0.1:4160): topic REGISTER test
[nsqd] 2019/02/26 19:16:08.143023 INFO: DISKQUEUE(test): writeOne() opened /data/nsqd/test.diskqueue.000000.dat
[nsqd] 2019/02/26 19:16:08.143110 INFO: DISKQUEUE(test): readOne() opened /data/nsqd/test.diskqueue.000000.dat
[nsqd] 2019/02/26 19:16:10.142770 INFO: PROTOCOL(V2): [127.0.0.1:45114] exiting ioloop
[nsqd] 2019/02/26 19:16:10.142892 INFO: PROTOCOL(V2): [127.0.0.1:45114] exiting messagePump
I found a way to get the large benchmark to pass: I edited conn.go to enlarge the socket receive and send buffers, and then all messages were delivered.
// Connect dials and bootstraps the nsqd connection
// (including IDENTIFY) and returns the IdentifyResponse
func (c *Conn) Connect() (*IdentifyResponse, error) {
	dialer := &net.Dialer{
		LocalAddr: c.config.LocalAddr,
		Timeout:   c.config.DialTimeout,
	}

	conn, err := dialer.Dial("tcp", c.addr)
	if err != nil {
		return nil, err
	}
	c.conn = conn.(*net.TCPConn)
	c.r = conn
	c.w = conn

	// Added: request 512 MiB socket receive/send buffers. The returned errors
	// are ignored here, and the OS may clamp the sizes it actually grants.
	c.conn.SetReadBuffer(512 * 1024 * 1024)
	c.conn.SetWriteBuffer(512 * 1024 * 1024)
	// ... rest of Connect unchanged
2019/02/27 15:17:33 INF 1 (192.168.1.125:4150) connecting to nsqd
goos: windows
goarch: amd64
BenchmarkProducer-8 100000 13710 ns/op 262 B/op 9 allocs/op
--- BENCH: BenchmarkProducer-8
t_test.go:23: 8
PASS
So I think it would be better to support a buffer-size option in Config, like OutputBufferSize.
Yes, the buffer was the problem, but doneCh was not. The IO error is always raised, even if I use a large buffered channel:
doneCh := make(chan *nsq.ProducerTransaction, b.N)
2019/02/27 15:31:56 INF 1 (192.168.1.125:4150) connecting to nsqd
2019/02/27 15:32:29 ERR 1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:16540->192.168.1.125:4150: wsasend: An existing connection was forcibly closed by the remote host.
I'm confused: why is the IO faster than the CPU?
I don't think you would want to try to set it "high enough" that you "never get errors" from PublishAsync() ... I think it would be more robust to just Publish() synchronously (from some number of goroutines, perhaps num cores or 20 or something appropriate), or to detect the error on the done chan and retry PublishAsync(). In realistic usage you would check for the error in order to retry or at least log it.
Regarding the benchmark running fine on your machine: maybe that's because you only have 2 CPU cores and didn't enlarge the nsqd memory queue size. I run nsqd with:
--mem-queue-size=100000
Thanks for the suggestion to publish synchronously or retry on error. Yes, I know it's not robust when I never check errors. At first I only wanted to see nsq's QPS and cost per op, so I set a large message count to test, but it always failed. That's why I was confused about whether the problem was on the server side or the client side of my benchmark.
Yeah, understandable. It is client-side in this case. PublishAsync() won't block when the socket is blocked or when the write buffer is full, so on its own it does not provide back-pressure to slow down to the rate the network can handle. It's good for some cases, but for your purposes I would use synchronous Publish() (concurrently from N goroutines).
func NewProducer(addr string, config *Config) (*Producer, error) {
	config.assertInitialized()
	err := config.Validate()
	if err != nil {
		return nil, err
	}
	p := &Producer{
		id:              atomic.AddInt64(&instCount, 1),
		addr:            addr,
		config:          *config,
		logger:          log.New(os.Stderr, "", log.Flags()),
		logLvl:          LogLevelInfo,
		transactionChan: make(chan *ProducerTransaction),
		exitChan:        make(chan int),
		responseChan:    make(chan []byte),
		errorChan:       make(chan []byte),
	}
	return p, nil
}
transactionChan and responseChan are unbuffered channels. Could they be a choke point?
And Producer.router runs in a single goroutine. If my machine has many cores and the LAN is fast, the balance between requests and responses breaks down.
For example, I have one Producer and call PublishXXX and MultiPublishXXX from many goroutines, but responses are always handled in a single goroutine over an unbuffered channel, so the socket buffer fills up easily. Is that right?
I searched the web for this problem and saw solutions such as a connection pool to improve Producer throughput.
I did some benchmarking of nsq and ran into a strange problem. I publish 100,000 messages to nsq using go-nsq, but it always fails, while a smaller count (e.g. 10,000 messages) works fine. The go-nsq log shows a write timeout to nsqd, and the nsqd log says the producer's connection was reset. What is the problem in this situation?
go-nsq log:
nsqd log:
go benchmark code:
nsqd config: