Closed pricelessrabbit closed 4 years ago
@PricelessRabbit can you point me where exactly you put the sleep()
and it worked
you say that route.Messages
gets overwritten?
@PricelessRabbit can you point me where exactly you put the
sleep()
and it worked you say thatroute.Messages
gets overwritten?
@mteodor
In this point there is a for loop that use local var r
https://github.com/mainflux/export/blob/cdbc6f22b1890ae32da86eeccbe962b17ac741de/pkg/export/service.go#L116-L126
but the r.Consume
create a pointer reference to r
.
https://github.com/mainflux/export/blob/cdbc6f22b1890ae32da86eeccbe962b17ac741de/pkg/export/route.go#L86-L88
so then when the loop in the main goroutine continues, it updates r fields with the next route data. But the worker has a pointer to r and not a copy, so all the r fields are updated also in the worker goroutine (also the channel r.Messages
).
However, seems (but i'm not shure of that) that if the channel is already filled when the for msg := range
line is executed (so there is no "wait" in the first iteration, that the range "keeps" an internal reference of the channel, also if then the channel reference changes, so in some cases (tried with a sleep) the channel is filled before the initialization of the worker and things work as expected also in the bugged implementation
i think that change should be like
https://github.com/mainflux/export/blob/master/pkg/export/service.go#L42
here map[string]*Route
and here
func NewRoute(rc config.Route, log logger.Logger, pub messages.Publisher) *Route {
w := rc.Workers
if w == 0 {
w = workers
}
r := Route{
NatsTopic: rc.NatsTopic + "." + NatsAll,
MqttTopic: rc.MqttTopic,
Subtopic: rc.SubTopic,
Type: rc.Type,
Workers: w,
Messages: make(chan *nats.Msg, w),
logger: log,
pub: pub,
}
return &r
}
i think that change should be like https://github.com/mainflux/export/blob/master/pkg/export/service.go#L42 here
map[string]*Route
and herefunc NewRoute(rc config.Route, log logger.Logger, pub messages.Publisher) *Route { w := rc.Workers if w == 0 { w = workers } r := Route{ NatsTopic: rc.NatsTopic + "." + NatsAll, MqttTopic: rc.MqttTopic, Subtopic: rc.SubTopic, Type: rc.Type, Workers: w, Messages: make(chan *nats.Msg, w), logger: log, pub: pub, } return &r }
this morning i evaluate and try also in that way. It works, but multiple workers goroutines gets the same route reference, and share the state. This is not a good thing imho because if the route struct change in some manner, all the workers will be affected and this can lead to unexpected behaviours.
that is the idea, there should be only one instance of each route, and it should not change during the runtime, workers should only process messages and read the route info to know where to send
yep it is for that very reason that i think that provide every worker with an immutable value-copy of the route is the most solid solution to avoid possible accidentally modification of data, but if you are ok with the shared state i refactor the fix in that way and update the PR
@PricelessRabbit yes, please do so, and I'll rethink this what you pointed out and I want to thank you so much
Fixed #29 : reference type cause the route struct channel to be overwritten by the goroutine that subscribe the routes. The
for := range
channel reference is changed if not already used and consumer got stuckChanged the Consume func receiver to value receiver. In this way each worker has owns a copy of route data
Signed-off-by: PricelessRabbit PricelessRabbit@gmail.com