
race condition in subscription deletion? #17

Closed WallStProg closed 3 years ago

WallStProg commented 7 years ago

It looks like there are some problems synchronizing subscription deletion with incoming messages for the subscription.

In a simple test program I started getting SEGVs, all of which resolve to this stack trace:

#0  0x00007f9c9c5d37ce in zmqBridgeMamaTransportImpl_queueCallback (queue=0xce8d80, closure=0x110de00) at /home/bt/work/OpenMAMA-zmq/master/src/transport.c:914
        status = MAMA_STATUS_OK
        tmpMsg = 0x0
        bridgeMsg = 0x0
        pool = 0x0
        node = 0x110de00
        tmsg = 0x110de40
        bufferSize = 115
        buffer = 0x110de60
        subject = 0x110de60 "_INBOX.centos65vm.08882"
        subscription = 0xcf8be0
        impl = 0x0
        queueImpl = 0x0
#1  0x00007f9c9ca68499 in wombatQueue_dispatchInt (queue=0xce8f00, data=0x0, closure=0x0, isTimed=1 '\001', timout=500) at common/c_cpp/src/c/queue.c:319
        impl = 0xce8f00
        head = 0xce90d0
        cb = 0x7f9c9c5d3726 <zmqBridgeMamaTransportImpl_queueCallback>
        closure_ = 0x110de00
        data_ = 0xce8d80
#2  0x00007f9c9ca6851a in wombatQueue_timedDispatch (queue=0xce8f00, data=0x0, closure=0x0, timeout=500) at common/c_cpp/src/c/queue.c:335
No locals.
#3  0x00007f9c9c5d0ff8 in zmqBridgeMamaQueue_dispatch (queue=0xce8e60) at /home/bt/work/OpenMAMA-zmq/master/src/queue.c:261
        status = WOMBAT_QUEUE_OK
        impl = 0xce8e60
#4  0x00007f9c9ca45641 in mamaQueue_dispatch (queue=0xce8d80) at mama/c_cpp/src/c/queue.c:824
        impl = 0xce8d80
        status = MAMA_STATUS_OK
#5  0x00007f9c9ca46138 in dispatchThreadProc (closure=0xce9c20) at mama/c_cpp/src/c/queue.c:1294
        impl = 0xce9c20
#6  0x000000394c007aa1 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#7  0x000000394b8e8aad in clone () from /lib64/libc.so.6
No symbol table info available.
(gdb) p *impl
Cannot access memory at address 0x0
(gdb) p *subscription
$1 = {mMamaCallback = {onCreate = 0x7f9c9d448410 <vtable for Transact::MamaMessageImpl+16>, onError = 0x4bb8e200, onMsg = 0x111dbd0, onQuality = 0x0, onGap = 0x1, onRecapRequest = 0x0, 
    onDestroy = 0x7f9c00000000}, mMamaSubscription = 0x0, mMamaQueue = 0xcf8c10, mZmqQueue = 0xcf8c10, mTransport = 0x0, mSymbol = 0x0, mSubjectKey = 0x0, mClosure = 0x0, mIsNotMuted = 0, 
  mIsValid = 1, mIsTportDisconnected = 0, mMsg = 0x8b, mEndpointIdentifier = 0x1120b60 "\220\304", <incomplete sequence \313>}

Note that impl is NULL, but that subscription appears well-formed, and its mIsNotMuted is set to 0, indicating that the subscription has been destroyed on another thread.

So, I tried changing the order of the checks to test whether the subscription is muted before calling endpointPool_isRegistedByContent, but that just made the crash more intermittent by shrinking the window. (I was hoping that subscription deletion was somehow synchronized with incoming messages for the subscription, and that the original crash was simply a bug in the order in which the conditions were checked, but that appears not to be the case.)
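
To illustrate why reordering the checks can't fix this: any check-then-use on memory that another thread is free to delete is still a race, whichever check comes first. A contrived, self-contained illustration (generic pthreads code, not the bridge source):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct { int isNotMuted; } sub_t;

static sub_t* gSub;

static void* dispatcher(void* arg)
{
    (void) arg;
    sub_t* sub = gSub;                  /* check: grab the pointer...  */
    if (NULL != sub && sub->isNotMuted)
    {
        /* ...then use: the deleting thread may free 'sub' in the
         * window between the check above and this line. */
        printf("dispatching\n");
    }
    return NULL;
}

int main(void)
{
    gSub = calloc(1, sizeof(*gSub));
    gSub->isNotMuted = 1;

    pthread_t t;
    pthread_create(&t, NULL, dispatcher, NULL);

    /* Nothing synchronizes this with the dispatcher, so reordering the
     * dispatcher's checks only narrows the window. */
    free(gSub);
    gSub = NULL;

    pthread_join(t, NULL);
    return 0;
}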

So, it looks to be a race condition between subscription deletion and the processing of incoming messages for the subscription.

Any hints or tips on how to fix it (or where to start looking) would be greatly appreciated!

fquinner commented 7 years ago

How is the subscription being destroyed? Note that mamaSubscription_destroy must be called from the subscription's thread. If you want to destroy from other threads you should use mamaSubscription_destroyEx.
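
Roughly (a sketch only, error handling omitted):

#include <mama/mama.h>

void destroyFromDispatchThread(mamaSubscription sub)
{
    /* Safe only when called on the thread of the queue that the
     * subscription was created with. */
    mamaSubscription_destroy(sub);
}

void destroyFromAnyOtherThread(mamaSubscription sub)
{
    /* Enqueues the destroy onto the subscription's own queue; the
     * subscription's onDestroy callback fires once it has completed. */
    mamaSubscription_destroyEx(sub);
}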

WallStProg commented 7 years ago

By the subscription's thread, do you mean the callback thread, or the thread on which it was created?

fquinner commented 7 years ago

I mean the thread associated with the queue that you pass to the subscription as an argument when you create it (which is also the subscription callback thread).

WallStProg commented 7 years ago

Well, the subscription is actually being created in zmqBridgeMamaInbox_createByIndex -- what I'm trying to do is come up with a fix for #16, using a single topic for all inboxes in a process (_INBOX.$host.$pid), and then delegating replies based on the actual inbox name from wUuid_generate_time.

It's been a bit of a slog up to this point, since reply addresses are apparently not considered part of the message per se, but I think I've got that part worked out.

I will give destroyEx a go, and while I'm at it, it seems that the call to mamaSubscription_deallocate should really be relocated to the zmqBridgeMamaInboxImpl_onDestroy callback -- do you agree?
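
Something like this is what I have in mind (a sketch only; the impl type and field handling are abbreviated from the bridge's inbox.c):

/* Sketch: defer deallocation until the destroy has actually completed
 * on the subscription's queue. */
static void MAMACALLTYPE
zmqBridgeMamaInboxImpl_onDestroy(mamaSubscription subscription, void* closure)
{
    zmqInboxImpl* impl = (zmqInboxImpl*) closure;

    /* By the time onDestroy fires, no further callbacks can be
     * dispatched for this subscription, so this should be safe here. */
    mamaSubscription_deallocate(subscription);

    free(impl);
}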

Thanks for the help!

WallStProg commented 7 years ago

Well, changing to destroyEx and moving the deallocate (and the freeing of impl) to onDestroy has not resolved the problem. I'm still getting crashes in the same place:

(gdb) bt full
#0  0x00007f7140592708 in zmqBridgeMamaTransportImpl_queueCallback (queue=0x7f71280008c0, closure=0x7f713037d560)
    at /home/btorpey/work/OpenMAMA-zmq/master/src/transport.c:925
        status = MAMA_STATUS_OK
        tmpMsg = 0x0
        bridgeMsg = 0x0
        pool = 0x0
        node = 0x7f713037d560
        tmsg = 0x7f713037d590
        bufferSize = 99
        buffer = 0x7f713037d5b0
        subject = 0x7f713037d5b0 "_INBOX.bt.29307"
        subscription = 0xa4d2c0
        impl = 0x20
        queueImpl = 0x0
#1  0x00007f7140a26499 in wombatQueue_dispatchInt (queue=0x7f7128000a50, data=0x0, closure=0x0, isTimed=1 '\001', timout=500) at common/c_cpp/src/c/queue.c:319
        impl = 0x7f7128000a50
        head = 0x7f7130413a20
        cb = 0x7f7140592632 <zmqBridgeMamaTransportImpl_queueCallback>
        closure_ = 0x7f713037d560
        data_ = 0x7f71280008c0
#2  0x00007f7140a2651a in wombatQueue_timedDispatch (queue=0x7f7128000a50, data=0x0, closure=0x0, timeout=500) at common/c_cpp/src/c/queue.c:335
No locals.
#3  0x00007f714058ff04 in zmqBridgeMamaQueue_dispatch (queue=0x7f71280009b0) at /home/btorpey/work/OpenMAMA-zmq/master/src/queue.c:261
        status = WOMBAT_QUEUE_OK
        impl = 0x7f71280009b0
#4  0x00007f7140a03641 in mamaQueue_dispatch (queue=0x7f71280008c0) at mama/c_cpp/src/c/queue.c:824
        impl = 0x7f71280008c0
        status = MAMA_STATUS_OK
#5  0x00007f7140a04138 in dispatchThreadProc (closure=0x7f7128001760) at mama/c_cpp/src/c/queue.c:1294
        impl = 0x7f7128001760
#6  0x00000039dc807aa1 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#7  0x00000039dc4e8aad in clone () from /lib64/libc.so.6
No symbol table info available.

I see that the description of onDestroy says:

"the client can have confidence that no further messages will be placed on the queue for this subscription"

But that doesn't address messages that are already queued for the subscription, which I suspect may be the root cause of the problem.
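
In other words, even after onDestroy fires, the dispatch thread can still be holding a queued node that references the dead subscription. The kind of guard I imagine is needed would validate the node under the same lock that the deletion path takes -- something like this sketch (the lock and the node's subscription field are hypothetical, not in the bridge today):

/* Hypothetical guard in the queue callback: validate the dequeued node
 * under a lock shared with subscription deletion, so a node that
 * outlived its subscription is dropped rather than dereferenced. */
wlock_lock(impl->mSubsLock);                      /* hypothetical lock */
if (0 == endpointPool_isRegistedByContent(impl->mSubEndpoints,
                                          subject, node->mSubscription)
    || 1 != node->mSubscription->mIsNotMuted)
{
    wlock_unlock(impl->mSubsLock);
    return;                                       /* stale node: drop  */
}
/* ... deliver the message to node->mSubscription ... */
wlock_unlock(impl->mSubsLock);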

BTW, forget what I said earlier about reply addresses -- for some reason it was looking like they were not being deserialized into the impl, but that was a red herring.

fquinner commented 7 years ago

Did this go away with the thread-safe endpointpool, or is it still at large?

WallStProg commented 7 years ago

Making endpoint_pool thread-safe seems to have helped -- at least, I haven't seen this problem since. (No guarantee, obviously, but a good sign).
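
For the record, "thread-safe" here just means serializing access to the pool with a lock, along these lines (a sketch; the wrapper type and names are mine, and I'm assuming the lookup returns nonzero when registered):

#include <wombat/wlock.h>
#include "endpointpool.h"

typedef struct
{
    endpointPool_t mPool;
    wLock          mLock;
} lockedEndpointPool;

/* Every pool operation (register, unregister, lookup) goes through a
 * wrapper like this so that they serialize against each other. */
int
lockedPool_isRegistedByContent(lockedEndpointPool* pool,
                               const char*         topic,
                               void*               content)
{
    wlock_lock(pool->mLock);
    int found = endpointPool_isRegistedByContent(pool->mPool, topic, content);
    wlock_unlock(pool->mLock);
    return found;
}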

I notice that the zmq implementation doesn't appear to unsubscribe when a subscription is destroyed -- can I assume that this is deliberate?

I know that I've had plenty of crashes when unsubscribing, which makes sense given that zmq pub/sub sockets are not thread-safe. So far, however, I haven't seen any issues caused by subscribing from a different thread than the dispatch thread. I'm curious whether you have any thoughts on multi-threaded access to zmq sockets beyond what is in the docs (i.e., don't do it)?

If taken literally, the zmq docs would seem to require that subscribes and unsubscribes all be done on the dispatch thread (the thread that calls zmq_msg_recv). That would imply that those operations should be enqueued, but that's a fair amount of work if it's not necessary. (I also don't see any obvious way to enqueue an event at the front of the queue, which would be handy for subscribe and unsubscribe requests.)
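
For what it's worth, the usual zmq answer seems to be a control socket: the dispatch thread polls an inproc PAIR alongside the data socket, and other threads send subscribe/unsubscribe commands over it, so the socket options are only ever touched from the dispatch thread. A minimal sketch (not what the bridge does today; the '+'/'-' command convention is made up):

#include <string.h>
#include <zmq.h>

/* Dispatch thread: poll the data socket and an inproc control socket,
 * and apply subscribes/unsubscribes on this thread only. */
void dispatchLoop(void* ctx, void* dataSock)
{
    void* control = zmq_socket(ctx, ZMQ_PAIR);
    zmq_bind(control, "inproc://sub-control");

    zmq_pollitem_t items[] = {
        { dataSock, 0, ZMQ_POLLIN, 0 },
        { control,  0, ZMQ_POLLIN, 0 }
    };

    for (;;)
    {
        zmq_poll(items, 2, -1);

        if (items[1].revents & ZMQ_POLLIN)
        {
            char topic[256];
            int  size = zmq_recv(control, topic, sizeof(topic) - 1, 0);
            if (size > 0)
            {
                /* zmq_recv reports the full message size even when the
                 * message was truncated to fit the buffer, so clamp. */
                if (size > (int) (sizeof(topic) - 1))
                    size = (int) (sizeof(topic) - 1);
                topic[size] = '\0';

                /* '+' prefix means subscribe, '-' means unsubscribe */
                zmq_setsockopt(dataSock,
                               ('+' == topic[0]) ? ZMQ_SUBSCRIBE
                                                 : ZMQ_UNSUBSCRIBE,
                               topic + 1, strlen(topic + 1));
            }
        }

        if (items[0].revents & ZMQ_POLLIN)
        {
            /* ... zmq_msg_recv() and normal message handling ... */
        }
    }
}

This would also sidestep the front-of-the-queue problem, since control commands never sit behind pending data messages.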

Any thoughts would be appreciated.