Skip to content

Commit 2b05ed7

Browse files
committed
bug: fix ProxyClient<Thread> deadlock if disconnected as IPC call is returning
This bug is currently causing mptest "disconnecting and blocking" test to occasionally hang as reported by maflcko in bitcoin/bitcoin#33244. The bug was actually first reported by Sjors in Sjors/bitcoin#90 (comment) and there are more details about it in #189. The bug is caused by the "disconnecting and blocking" test triggering a disconnect right before a server IPC call returns. This results in a race between the IPC server thread and the onDisconnect handler in the event loop thread both trying to destroy the server's request_threads ProxyClient<Thread> object when the IPC call is done. There was a lack of synchronization in this case, fixed here by adding loop->sync() calls in various places. There were also lock order issues where Waiter::m_mutex could be incorrectly locked before EventLoop::m_mutex, resulting in a deadlock.
1 parent 27a7203 commit 2b05ed7

File tree

3 files changed

+77
-53
lines changed

3 files changed

+77
-53
lines changed

include/mp/proxy-io.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,15 @@ struct ServerInvokeContext : InvokeContext
5858
template <typename Interface, typename Params, typename Results>
5959
using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
6060

61+
//! Map from Connection to local or remote thread handle which will be used over
62+
//! that connection. This map will typically only contain one entry, but can
63+
//! contain multiple if a single thread makes IPC calls over multiple
64+
//! connections. A std::optional value type is used to avoid the map needing to
65+
//! be locked while ProxyClient<Thread> objects are constructed, see
66+
//! ThreadContext "Synchronization note" below.
67+
using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
68+
using ConnThread = ConnThreads::iterator;
69+
6170
template <>
6271
struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
6372
{
@@ -66,8 +75,6 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
6675
ProxyClient(const ProxyClient&) = delete;
6776
~ProxyClient();
6877

69-
void setDisconnectCallback(const std::function<void()>& fn);
70-
7178
//! Reference to callback function that is run if there is a sudden
7279
//! disconnect and the Connection object is destroyed before this
7380
//! ProxyClient<Thread> object. The callback will destroy this object and
@@ -534,9 +541,6 @@ void ProxyServerBase<Interface, Impl>::invokeDestroy()
534541
CleanupRun(m_context.cleanup_fns);
535542
}
536543

537-
using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
538-
using ConnThread = ConnThreads::iterator;
539-
540544
// Retrieve ProxyClient<Thread> object associated with this connection from a
541545
// map, or create a new one and insert it into the map. Return map iterator and
542546
// inserted bool.

include/mp/type-context.h

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ void CustomBuildField(TypeList<>,
4747
&connection, make_request_thread)};
4848

4949
auto context = output.init();
50-
context.setThread(request_thread->second.m_client);
51-
context.setCallbackThread(callback_thread->second.m_client);
50+
context.setThread(request_thread->second->m_client);
51+
context.setCallbackThread(callback_thread->second->m_client);
5252
}
5353

5454
//! PassField override for mp.Context arguments. Return asynchronously and call
@@ -89,10 +89,13 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
8989
// need to update the map.
9090
auto& thread_context = g_thread_context;
9191
auto& request_threads = thread_context.request_threads;
92-
auto [request_thread, inserted]{SetThread(
93-
request_threads, thread_context.waiter->m_mutex,
94-
server.m_context.connection,
95-
[&] { return context_arg.getCallbackThread(); })};
92+
ConnThread request_thread;
93+
bool inserted;
94+
server.m_context.loop->sync([&] {
95+
std::tie(request_thread, inserted) = SetThread(
96+
request_threads, thread_context.waiter->m_mutex, server.m_context.connection,
97+
[&] { return context_arg.getCallbackThread(); });
98+
});
9699

97100
// If an entry was inserted into the requests_threads map,
98101
// remove it after calling fn.invoke. If an entry was not
@@ -101,17 +104,27 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
101104
// makes another IPC call), so avoid modifying the map.
102105
const bool erase_thread{inserted};
103106
KJ_DEFER(if (erase_thread) {
104-
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
105-
// Call erase here with a Connection* argument instead
106-
// of an iterator argument, because the `request_thread`
107-
// iterator may be invalid if the connection is closed
108-
// during this function call. More specifically, the
109-
// iterator may be invalid because SetThread adds a
110-
// cleanup callback to the Connection destructor that
111-
// erases the thread from the map, and also because the
112-
// ProxyServer<Thread> destructor calls
113-
// request_threads.clear().
114-
request_threads.erase(server.m_context.connection);
107+
// Erase the request_threads entry on the event loop
108+
// thread with loop->sync(), so if the connection is
109+
// broken there is not a race between this thread and
110+
// the disconnect handler trying to destroy the thread
111+
// client object.
112+
server.m_context.loop->sync([&] {
113+
// Look up the thread again since it may no longer
114+
// be there after a disconnect.
115+
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
116+
ConnThread thread = request_threads.find(server.m_context.connection);
117+
if (thread != request_threads.end()) {
118+
// Release Waiter::m_mutex while calling the
119+
// ProxyClient<Thread> destructor so it can use
120+
// EventLoop::m_mutex.
121+
lock.unlock();
122+
thread->second.reset();
123+
// Reacquire Waiter::m_mutex to update the request_threads map.
124+
lock.lock();
125+
request_threads.erase(thread);
126+
}
127+
});
115128
});
116129
fn.invoke(server_context, args...);
117130
}

src/mp/proxy.cpp

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
156156

157157
void Connection::removeSyncCleanup(CleanupIt it)
158158
{
159+
// Require cleanup functions to be removed to avoid needing to deal with
160+
// them being removed in the middle of a disconnect.
161+
assert(std::this_thread::get_id() == m_loop->m_thread_id);
159162
const Lock lock(m_loop->m_mutex);
160163
m_sync_cleanup_fns.erase(it);
161164
}
@@ -307,46 +310,50 @@ bool EventLoop::done() const
307310

308311
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
309312
{
310-
const std::unique_lock<std::mutex> lock(mutex);
311-
auto thread = threads.find(connection);
312-
if (thread != threads.end()) return {thread, false};
313-
thread = threads.emplace(
314-
std::piecewise_construct, std::forward_as_tuple(connection),
315-
std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
316-
thread->second.setDisconnectCallback([&threads, &mutex, thread] {
317-
// Note: it is safe to use the `thread` iterator in this cleanup
318-
// function, because the iterator would only be invalid if the map entry
319-
// was removed, and if the map entry is removed the ProxyClient<Thread>
320-
// destructor unregisters the cleanup.
321-
322-
// Connection is being destroyed before thread client is, so reset
323-
// thread client m_disconnect_cb member so thread client destructor does not
324-
// try to unregister this callback after connection is destroyed.
325-
// Remove connection pointer about to be destroyed from the map
313+
assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
314+
ConnThread thread;
315+
bool inserted;
316+
{
326317
const std::unique_lock<std::mutex> lock(mutex);
327-
thread->second.m_disconnect_cb.reset();
328-
threads.erase(thread);
329-
});
330-
return {thread, true};
318+
std::tie(thread, inserted) = threads.try_emplace(connection);
319+
}
320+
if (inserted) {
321+
thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false);
322+
thread->second->m_disconnect_cb = connection->addSyncCleanup([&threads, &mutex, thread] {
323+
// Note: it is safe to use the `thread` iterator in this cleanup
324+
// function, because the iterator would only be invalid if the map entry
325+
// was removed, and if the map entry is removed the ProxyClient<Thread>
326+
// destructor unregisters the cleanup.
327+
328+
// Connection is being destroyed before thread client is, so reset
329+
// thread client m_disconnect_cb member so thread client destructor does not
330+
// try to unregister this callback after connection is destroyed.
331+
thread->second->m_disconnect_cb.reset();
332+
333+
// Remove connection pointer about to be destroyed from the map
334+
const std::unique_lock<std::mutex> lock(mutex);
335+
threads.erase(thread);
336+
});
337+
}
338+
return {thread, inserted};
331339
}
332340

333341
ProxyClient<Thread>::~ProxyClient()
334342
{
335-
// If thread is being destroyed before connection is destroyed, remove the
336-
// cleanup callback that was registered to handle the connection being
337-
// destroyed before the thread being destroyed.
338343
if (m_disconnect_cb) {
339-
m_context.connection->removeSyncCleanup(*m_disconnect_cb);
344+
// Remove disconnect callback on the event loop thread with
345+
// loop->sync(), so if the connection is broken there is not a race
346+
// between this thread trying to remove the callback and the disconnect
347+
// handler attempting to call it.
348+
m_context.loop->sync([&]() {
349+
if (m_disconnect_cb) {
350+
m_context.connection->removeSyncCleanup(*m_disconnect_cb);
351+
m_disconnect_cb.reset();
352+
}
353+
});
340354
}
341355
}
342356

343-
void ProxyClient<Thread>::setDisconnectCallback(const std::function<void()>& fn)
344-
{
345-
assert(fn);
346-
assert(!m_disconnect_cb);
347-
m_disconnect_cb = m_context.connection->addSyncCleanup(fn);
348-
}
349-
350357
ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
351358
: m_thread_context(thread_context), m_thread(std::move(thread))
352359
{

0 commit comments

Comments
 (0)