
Commit b6321bb

bug: fix ProxyClient<Thread> deadlock if disconnected as IPC call is returning
This bug is currently causing the mptest "disconnecting and blocking" test to occasionally hang, as reported by maflcko in bitcoin/bitcoin#33244. The bug was first reported by Sjors in Sjors/bitcoin#90 (comment), and there are more details about it in #189.

The bug is caused by the "disconnecting and blocking" test triggering a disconnect right before a server IPC call returns. This results in a race between the IPC server thread and the onDisconnect handler in the event loop thread, which both try to destroy the server's request_threads ProxyClient<Thread> object when the IPC call is done. There was a lack of synchronization in this case, fixed here by adding loop->sync() calls in a few places. Specifically, the fixes were to:

- Always call SetThread on the event loop thread using the loop->sync() method, to prevent a race between the ProxyClient<Thread> creation code and the connection shutdown code if there is an ill-timed disconnect.

- Similarly, in ~ProxyClient<Thread> and type-context.h PassField(), use loop->sync() when destroying the thread object, in case a disconnect happens at that time.

A few other changes were made in this commit to make the resulting code safer and simpler, even though they are not technically necessary for the fix:

- In type-context.h PassField(), Waiter::m_mutex is now unlocked while destroying ProxyClient<Thread>, to respect the EventLoop::m_mutex before Waiter::m_mutex lock order and never lock the Waiter mutex first. This is just for consistency; there is no actual possibility of a deadlock here, due to the new sync() call.

- Asserts were added to make sure functions expected to run on the event loop thread are only called on that thread.

- The ProxyClient<Thread>::setDisconnectCallback function was inlined, because it was a short function only called in a single place.
1 parent d66c21a commit b6321bb

3 files changed: 47 additions and 27 deletions
include/mp/proxy-io.h

Lines changed: 0 additions & 2 deletions
@@ -66,8 +66,6 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
     ProxyClient(const ProxyClient&) = delete;
     ~ProxyClient();
 
-    void setDisconnectCallback(const std::function<void()>& fn);
-
     //! Reference to callback function that is run if there is a sudden
     //! disconnect and the Connection object is destroyed before this
     //! ProxyClient<Thread> object. The callback will destroy this object and
include/mp/type-context.h

Lines changed: 25 additions & 15 deletions
@@ -89,10 +89,13 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
     // need to update the map.
     auto& thread_context = g_thread_context;
     auto& request_threads = thread_context.request_threads;
-    auto [request_thread, inserted]{SetThread(
-        GuardedRef{thread_context.waiter->m_mutex, request_threads},
-        server.m_context.connection,
-        [&] { return context_arg.getCallbackThread(); })};
+    ConnThread request_thread;
+    bool inserted;
+    server.m_context.loop->sync([&] {
+        std::tie(request_thread, inserted) = SetThread(
+            GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
+            [&] { return context_arg.getCallbackThread(); });
+    });
 
     // If an entry was inserted into the requests_threads map,
     // remove it after calling fn.invoke. If an entry was not
@@ -101,17 +104,24 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
     // makes another IPC call), so avoid modifying the map.
     const bool erase_thread{inserted};
     KJ_DEFER(if (erase_thread) {
-        Lock lock(thread_context.waiter->m_mutex);
-        // Call erase here with a Connection* argument instead
-        // of an iterator argument, because the `request_thread`
-        // iterator may be invalid if the connection is closed
-        // during this function call. More specifically, the
-        // iterator may be invalid because SetThread adds a
-        // cleanup callback to the Connection destructor that
-        // erases the thread from the map, and also because the
-        // ProxyServer<Thread> destructor calls
-        // request_threads.clear().
-        request_threads.erase(server.m_context.connection);
+        // Erase the request_threads entry on the event loop
+        // thread with loop->sync(), so if the connection is
+        // broken there is not a race between this thread and
+        // the disconnect handler trying to destroy the thread
+        // client object.
+        server.m_context.loop->sync([&] {
+            // Look up the thread again without using existing
+            // iterator since entry may no longer be there after
+            // a disconnect. Destroy node after releasing
+            // Waiter::m_mutex, so the ProxyClient<Thread>
+            // destructor is able to use EventLoop::mutex
+            // without violating lock order.
+            ConnThreads::node_type removed;
+            {
+                Lock lock(thread_context.waiter->m_mutex);
+                removed = request_threads.extract(server.m_context.connection);
+            }
+        });
     });
     fn.invoke(server_context, args...);
 }

src/mp/proxy.cpp

Lines changed: 22 additions & 10 deletions
@@ -12,6 +12,7 @@
 
 #include <atomic>
 #include <capnp/capability.h>
+#include <capnp/common.h> // IWYU pragma: keep
 #include <capnp/rpc.h>
 #include <condition_variable>
 #include <functional>
@@ -80,6 +81,11 @@ ProxyContext::ProxyContext(Connection* connection) : connection(connection), loo
 
 Connection::~Connection()
 {
+    // Connection destructor is always called on the event loop thread. If this
+    // is a local disconnect, it will trigger I/O, so this needs to run on the
+    // event loop thread, and if there was a remote disconnect, this is called
+    // by an onDisconnect callback directly from the event loop thread.
+    assert(std::this_thread::get_id() == m_loop->m_thread_id);
     // Shut down RPC system first, since this will garbage collect any
     // ProxyServer objects that were not freed before the connection was closed.
     // Typically all ProxyServer objects associated with this connection will be
@@ -155,6 +161,9 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 
 void Connection::removeSyncCleanup(CleanupIt it)
 {
+    // Require cleanup functions to be removed on the event loop thread to avoid
+    // needing to deal with them being removed in the middle of a disconnect.
+    assert(std::this_thread::get_id() == m_loop->m_thread_id);
     const Lock lock(m_loop->m_mutex);
     m_sync_cleanup_fns.erase(it);
 }
@@ -306,6 +315,7 @@ bool EventLoop::done() const
 
 std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread)
 {
+    assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
     ConnThread thread;
     bool inserted;
     {
@@ -314,7 +324,7 @@ std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connecti
     }
     if (inserted) {
         thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false);
-        thread->second->setDisconnectCallback([threads, thread] {
+        thread->second->m_disconnect_cb = connection->addSyncCleanup([threads, thread] {
             // Note: it is safe to use the `thread` iterator in this cleanup
             // function, because the iterator would only be invalid if the map entry
             // was removed, and if the map entry is removed the ProxyClient<Thread>
@@ -323,9 +333,10 @@ std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connecti
             // Connection is being destroyed before thread client is, so reset
             // thread client m_disconnect_cb member so thread client destructor does not
            // try to unregister this callback after connection is destroyed.
+            thread->second->m_disconnect_cb.reset();
+
             // Remove connection pointer about to be destroyed from the map
             const Lock lock(threads.mutex);
-            thread->second->m_disconnect_cb.reset();
             threads.ref.erase(thread);
         });
     }
@@ -338,17 +349,18 @@ ProxyClient<Thread>::~ProxyClient()
     // cleanup callback that was registered to handle the connection being
     // destroyed before the thread being destroyed.
     if (m_disconnect_cb) {
-        m_context.connection->removeSyncCleanup(*m_disconnect_cb);
+        // Remove disconnect callback on the event loop thread with
+        // loop->sync(), so if the connection is broken there is not a race
+        // between this thread trying to remove the callback and the disconnect
+        // handler attempting to call it.
+        m_context.loop->sync([&]() {
+            if (m_disconnect_cb) {
+                m_context.connection->removeSyncCleanup(*m_disconnect_cb);
+            }
+        });
     }
 }
 
-void ProxyClient<Thread>::setDisconnectCallback(const std::function<void()>& fn)
-{
-    assert(fn);
-    assert(!m_disconnect_cb);
-    m_disconnect_cb = m_context.connection->addSyncCleanup(fn);
-}
-
 ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
     : m_thread_context(thread_context), m_thread(std::move(thread))
 {
