Closed songweijia closed 3 years ago
Looking at the code, another option would be to increment and remember the id earlier in the code block, then drop the lock and await the future, then send the reply using the id that was generated earlier.
Yes, as long as no other thread grabs the lock and makes changes to P2PConnectionManager::p2p_connections that might invalidate a later call to P2PConnectionManager::update_incoming_seq_num(). @sagarjha, what do you think?
Need to look at it in more detail. I will let you know soon, Weijia.
Seems like the issue arises because a node delivered the message containing the RPC stub and replied to the sender even before the sender could deliver the message locally.
Yes, you are right. That happens because a new external connection request arrived after the node issued an ordered send (in the sender thread) but before it could deliver that ordered send (in the predicate thread). The new connection request occupied the predicate thread. Meanwhile, the remote thread had finished processing the RPC and replied to the sender node. That's why we might need to release the p2p_connections_mutex lock while p2p_message_handler() runs, so that the predicate thread can continue with its work.
Hi Weijia,
I think your fix will work if no thread removes any P2P connections while p2p_message_handler(...) is running without the lock. Otherwise, p2p_message_handler() could try to access memory that is simultaneously being released by another thread cleaning up the associated connection.
There are functions in rpc_manager that do remove connections on a view change (I am also not sure how external connections are cleaned up automatically), so I am worried that the fix is potentially harmful. What if a view change removes the node from which we just received a reply? I know that if the message was delivered on the remote node that failed, the current node will definitely deliver the message during view cleanup. Still, the entire pipeline is not totally clear to me or fresh in my memory.
If we can avoid handling a P2P reply for an RPC request until that request has also been processed locally, we would have an easy fix, although that might complicate the code. What do you think?
That's what I was worried about, and that's why I labeled the workaround as 'unsafe' and put it in a separate branch for my performance experiment. As for the suggestion, I'm wondering how you plan to provide that guarantee, because the current code already waits until the request is processed locally. The point is that it waits while holding the lock on p2p_connections_mutex.
Another possible fix is to recheck whether the entry corresponding to the reply is still available and valid once the lock is regained after p2p_message_handler. Testing whether that entry exists is simple, but we also need to make sure that the entry is the original one: a view change in which a new node replaces the node that sent the reply is unlikely but theoretically possible. Can we detect that? If we can, we only update the sequence numbers when the entry is valid.
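One way the "is it still the original entry?" check could work is sketched below. It assumes each connection entry carries a generation number that changes whenever the connection for a node id is replaced. ConnectionTable, Entry, generation_of and update_seq_if_unchanged are hypothetical names made up for this illustration, not anything in Derecho today.

```cpp
#include <cstdint>
#include <map>
#include <mutex>

// Hypothetical stand-in for a P2P connection entry; not Derecho's real type.
struct Entry {
    std::uint64_t generation;        // unique per (re)connection of a node id
    std::uint64_t incoming_seq_num;  // updated after a message is handled
};

class ConnectionTable {
    std::mutex mutex_;
    std::map<std::uint32_t, Entry> entries_;
    std::uint64_t next_generation_ = 1;  // 0 is reserved for "no entry"

public:
    // Adds or replaces the connection for node_id; returns its generation.
    std::uint64_t connect(std::uint32_t node_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        std::uint64_t gen = next_generation_++;
        entries_[node_id] = Entry{gen, 0};
        return gen;
    }

    // Removes the connection, e.g. on a view change.
    void disconnect(std::uint32_t node_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        entries_.erase(node_id);
    }

    // Snapshot taken under the lock before the handler runs unlocked.
    // Returns 0 if there is no entry for node_id.
    std::uint64_t generation_of(std::uint32_t node_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = entries_.find(node_id);
        return it == entries_.end() ? 0 : it->second.generation;
    }

    // Called after the handler returns: bump the sequence number only if the
    // entry still exists and is the same connection that was seen earlier.
    bool update_seq_if_unchanged(std::uint32_t node_id, std::uint64_t seen_generation) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = entries_.find(node_id);
        if(it == entries_.end() || it->second.generation != seen_generation) {
            return false;
        }
        ++it->second.incoming_seq_num;
        return true;
    }
};
```

With this shape, a node that leaves and is replaced under the same id gets a new generation, so a stale update is rejected instead of touching the new connection.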
I wonder if we can just restructure this logic as follows:
1) The current code would grab a mutex and add a record to a "pending requests queue" for this RPC.
2) Release all locks.
3) Later when the future is satisfied, reacquire the mutex, find the pending requests queue entry, send the reply (filling in the needed fields from the saved info plus the current counter value), and then delete the queue entry and release the mutex.
I support this logic. My workaround only introduced 2. My previous suggestion is for step 3: besides finding the entry by node id, we need to validate it is the original one because of possible view change.
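To make the three steps concrete, here is a rough sketch. Everything in it (PendingReplies, PendingRequest, Reply, the counter) is hypothetical and only illustrates the locking pattern; the real RPCManager data structures will differ.

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// All types and names here are hypothetical illustrations, not Derecho's API.
struct PendingRequest {
    std::uint32_t sender_id;  // node that must receive the reply
};

struct Reply {
    std::uint32_t dest_id;
    std::uint64_t seq_num;  // taken from the counter at send time
    std::string payload;
};

class PendingReplies {
    std::mutex mutex_;
    std::map<std::uint64_t, PendingRequest> pending_;
    std::uint64_t next_request_id_ = 0;
    std::uint64_t reply_counter_ = 0;

public:
    // Step 1: record the request under the mutex. The lock_guard releases the
    // mutex on return (step 2), so the caller waits on its future unlocked.
    std::uint64_t add(std::uint32_t sender_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        std::uint64_t id = next_request_id_++;
        pending_[id] = PendingRequest{sender_id};
        return id;
    }

    // A view change can discard every pending entry for a departed node.
    void drop_node(std::uint32_t sender_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        for(auto it = pending_.begin(); it != pending_.end();) {
            if(it->second.sender_id == sender_id) {
                it = pending_.erase(it);
            } else {
                ++it;
            }
        }
    }

    // Step 3: reacquire the mutex, look the entry up, build the reply from the
    // saved info plus the current counter value, then delete the entry.
    // Returns false if the entry is gone, in which case no reply is produced.
    bool complete(std::uint64_t request_id, std::string payload, Reply& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = pending_.find(request_id);
        if(it == pending_.end()) {
            return false;
        }
        out = Reply{it->second.sender_id, reply_counter_++, std::move(payload)};
        pending_.erase(it);
        return true;
    }
};
```

Because a view change removes the queue entries for a departed node, step 3 simply finds nothing and skips the reply, which covers part of the validation concern. A node id being reused by a new node would still need a check like the one discussed above.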
I have a suggestion that is arguably simpler and will not require many code changes, provided we can get the logic right and have the right information available at the right time and place. The P2P polling thread probes all channels for P2P requests, P2P replies and RPC replies for each node and returns a buffer that contains the next request/reply we need to handle in p2p_message_handler. If we use header space for these P2P stubs, we can determine whether the original request of an RPC reply has not yet been delivered locally and, if so, not consider the reply available for processing. Nothing else would need to change. This will require some special-casing for RPC replies only. We will need to figure out the subgroup id and index from the RPC reply and have access to the SST delivered_num counter to make this work.
Important information for anyone reading this: there are separate channels for P2P requests, P2P replies and RPC replies for each node, but not for each subgroup. Thus, not processing the head of the RPC reply channel will not block the processing of other types of requests. Holding back any subsequent RPC reply behind it does no harm either, because that reply could not be processed yet anyway.
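The readiness test itself could be as small as the sketch below. It assumes the reply header carries the subgroup id and the index of the original ordered send, and that delivered_num is available as one signed counter per subgroup with -1 meaning nothing delivered yet. RpcReplyHeader and reply_is_ready are hypothetical names, and the real SST layout may well differ.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical header fields; the real P2P stub header would need to carry
// equivalent information.
struct RpcReplyHeader {
    std::uint32_t subgroup_id;   // subgroup of the original ordered send
    std::int64_t request_index;  // index of that ordered send in the subgroup
};

// delivered_num[s] is assumed to hold the index of the last message this node
// has delivered locally in subgroup s, or -1 if none has been delivered.
bool reply_is_ready(const RpcReplyHeader& header,
                    const std::vector<std::int64_t>& delivered_num) {
    if(static_cast<std::size_t>(header.subgroup_id) >= delivered_num.size()) {
        return false;  // unknown subgroup: leave the reply in the channel
    }
    // Ready only once the original request has been delivered locally.
    return delivered_num[header.subgroup_id] >= header.request_index;
}
```

The polling thread would skip the head of the RPC reply channel while this returns false and pick it up on a later pass.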
I discussed this problem with Weijia, since I wrote most of the RPCManager code, and there is another possible solution: use finer-grained locking on the P2P connections object. Right now, p2p_connections_mutex is a single big lock that controls all access to the P2PConnectionManager, and the deadlock seems to result from the fact that one thread is holding the lock in order to use one of the P2P connections (to send a reply) while another thread tries to take the lock in order to add a new connection. These actions shouldn't be mutually exclusive -- we only need mutual exclusion to prevent a connection from being removed while another thread is still using it.
The reason I originally used a single big lock for the P2P connections is that they are stored in a std::map indexed by node ID, and std::map is not safe to use while another thread inserts or erases entries, even when the threads touch disjoint keys. However, if we stored the P2P connections in a std::vector instead, where the vector index was assumed to be the node ID, we could use one mutex per P2P connection (i.e. one mutex per vector entry), and concurrent access to two different vector entries would be safe. The downside is that we would need to leave empty entries in the vector for unused node IDs (if node IDs are not contiguous), and we would need to reserve space in the vector for all possible node IDs (so that increasing its size doesn't trigger a copy), but since the P2P connections are stored by unique_ptr this won't be too expensive: each vector entry is just the size of a std::unique_ptr and a std::mutex.
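A minimal sketch of the per-entry locking idea follows, assuming node IDs are small integers below a fixed maximum. ConnectionSlots, Slot, Connection and with_connection are hypothetical names for illustration; the actual P2PConnectionManager interface is different.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical stand-in for a P2P connection object.
struct Connection {
    std::uint64_t incoming_seq_num = 0;
};

class ConnectionSlots {
    struct Slot {
        std::mutex mutex;                  // guards only this node's connection
        std::unique_ptr<Connection> conn;  // null when the node id is unused
    };
    // Sized once in the constructor and never resized, so slots never move.
    std::vector<Slot> slots_;

public:
    explicit ConnectionSlots(std::size_t max_nodes) : slots_(max_nodes) {}

    // Adds a connection for node_id; fails if out of range or already present.
    bool add(std::size_t node_id) {
        if(node_id >= slots_.size()) return false;
        std::lock_guard<std::mutex> lock(slots_[node_id].mutex);
        if(slots_[node_id].conn) return false;
        slots_[node_id].conn.reset(new Connection());
        return true;
    }

    // Removes the connection; blocks while another thread is using it.
    bool remove(std::size_t node_id) {
        if(node_id >= slots_.size()) return false;
        std::lock_guard<std::mutex> lock(slots_[node_id].mutex);
        if(!slots_[node_id].conn) return false;
        slots_[node_id].conn.reset();
        return true;
    }

    // Runs use(connection) while holding only this node's mutex, so adding or
    // using a connection for a different node is never blocked by it.
    template <typename F>
    bool with_connection(std::size_t node_id, F&& use) {
        if(node_id >= slots_.size()) return false;
        std::lock_guard<std::mutex> lock(slots_[node_id].mutex);
        if(!slots_[node_id].conn) return false;
        use(*slots_[node_id].conn);
        return true;
    }
};
```

In the deadlock scenario, the receive loop would hold only the slot of the node whose reply it is handling, and the predicate thread adding a new external client would lock a different slot.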
I like this proposal because from the description, it could really be a minimal software change and hence less likely to introduce new bugs than some of the other options. Moreover, I really don't see us having hundreds of concurrently active RPC requests in a single server. So with a relatively small vector, we should be able to solve the whole issue and the code would be minimally perturbed.
If it is discovered that large discontinuities exist or preallocation of space isn't feasible, then there are always plenty of concurrent-safe C++ hashmaps to choose from. Consider for example https://github.com/preshing/junction or https://github.com/efficient/libcuckoo
The concurrent-safe C++ hashmaps are very cool! I think this is a nice solution. I haven't checked its performance yet, but it should be very interesting to do so.
A simple vector might also be viable. A test shows that sizeof(std::mutex) is 40 bytes. With an 8-byte pointer to the connection object, each entry is 48 bytes. If we enforce the node id to be less than 1024, the total size of the vector is only 48 KB.
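The estimate can be checked on a given platform with a few lines like the ones below. Slot, Connection and kMaxNodes are hypothetical names; the 40-byte and 8-byte figures are the ones reported above, and other platforms or standard libraries may give different sizes, so the code computes the sizes rather than hard-coding them.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>

struct Connection {};  // placeholder for the real connection type

// One vector entry: a pointer to the connection plus its own mutex.
struct Slot {
    std::unique_ptr<Connection> conn;
    std::mutex mutex;
};

constexpr std::size_t kMaxNodes = 1024;  // assumed upper bound on node ids

// Total bytes needed for a preallocated table of kMaxNodes slots.
constexpr std::size_t table_bytes() {
    return kMaxNodes * sizeof(Slot);
}
```

With the reported sizes, sizeof(Slot) is 48 and table_bytes() is 1024 * 48 = 49152 bytes, i.e. 48 KB.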
However, it's up to Edward to decide which data structure is best for this scenario.
At a 48 KB maximum size, I seriously doubt there's any reason to use one of the fancy maps. Good point!
For anyone subscribed to this issue by e-mail only, I implemented the fine-grained-locking solution in the branch fix_p2p_mutex, and created a pull request for it.
A non-deterministic deadlock might happen when a new external client connects to a shard member that is doing ordered sends. As the new external client connects to the shard member, the shard member's "new_socket" predicate will be triggered. Its predicate thread will call the "new_socket" trigger function (ViewManager::register_predicates()) to handle it. Unfortunately, the predicate thread will get stuck acquiring p2p_connections_mutex, which protects the p2p connection sets. Why? Because the p2p_receive_loop (RPCManager::p2p_receive_loop()) is holding the lock and waiting in p2p_message_handler() for the promise of a pending result.
However, p2p_message_handler() happens to be handling an RPC reply from another shard member, responding to a previous RPC call sent by this node, so it needs to fill the results into the promise of the pending result. That pending result is initialized in the delivery trigger in the predicate thread. For more details, please see the code here. Due to perfect timing, however, the predicate thread is handling the incoming connection request from the new external client, waiting on the p2p_connections_mutex, as explained above. An ideal deadlock.
I'm wondering if we can release the lock earlier in p2p_receive_loop(), before calling p2p_message_handler()? Specifically, before this line. I noticed that Sagar needs to update the incoming sequence number one line after that, which might need the lock on p2p_connections_mutex. How about we reacquire the lock after p2p_message_handler() finishes and do some validation before we update the incoming sequence number? I tested that and it of course fixed the deadlock. Sagar and Edward might have more ideas on the implications.
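The release-then-reacquire pattern can be shown in a self-contained toy, sketched below. It is not the Derecho code: the mutex, promise and thread merely play the roles of p2p_connections_mutex, the pending result, and the predicate thread, and handle_reply_without_deadlock is a hypothetical name.

```cpp
#include <future>
#include <mutex>
#include <thread>

// Toy model of the receive loop. Returns the value delivered through the
// promise once both threads have finished.
int handle_reply_without_deadlock() {
    std::mutex connections_mutex;     // plays the role of p2p_connections_mutex
    std::promise<int> pending_result; // fulfilled by the "predicate thread"
    std::future<int> result_ready = pending_result.get_future();
    int incoming_seq_num = 0;

    // The "predicate thread" must take the lock (to register a new external
    // connection) before it can deliver the message and fulfil the promise.
    std::thread predicate_thread([&] {
        {
            std::lock_guard<std::mutex> lock(connections_mutex);
            // ... register the new connection ...
        }
        pending_result.set_value(42);
    });

    // The "receive loop": probe connections under the lock.
    std::unique_lock<std::mutex> lock(connections_mutex);
    // ... find the next message to handle ...
    lock.unlock();                    // release before blocking on the future
    int value = result_ready.get();   // the handler waits here, unlocked
    lock.lock();                      // reacquire; validation would go here
    ++incoming_seq_num;               // update the sequence number under lock
    lock.unlock();

    predicate_thread.join();
    return value;
}
```

If the unlock() before result_ready.get() were removed, the two threads could wait on each other exactly as described above, depending on which one takes the mutex first.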