Closed nodece closed 3 months ago
Hi @nodece ,
I don't think calling p.internalClose(cp) directly is a good way to fix this bug, because it breaks the thread/goroutine model of the SDK (the CSP model: do not communicate by sharing memory; instead, share memory by communicating). The purpose of the cmd channel is to isolate different goroutines, but now we just bypass it.
The root cause of this bug is that the event loop is stuck in an infinite loop in reconnectToBroker(). I don't think running an infinite loop inside a case of select { case ... } is a good design. We can run the infinite loop in a separate goroutine, like this:
case connectionClosed := <-p.connectClosedCh:
	p.log.Info("runEventsLoop will reconnect in producer")
	// Start a new goroutine for reconnecting to broker
	go p.reconnectToBroker(connectionClosed)
When the producer is closed, the reconnect loop will quit at:
if p.getProducerState() != producerReady {
	// Producer is already closing
	p.log.Info("producer state not ready, exit reconnect")
	return
}
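The proposal above can be sketched as a small standalone program. This is only an illustration under stated assumptions: the producer type, its fields, the "close" command string, and runDemo are hypothetical stand-ins, not the real partitionProducer code. It shows the event loop handing the retry loop to a goroutine so that a close command is still served while the reconnect is in progress.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Hypothetical producer states, named after the ones used in the discussion.
const (
	producerReady int32 = iota
	producerClosing
)

// producer is a simplified stand-in, not the real partitionProducer.
type producer struct {
	state           int32
	connectClosedCh chan struct{}
	cmdChan         chan string
	reconnectDone   chan struct{}
}

// reconnectToBroker retries until the producer leaves the ready state.
func (p *producer) reconnectToBroker() {
	defer close(p.reconnectDone)
	for {
		if atomic.LoadInt32(&p.state) != producerReady {
			// Producer is already closing, so stop retrying.
			return
		}
		time.Sleep(time.Millisecond) // pretend the broker is unreachable
	}
}

// runEventsLoop returns true once it has handled a close command.
func (p *producer) runEventsLoop() bool {
	reconnecting := false
	for {
		select {
		case <-p.connectClosedCh:
			if !reconnecting {
				reconnecting = true
				// Reconnect in the background so the loop keeps serving commands.
				go p.reconnectToBroker()
			}
		case cmd := <-p.cmdChan:
			if cmd == "close" {
				atomic.StoreInt32(&p.state, producerClosing)
				if reconnecting {
					<-p.reconnectDone // wait for the reconnect goroutine to exit
				}
				return true
			}
		}
	}
}

// runDemo simulates a lost connection followed by a close request.
func runDemo() bool {
	p := &producer{
		connectClosedCh: make(chan struct{}, 1),
		cmdChan:         make(chan string, 1),
		reconnectDone:   make(chan struct{}),
	}
	p.connectClosedCh <- struct{}{}
	go func() {
		time.Sleep(5 * time.Millisecond)
		p.cmdChan <- "close"
	}()
	return p.runEventsLoop()
}

func main() {
	fmt.Println("closed cleanly:", runDemo())
}
```

Note that in this sketch send and flush commands would also keep being served during the reconnect, which is exactly the trade-off debated in the replies.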
Reconnect and Close are different things.
The reconnect request should block the send and flush requests. We could also use go p.reconnectToBroker(connectionClosed), but that doesn't make sense: the connection is closed, so the send/flush requests cannot be sent anyway.
The close request shouldn't be added to the event loop; otherwise, it will be blocked while reconnecting.
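A minimal sketch of this alternative, assuming a hypothetical simplified producer (the type, the loopExit channel, and closeTwice are illustrative only): Close changes the state with a compare-and-swap from the caller's goroutine instead of queueing a command, so a loop that is blocked in a reconnect still notices the close.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const (
	producerReady int32 = iota
	producerClosing
)

// producer is a hypothetical, simplified stand-in for the partition producer.
type producer struct {
	state    int32
	loopExit chan struct{}
}

// casProducerState atomically moves the state from oldState to newState.
func (p *producer) casProducerState(oldState, newState int32) bool {
	return atomic.CompareAndSwapInt32(&p.state, oldState, newState)
}

// runEventsLoop simulates a loop that is blocked inside a reconnect attempt.
func (p *producer) runEventsLoop() {
	defer close(p.loopExit)
	for atomic.LoadInt32(&p.state) == producerReady {
		time.Sleep(time.Millisecond) // broker still unreachable, retry
	}
}

// Close flips the state directly instead of queueing a command, so it cannot
// get stuck behind the reconnect. It reports whether this call won the CAS.
func (p *producer) Close() bool {
	return p.casProducerState(producerReady, producerClosing)
}

// closeTwice closes the same producer twice and waits for the loop to exit.
func closeTwice() (first, second bool) {
	p := &producer{loopExit: make(chan struct{})}
	go p.runEventsLoop()
	first = p.Close()
	second = p.Close()
	<-p.loopExit // the blocked loop notices the new state and exits
	return first, second
}

func main() {
	first, second := closeTwice()
	fmt.Println(first, second) // prints "true false"
}
```

Only the first call wins the compare-and-swap, so any cleanup guarded by it runs once even if Close is called concurrently.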
OK. Since the reconnect request should block the send and flush requests, and the close request shouldn't be added to the event loop, we need to make sure that runEventsLoop has completely stopped after we call internalClose() (to be precise, after calling p.casProducerState(producerReady, producerClosing)). Otherwise, once the state is set to producerClosing, the reconnecting case of the event loop will return and the other cases will start to run. Those cases may race with internalClose(), which can lead to a panic or other bugs. It may not happen now, but it is hard to say it won't happen in the future when we extend the event loop cases or the logic of internalClose().
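One way to get that guarantee can be sketched as follows. This is an assumption-laden illustration, not the SDK's code: closeCh, doneCh, the sent slice, and runDemo are hypothetical names. Close signals the loop, then waits on a channel that the loop closes when it returns, and only afterwards touches state the loop used to own.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	producerReady int32 = iota
	producerClosing
	producerClosed
)

// producer is a hypothetical stand-in; sent models state owned by the loop.
type producer struct {
	state    int32
	dataChan chan string
	closeCh  chan struct{} // tells the loop to stop
	doneCh   chan struct{} // closed by the loop once it has fully exited
	sent     []string      // owned by the loop until doneCh is closed
}

func (p *producer) runEventsLoop() {
	defer close(p.doneCh)
	for {
		select {
		case msg := <-p.dataChan:
			p.sent = append(p.sent, msg) // stands in for internalSend
		case <-p.closeCh:
			return
		}
	}
}

// Close stops the loop and only then cleans up, so no loop case can run
// concurrently with the cleanup.
func (p *producer) Close() []string {
	if !atomic.CompareAndSwapInt32(&p.state, producerReady, producerClosing) {
		return nil // already closing or closed
	}
	close(p.closeCh)
	<-p.doneCh // after this line the loop goroutine is gone
	handled := p.sent
	p.sent = nil
	atomic.StoreInt32(&p.state, producerClosed)
	return handled
}

// runDemo sends two messages, closes the producer, and returns what the
// loop handled together with the final state.
func runDemo() ([]string, int32) {
	p := &producer{
		dataChan: make(chan string),
		closeCh:  make(chan struct{}),
		doneCh:   make(chan struct{}),
	}
	go p.runEventsLoop()
	p.dataChan <- "a"
	p.dataChan <- "b"
	handled := p.Close()
	return handled, atomic.LoadInt32(&p.state)
}

func main() {
	handled, state := runDemo()
	fmt.Println(handled, state == producerClosed)
}
```

Because the cleanup happens strictly after the loop has returned, adding new cases to the loop later cannot introduce a race with it.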
@gunli Good catch. We should improve the event loop: once the producer is closed, the event loop should be stopped.
we need to make sure that the runEventsLoop completely stopped after we call internalClose()
@gunli +1
@gunli @RobertIndie The runEventsLoop can be stopped when the producer is closed.
Event:
- reconnect request: This method already checks the producer state: https://github.com/apache/pulsar-client-go/blob/v0.13.0/pulsar/producer_partition.go#L468
- send and close request: Close methods will close the dataChan and cmdChan, and the event loop can check this close status:
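As a rough illustration of such a check (a sketch only; drainUntilClosed, runDemo, and the string payloads are hypothetical, and this is not necessarily the code the PR uses), the comma-ok form of a channel receive lets a select loop tell a closed channel apart from a delivered value:

```go
package main

import "fmt"

// drainUntilClosed is a hypothetical event loop that exits once either
// channel is closed. The comma-ok receive reports ok == false only after
// the channel is closed and its buffer has been drained.
func drainUntilClosed(dataChan <-chan string, cmdChan <-chan string) []string {
	var handled []string
	for {
		select {
		case data, ok := <-dataChan:
			if !ok {
				return handled // dataChan closed: stop the loop
			}
			handled = append(handled, "data:"+data)
		case cmd, ok := <-cmdChan:
			if !ok {
				return handled // cmdChan closed: stop the loop
			}
			handled = append(handled, "cmd:"+cmd)
		}
	}
}

// runDemo buffers two messages, closes dataChan, and runs the loop.
func runDemo() []string {
	dataChan := make(chan string, 2)
	cmdChan := make(chan string, 1) // stays open and empty in this demo
	dataChan <- "m1"
	dataChan <- "m2"
	close(dataChan)
	return drainUntilClosed(dataChan, cmdChan)
}

func main() {
	fmt.Println(runDemo())
}
```

Values buffered before the close are still delivered first. When several channels are ready at once, select picks among them at random, so the exit point is not deterministic if both channels are closed together.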
But the behavior is unpredictable because they run in different goroutines. After doClose() updates the state to closing, reconnecting() will return immediately, but at that point dataChan and cmdChan are not closed yet, and the batchFlushTicker is not stopped either, so the event loop will continue to run and can still enter internalSend() and internalFlushCurrentBatch().
Updated.
This PR may be related to issue #1190
Closes #1190
Motivation
When the producer keeps reconnecting to the broker in the event loop and the user calls producer.Close at the same time, the producer cannot be closed, because the event loop is executing the reconnect request.
Modifications
testcontainers requires go 1.21.
Verifying this change
Added test.