Closed bill-torpey closed 7 years ago
I've submitted the pull request against OpenMAMA here: https://github.com/OpenMAMA/OpenMAMA/pull/312
Again, not sure if that's the best place for it. Although, even if qpid doesn't need it, it does no harm.
Yeah, I think it's feasible enough. endpointpool is only really an example implementation anyway, and people are free to use thread-unsafe approaches if they understand their threading model well enough. Many issues don't present themselves in qpid because qpid spends so much time blocking and waiting for messages. The issue was probably there in qpid too; you were just much less likely to hit it.
The zmq transport references the collection of subscription endpoints in two threads: the IO thread reads from the zmq socket (in zmqBridgeMamaTransportImpl_dispatchThread), checks the endpoint pool to see if there are any subscribers for the message, and if so enqueues it once for each subscriber. The messages are dequeued in zmqBridgeMamaTransportImpl_queueCallback, running on a different thread, which also queries the endpoint pool.
However, application code running in the zmqBridgeMamaTransportImpl_queueCallback thread can also modify the endpoint pool, typically by deleting subscriptions. Mama subscriptions must be deleted from the thread that created them (by calling mamaSubscription_destroy), or if that is not possible by calling mamaSubscription_destroyEx, which queues the destroy on the callback thread, where it is destroyed by mamaSubscription_DestroyThroughQueueCB.
Additionally, subscriptions can be created on any thread, either directly or by calling functions that create subscriptions "under the covers" (like creating an inbox).
It would seem that the endpoint pool would need to be synchronized with a mutex, but it is not.
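To make the idea concrete, here is a rough sketch of what a mutex-guarded pool could look like. It is not the OpenMAMA endpointPool code or the contents of the pull request: demo_pool_t, the demo_pool_* functions and the fixed-size array are all hypothetical names invented for illustration, and the only real API used is POSIX pthreads.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>

#define DEMO_MAX_ENDPOINTS 64

/* Hypothetical stand-in for an endpoint pool (NOT the OpenMAMA API). */
typedef struct {
    pthread_mutex_t lock;                       /* guards every field below */
    const char*     topics[DEMO_MAX_ENDPOINTS]; /* not copied; caller keeps alive */
    void*           subscribers[DEMO_MAX_ENDPOINTS];
    size_t          count;
} demo_pool_t;

void demo_pool_init(demo_pool_t* pool)
{
    assert(pool != NULL);
    pthread_mutex_init(&pool->lock, NULL);
    pool->count = 0;
}

/* May be called from any thread that creates a subscription.
 * Returns 0 on success, -1 if the pool is full. */
int demo_pool_register(demo_pool_t* pool, const char* topic, void* subscriber)
{
    int rc = -1;
    pthread_mutex_lock(&pool->lock);
    if (pool->count < DEMO_MAX_ENDPOINTS) {
        pool->topics[pool->count]      = topic;
        pool->subscribers[pool->count] = subscriber;
        pool->count++;
        rc = 0;
    }
    pthread_mutex_unlock(&pool->lock);
    return rc;
}

/* Typically called from the callback thread when a subscription is destroyed.
 * Returns the number of entries removed. */
size_t demo_pool_unregister(demo_pool_t* pool, void* subscriber)
{
    size_t removed = 0;
    pthread_mutex_lock(&pool->lock);
    for (size_t i = 0; i < pool->count; ) {
        if (pool->subscribers[i] == subscriber) {
            /* Move the last entry into the freed slot, then re-check slot i. */
            pool->count--;
            pool->topics[i]      = pool->topics[pool->count];
            pool->subscribers[i] = pool->subscribers[pool->count];
            removed++;
        } else {
            i++;
        }
    }
    pthread_mutex_unlock(&pool->lock);
    return removed;
}

/* Typically called from the dispatch thread: copies the matching subscribers
 * into out while holding the lock, so the caller iterates a private snapshot.
 * Returns the number of entries written (at most out_cap). */
size_t demo_pool_lookup(demo_pool_t* pool, const char* topic,
                        void** out, size_t out_cap)
{
    size_t found = 0;
    pthread_mutex_lock(&pool->lock);
    for (size_t i = 0; i < pool->count && found < out_cap; i++) {
        if (strcmp(pool->topics[i], topic) == 0) {
            out[found++] = pool->subscribers[i];
        }
    }
    pthread_mutex_unlock(&pool->lock);
    return found;
}
```

Copying the matches out under the lock keeps the pool's own bookkeeping consistent, but it does not by itself stop a subscriber from being destroyed between the lookup and the moment the message is delivered. A real fix would still need to handle that case, for example by re-checking the pool in the callback.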
Also, for some reason, this synchronization does not appear to be necessary with the qpid transport (why?), but it is definitely required with zmq.
I have a pull request ready to go -- in my case I've made the change in the OpenMAMA code, so it would affect both qpid and zmq. However, if the synchronization is really not required for qpid (why?), then it might make more sense to make the change in zmq. In that case, it would be necessary to come up with a way to ensure that the correct (thread-safe) version of the endpoint pool code gets loaded at runtime for zmq (and any other transports that require it).
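One possible way to let each transport decide, sketched under assumptions: the shared pool code could take an opt-in flag at creation time and skip locking when the bridge says it does not need it. The demo_guard_t type and demo_guard_* functions below are hypothetical and do not exist in OpenMAMA; only the pthread calls are real.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical optional lock: a bridge opts in to synchronization at init. */
typedef struct {
    bool            enabled; /* false: every operation below is a no-op */
    pthread_mutex_t mutex;   /* only initialized when enabled is true */
} demo_guard_t;

/* Returns 0 on success, or the pthread error code. */
int demo_guard_init(demo_guard_t* guard, bool thread_safe)
{
    assert(guard != NULL);
    guard->enabled = thread_safe;
    return thread_safe ? pthread_mutex_init(&guard->mutex, NULL) : 0;
}

int demo_guard_lock(demo_guard_t* guard)
{
    return guard->enabled ? pthread_mutex_lock(&guard->mutex) : 0;
}

int demo_guard_unlock(demo_guard_t* guard)
{
    return guard->enabled ? pthread_mutex_unlock(&guard->mutex) : 0;
}

int demo_guard_destroy(demo_guard_t* guard)
{
    return guard->enabled ? pthread_mutex_destroy(&guard->mutex) : 0;
}
```

With something like this, a zmq bridge would pass true and a bridge that is confident in its threading model would pass false, so a single copy of the pool code could serve both. The cost is a branch on each access, which is likely small next to an uncontended mutex, so locking unconditionally may be just as reasonable.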
Please let me know your thoughts.