From 4e365b019a9feb3150ac8de089014e719f08c9c4 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Wed, 10 Sep 2025 15:56:30 -0400 Subject: [PATCH 1/6] ci: Use -Wthread-safety not -Wthread-safety-analysis Use -Wthread-safety not -Wthread-safety-analysis in llvm and sanitize jobs. -Wthread-safety is a more general flag which adds additional checks outside the core thread safety analysis. Credit to hodlinator for noticing the bitcoin core build being stricter about thread safety checks than the libmultiprocess build here: https://github.com/bitcoin-core/libmultiprocess/pull/201#discussion_r2336632302 --- ci/configs/llvm.bash | 2 +- ci/configs/sanitize.bash | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/configs/llvm.bash b/ci/configs/llvm.bash index afa957e..ba34cbf 100644 --- a/ci/configs/llvm.bash +++ b/ci/configs/llvm.bash @@ -2,7 +2,7 @@ CI_DESC="CI job using LLVM-based libraries and tools (clang, libc++, clang-tidy, CI_DIR=build-llvm NIX_ARGS=(--arg enableLibcxx true) export CXX=clang++ -export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter" +export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wthread-safety -Wno-unused-parameter" CMAKE_ARGS=( -G Ninja -DMP_ENABLE_CLANG_TIDY=ON diff --git a/ci/configs/sanitize.bash b/ci/configs/sanitize.bash index ce920f4..a3e8e5b 100644 --- a/ci/configs/sanitize.bash +++ b/ci/configs/sanitize.bash @@ -1,7 +1,7 @@ CI_DESC="CI job running ThreadSanitizer" CI_DIR=build-sanitize export CXX=clang++ -export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter -fsanitize=thread" +export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety -Wno-unused-parameter -fsanitize=thread" CMAKE_ARGS=() BUILD_ARGS=(-k -j4) BUILD_TARGETS=(mptest) From d60db601ed9badcebab54a860741f365f8219735 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Wed, 10 Sep 2025 16:18:42 -0400 Subject: [PATCH 2/6] proxy-io.h: add Waiter::m_mutex thread safety annotations This is just a refactoring, no behavior is changing. This is only adding annotations and switching from unannotated to annotated types. In SetThread, the first two parameters are combined into a single GuardedRef parameter to avoid -Wthread-safety-reference warnings at call sites like "warning: passing variable 'callback_threads' by reference requires holding mutex 'thread_context.waiter->m_mutex'" --- include/mp/proxy-io.h | 16 ++++++++-------- include/mp/proxy-types.h | 8 ++++---- include/mp/type-context.h | 8 ++++---- include/mp/util.h | 11 +++++++++++ src/mp/proxy.cpp | 21 ++++++++++----------- 5 files changed, 37 insertions(+), 27 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 367a9be..4c9d519 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -275,16 +275,16 @@ struct Waiter template void post(Fn&& fn) { - const std::unique_lock lock(m_mutex); + const Lock lock(m_mutex); assert(!m_fn); m_fn = std::forward(fn); m_cv.notify_all(); } template - void wait(std::unique_lock& lock, Predicate pred) + void wait(Lock& lock, Predicate pred) { - m_cv.wait(lock, [&] { + m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) { // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid // a lost-wakeup bug. A new m_fn and m_cv notification might be sent // after the fn() call and before the lock.lock() call in this loop @@ -307,9 +307,9 @@ struct Waiter //! mutexes than necessary. This mutex can be held at the same time as //! 
EventLoop::m_mutex as long as Waiter::mutex is locked first and //! EventLoop::m_mutex is locked second. - std::mutex m_mutex; + Mutex m_mutex; std::condition_variable m_cv; - std::optional> m_fn; + std::optional> m_fn MP_GUARDED_BY(m_mutex); }; //! Object holding network & rpc state associated with either an incoming server @@ -540,7 +540,7 @@ using ConnThread = ConnThreads::iterator; // Retrieve ProxyClient object associated with this connection from a // map, or create a new one and insert it into the map. Return map iterator and // inserted bool. -std::tuple SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function& make_thread); +std::tuple SetThread(GuardedRef threads, Connection* connection, const std::function& make_thread); struct ThreadContext { @@ -556,7 +556,7 @@ struct ThreadContext //! `callbackThread` argument it passes in the request, used by the server //! in case it needs to make callbacks into the client that need to execute //! while the client is waiting. This will be set to a local thread object. - ConnThreads callback_threads; + ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex); //! When client is making a request to a server, this is the `thread` //! argument it passes in the request, used to control which thread on @@ -565,7 +565,7 @@ struct ThreadContext //! by makeThread. If a client call is being made from a thread currently //! handling a server request, this will be set to the `callbackThread` //! request thread argument passed in that request. - ConnThreads request_threads; + ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex); //! Whether this thread is a capnp event loop thread. Not really used except //! to assert false if there's an attempt to execute a blocking operation diff --git a/include/mp/proxy-types.h b/include/mp/proxy-types.h index de96d13..0d30621 100644 --- a/include/mp/proxy-types.h +++ b/include/mp/proxy-types.h @@ -617,7 +617,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel const char* disconnected = nullptr; proxy_client.m_context.loop->sync([&]() { if (!proxy_client.m_context.connection) { - const std::unique_lock lock(thread_context.waiter->m_mutex); + const Lock lock(thread_context.waiter->m_mutex); done = true; disconnected = "IPC client method called after disconnect."; thread_context.waiter->m_cv.notify_all(); @@ -644,7 +644,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel } catch (...) 
{ exception = std::current_exception(); } - const std::unique_lock lock(thread_context.waiter->m_mutex); + const Lock lock(thread_context.waiter->m_mutex); done = true; thread_context.waiter->m_cv.notify_all(); }, @@ -656,13 +656,13 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel proxy_client.m_context.loop->logPlain() << "{" << thread_context.thread_name << "} IPC client exception " << kj_exception; } - const std::unique_lock lock(thread_context.waiter->m_mutex); + const Lock lock(thread_context.waiter->m_mutex); done = true; thread_context.waiter->m_cv.notify_all(); })); }); - std::unique_lock lock(thread_context.waiter->m_mutex); + Lock lock(thread_context.waiter->m_mutex); thread_context.waiter->wait(lock, [&done]() { return done; }); if (exception) std::rethrow_exception(exception); if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception; diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 894daad..ecf17ca 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -25,7 +25,7 @@ void CustomBuildField(TypeList<>, // Also store the Thread::Client reference in the callback_threads map so // future calls over this connection can reuse it. auto [callback_thread, _]{SetThread( - thread_context.callback_threads, thread_context.waiter->m_mutex, &connection, + GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection, [&] { return connection.m_threads.add(kj::heap>(thread_context, std::thread{})); })}; // Call remote ThreadMap.makeThread function so server will create a @@ -43,7 +43,7 @@ void CustomBuildField(TypeList<>, return request.send().getResult(); // Nonblocking due to capnp request pipelining. }}; auto [request_thread, _1]{SetThread( - thread_context.request_threads, thread_context.waiter->m_mutex, + GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads}, &connection, make_request_thread)}; auto context = output.init(); @@ -90,7 +90,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& auto& thread_context = g_thread_context; auto& request_threads = thread_context.request_threads; auto [request_thread, inserted]{SetThread( - request_threads, thread_context.waiter->m_mutex, + GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection, [&] { return context_arg.getCallbackThread(); })}; @@ -101,7 +101,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // makes another IPC call), so avoid modifying the map. const bool erase_thread{inserted}; KJ_DEFER(if (erase_thread) { - std::unique_lock lock(thread_context.waiter->m_mutex); + Lock lock(thread_context.waiter->m_mutex); // Call erase here with a Connection* argument instead // of an iterator argument, because the `request_thread` // iterator may be invalid if the connection is closed diff --git a/include/mp/util.h b/include/mp/util.h index d9f3ca3..a7f24ed 100644 --- a/include/mp/util.h +++ b/include/mp/util.h @@ -182,6 +182,17 @@ class MP_SCOPED_CAPABILITY Lock { std::unique_lock m_lock; }; +template +struct GuardedRef +{ + Mutex& mutex; + T& ref MP_GUARDED_BY(mutex); +}; + +// CTAD for Clang 16: GuardedRef{mutex, x} -> GuardedRef +template +GuardedRef(Mutex&, U&) -> GuardedRef; + //! Analog to std::lock_guard that unlocks instead of locks. 
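//
// (Reviewer sketch, not part of the patch:) GuardedRef above is what lets
// SetThread take "a map plus the mutex guarding it" as a single argument, so
// the callee can lock before touching the reference and call sites avoid the
// -Wthread-safety-reference warnings mentioned in the commit message. A
// minimal usage sketch, with a hypothetical ClearThreads() helper and the
// ConnThreads type from proxy-io.h:
//
//     void ClearThreads(GuardedRef<ConnThreads> threads)
//     {
//         const Lock lock(threads.mutex); // ref is MP_GUARDED_BY(mutex)
//         threads.ref.clear();
//     }
//
//     // Call site; CTAD deduces GuardedRef<ConnThreads>:
//     ClearThreads(GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads});
//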
template struct UnlockGuard diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index c9fecf5..45ec349 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include #include #include @@ -305,15 +304,15 @@ bool EventLoop::done() const return m_num_clients == 0 && m_async_fns->empty(); } -std::tuple SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function& make_thread) +std::tuple SetThread(GuardedRef threads, Connection* connection, const std::function& make_thread) { - const std::unique_lock lock(mutex); - auto thread = threads.find(connection); - if (thread != threads.end()) return {thread, false}; - thread = threads.emplace( + const Lock lock(threads.mutex); + auto thread = threads.ref.find(connection); + if (thread != threads.ref.end()) return {thread, false}; + thread = threads.ref.emplace( std::piecewise_construct, std::forward_as_tuple(connection), std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first; - thread->second.setDisconnectCallback([&threads, &mutex, thread] { + thread->second.setDisconnectCallback([threads, thread] { // Note: it is safe to use the `thread` iterator in this cleanup // function, because the iterator would only be invalid if the map entry // was removed, and if the map entry is removed the ProxyClient @@ -323,9 +322,9 @@ std::tuple SetThread(ConnThreads& threads, std::mutex& mutex, // thread client m_disconnect_cb member so thread client destructor does not // try to unregister this callback after connection is destroyed. // Remove connection pointer about to be destroyed from the map - const std::unique_lock lock(mutex); + const Lock lock(threads.mutex); thread->second.m_disconnect_cb.reset(); - threads.erase(thread); + threads.ref.erase(thread); }); return {thread, true}; } @@ -364,7 +363,7 @@ ProxyServer::~ProxyServer() assert(m_thread_context.waiter.get()); std::unique_ptr waiter; { - const std::unique_lock lock(m_thread_context.waiter->m_mutex); + const Lock lock(m_thread_context.waiter->m_mutex); //! Reset thread context waiter pointer, as shutdown signal for done //! lambda passed as waiter->wait() argument in makeThread code below. waiter = std::move(m_thread_context.waiter); @@ -398,7 +397,7 @@ kj::Promise ProxyServer::makeThread(MakeThreadContext context) g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")"; g_thread_context.waiter = std::make_unique(); thread_context.set_value(&g_thread_context); - std::unique_lock lock(g_thread_context.waiter->m_mutex); + Lock lock(g_thread_context.waiter->m_mutex); // Wait for shutdown signal from ProxyServer destructor (signal // is just waiter getting set to null.) g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; }); From 9b079911355720626dacd28d80ec6efee05077a4 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Wed, 10 Sep 2025 16:24:31 -0400 Subject: [PATCH 3/6] doc: describe ThreadContext struct and synchronization requirements --- include/mp/proxy-io.h | 50 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 4c9d519..b0c523d 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -534,6 +534,10 @@ void ProxyServerBase::invokeDestroy() CleanupRun(m_context.cleanup_fns); } +//! Map from Connection to local or remote thread handle which will be used over +//! that connection. 
This map will typically only contain one entry, but can +//! contain multiple if a single thread makes IPC calls over multiple +//! connections. using ConnThreads = std::map>; using ConnThread = ConnThreads::iterator; @@ -542,20 +546,58 @@ using ConnThread = ConnThreads::iterator; // inserted bool. std::tuple SetThread(GuardedRef threads, Connection* connection, const std::function& make_thread); +//! The thread_local ThreadContext g_thread_context struct provides information +//! about individual threads and a way of communicating between them. Because +//! it's a thread local struct, each ThreadContext instance is initialized by +//! the thread that owns it. +//! +//! ThreadContext is used for any client threads created externally which make +//! IPC calls, and for server threads created by +//! ProxyServer::makeThread() which execute IPC calls for clients. +//! +//! In both cases, the struct holds information like the thread name, and a +//! Waiter object where the EventLoop can post incoming IPC requests to execute +//! on the thread. The struct also holds ConnThread maps associating the thread +//! with local and remote ProxyClient objects. struct ThreadContext { //! Identifying string for debug. std::string thread_name; - //! Waiter object used to allow client threads blocked waiting for a server - //! response to execute callbacks made from the client's corresponding - //! server thread. + //! Waiter object used to allow remote clients to execute code on this + //! thread. For server threads created by + //! ProxyServer::makeThread(), this is initialized in that + //! function. Otherwise, for client threads created externally, this is + //! initialized the first time the thread tries to make an IPC call. Having + //! a waiter is necessary for threads making IPC calls in case a server they + //! are calling expects them to execute a callback during the call, before + //! it sends a response. + //! + //! For IPC client threads, the Waiter pointer is never cleared and the Waiter + //! just gets destroyed when the thread does. For server threads created by + //! makeThread(), this pointer is set to null in the ~ProxyServer as + //! a signal for the thread to exit and destroy itself. In both cases, the + //! same Waiter object is used across different calls and only created and + //! destroyed once for the lifetime of the thread. std::unique_ptr waiter = nullptr; //! When client is making a request to a server, this is the //! `callbackThread` argument it passes in the request, used by the server //! in case it needs to make callbacks into the client that need to execute //! while the client is waiting. This will be set to a local thread object. + //! + //! Synchronization note: The callback_thread and request_thread maps are + //! only ever accessed internally by this thread's destructor and externally + //! by Cap'n Proto event loop threads. Since it's possible for IPC client + //! threads to make calls over different connections that could have + //! different event loops, these maps are guarded by Waiter::m_mutex in case + //! different event loop threads add or remove map entries simultaneously. + //! However, individual ProxyClient objects in the maps will only be + //! associated with one event loop and guarded by EventLoop::m_mutex. So + //! Waiter::m_mutex does not need to be held while accessing individual + //! ProxyClient instances, and may even need to be released to + //! respect lock order and avoid locking Waiter::m_mutex before + //! EventLoop::m_mutex. 
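//!
//! (Reviewer sketch, not part of the patch:) a minimal illustration of what
//! the MP_GUARDED_BY annotation below makes -Wthread-safety enforce:
//!
//!     g_thread_context.callback_threads.clear();     // warning: requires
//!                                                     // holding waiter->m_mutex
//!     {
//!         const Lock lock(g_thread_context.waiter->m_mutex);
//!         g_thread_context.callback_threads.clear(); // ok
//!     }
//!
//! Individual ProxyClient values looked up in the map can still be used after
//! the lock is released, as described above.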
ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex); //! When client is making a request to a server, this is the `thread` @@ -565,6 +607,8 @@ struct ThreadContext //! by makeThread. If a client call is being made from a thread currently //! handling a server request, this will be set to the `callbackThread` //! request thread argument passed in that request. + //! + //! Synchronization note: \ref callback_threads note applies here as well. ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex); //! Whether this thread is a capnp event loop thread. Not really used except From ca9b380ea91a5989e2e3e5af02e9a9dde130e899 Mon Sep 17 00:00:00 2001 From: Sjors Provoost Date: Wed, 10 Sep 2025 11:50:12 +0200 Subject: [PATCH 4/6] Use std::optional in ConnThreads to allow shortening locks This is just a refactoring changing the ConnThreads data type. The optional value is not actually left in an unset state until the next commit. --- include/mp/proxy-io.h | 6 ++++-- include/mp/type-context.h | 4 ++-- src/mp/proxy.cpp | 8 +++++--- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index b0c523d..ee68ac7 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -537,8 +537,10 @@ void ProxyServerBase::invokeDestroy() //! Map from Connection to local or remote thread handle which will be used over //! that connection. This map will typically only contain one entry, but can //! contain multiple if a single thread makes IPC calls over multiple -//! connections. -using ConnThreads = std::map>; +//! connections. A std::optional value type is used to avoid the map needing to +//! be locked while ProxyClient objects are constructed, see +//! ThreadContext "Synchronization note" below. +using ConnThreads = std::map>>; using ConnThread = ConnThreads::iterator; // Retrieve ProxyClient object associated with this connection from a diff --git a/include/mp/type-context.h b/include/mp/type-context.h index ecf17ca..82ec052 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -47,8 +47,8 @@ void CustomBuildField(TypeList<>, &connection, make_request_thread)}; auto context = output.init(); - context.setThread(request_thread->second.m_client); - context.setCallbackThread(callback_thread->second.m_client); + context.setThread(request_thread->second->m_client); + context.setCallbackThread(callback_thread->second->m_client); } //! PassField override for mp.Context arguments. Return asynchronously and call diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 45ec349..8393496 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -311,8 +311,10 @@ std::tuple SetThread(GuardedRef threads, Connecti if (thread != threads.ref.end()) return {thread, false}; thread = threads.ref.emplace( std::piecewise_construct, std::forward_as_tuple(connection), - std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first; - thread->second.setDisconnectCallback([threads, thread] { + std::forward_as_tuple() + ).first; + thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false); + thread->second->setDisconnectCallback([threads, thread] { // Note: it is safe to use the `thread` iterator in this cleanup // function, because the iterator would only be invalid if the map entry // was removed, and if the map entry is removed the ProxyClient @@ -323,7 +325,7 @@ std::tuple SetThread(GuardedRef threads, Connecti // try to unregister this callback after connection is destroyed. 
// Remove connection pointer about to be destroyed from the map const Lock lock(threads.mutex); - thread->second.m_disconnect_cb.reset(); + thread->second->m_disconnect_cb.reset(); threads.ref.erase(thread); }); return {thread, true}; From 85df96482c49ea9fc5416a7f018d60e909d1180b Mon Sep 17 00:00:00 2001 From: Sjors Provoost Date: Wed, 10 Sep 2025 13:46:32 +0200 Subject: [PATCH 5/6] Use try_emplace in SetThread instead of threads.find This commit easiest to review ignoring whitespace (git diff -w). This is a minor change in behavior, but the only change is shortening the duration that threads.mutex is locked while inserting a new entry in the threads.ref map. The lock is now only held while the entry is created and is released while the ProxyClient object is initialized. This change doesn't really fix any problems but it simplifies the next commit which deals with race conditions and deadlocks in this code, so it has been split out. --- src/mp/proxy.cpp | 45 +++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 8393496..989f992 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -306,29 +306,30 @@ bool EventLoop::done() const std::tuple SetThread(GuardedRef threads, Connection* connection, const std::function& make_thread) { - const Lock lock(threads.mutex); - auto thread = threads.ref.find(connection); - if (thread != threads.ref.end()) return {thread, false}; - thread = threads.ref.emplace( - std::piecewise_construct, std::forward_as_tuple(connection), - std::forward_as_tuple() - ).first; - thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false); - thread->second->setDisconnectCallback([threads, thread] { - // Note: it is safe to use the `thread` iterator in this cleanup - // function, because the iterator would only be invalid if the map entry - // was removed, and if the map entry is removed the ProxyClient - // destructor unregisters the cleanup. - - // Connection is being destroyed before thread client is, so reset - // thread client m_disconnect_cb member so thread client destructor does not - // try to unregister this callback after connection is destroyed. - // Remove connection pointer about to be destroyed from the map + ConnThread thread; + bool inserted; + { const Lock lock(threads.mutex); - thread->second->m_disconnect_cb.reset(); - threads.ref.erase(thread); - }); - return {thread, true}; + std::tie(thread, inserted) = threads.ref.try_emplace(connection); + } + if (inserted) { + thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false); + thread->second->setDisconnectCallback([threads, thread] { + // Note: it is safe to use the `thread` iterator in this cleanup + // function, because the iterator would only be invalid if the map entry + // was removed, and if the map entry is removed the ProxyClient + // destructor unregisters the cleanup. + + // Connection is being destroyed before thread client is, so reset + // thread client m_disconnect_cb member so thread client destructor does not + // try to unregister this callback after connection is destroyed. 
+ // Remove connection pointer about to be destroyed from the map + const Lock lock(threads.mutex); + thread->second->m_disconnect_cb.reset(); + threads.ref.erase(thread); + }); + } + return {thread, inserted}; } ProxyClient::~ProxyClient() From 4a269b21b8c8c2c7138df82c8dc4103a388e339b Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Wed, 10 Sep 2025 16:38:42 -0400 Subject: [PATCH 6/6] bug: fix ProxyClient deadlock if disconnected as IPC call is returning This bug is currently causing mptest "disconnecting and blocking" test to occasionally hang as reported by maflcko in https://github.com/bitcoin/bitcoin/issues/33244. The bug was actually first reported by Sjors in https://github.com/Sjors/bitcoin/pull/90#issuecomment-3024942087 and there are more details about it in https://github.com/bitcoin-core/libmultiprocess/issues/189. The bug is caused by the "disconnecting and blocking" test triggering a disconnect right before a server IPC call returns. This results in a race between the IPC server thread and the onDisconnect handler in the event loop thread both trying to destroy the server's request_threads ProxyClient object when the IPC call is done. There was a lack of synchronization in this case, fixed here by adding loop->sync() a few places. Specifically the fixes were to: - Always call SetThread on the event loop thread using the loop->sync() method, to prevent a race between the ProxyClient creation code and the connection shutdown code if there was an ill-timed disconnect. - Similarly in ~ProxyClient and thread-context.h PassField(), use loop->sync() when destroying the thread object, in case a disconnect happens at that time. A few other changes were made in this commit to make the resulting code safer and simpler, even though they are not technically necessary for the fix: - In thread-context.h PassField(), Waiter::m_mutex is now unlocked while destroying ProxyClient just to respect EventLoop::m_mutex and Waiter::m_mutex lock order and never lock the Waiter first. This is just for consistency. There is no actually possibility for a deadlock here due to the new sync() call. - This adds asserts to make sure functions expected to run on the event loop thread are only called on that thread. - This inlines the ProxyClient::setDisconnectCallback function, just because it was a short function only called in a single place. --- include/mp/proxy-io.h | 2 -- include/mp/type-context.h | 42 ++++++++++++++++++++++++--------------- src/mp/proxy.cpp | 32 +++++++++++++++++++---------- 3 files changed, 48 insertions(+), 28 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index ee68ac7..728f76e 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -66,8 +66,6 @@ struct ProxyClient : public ProxyClientBase ProxyClient(const ProxyClient&) = delete; ~ProxyClient(); - void setDisconnectCallback(const std::function& fn); - //! Reference to callback function that is run if there is a sudden //! disconnect and the Connection object is destroyed before this //! ProxyClient object. The callback will destroy this object and diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 82ec052..078e1ce 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -89,29 +89,39 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // need to update the map. 
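// (Reviewer note, not part of the patch:) the interleaving fixed below, as
// described in the commit message, is roughly:
//
//     IPC server thread                    event loop thread
//     -----------------                    -----------------
//     fn.invoke() returns                  peer disconnects
//     erase request_threads entry,         onDisconnect cleanup erases the
//     destroying the thread's              same entry, destroying the same
//     ProxyClient object                   ProxyClient object
//
// Funnelling both paths through EventLoop::sync(), as the hunk below does,
// serializes them on the event loop thread so only one side performs the
// destruction.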
auto& thread_context = g_thread_context; auto& request_threads = thread_context.request_threads; - auto [request_thread, inserted]{SetThread( - GuardedRef{thread_context.waiter->m_mutex, request_threads}, - server.m_context.connection, - [&] { return context_arg.getCallbackThread(); })}; + ConnThread request_thread; + bool inserted; + server.m_context.loop->sync([&] { + std::tie(request_thread, inserted) = SetThread( + GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection, + [&] { return context_arg.getCallbackThread(); }); + }); - // If an entry was inserted into the requests_threads map, + // If an entry was inserted into the request_threads map, // remove it after calling fn.invoke. If an entry was not // inserted, one already existed, meaning this must be a // recursive call (IPC call calling back to the caller which // makes another IPC call), so avoid modifying the map. const bool erase_thread{inserted}; KJ_DEFER(if (erase_thread) { - Lock lock(thread_context.waiter->m_mutex); - // Call erase here with a Connection* argument instead - // of an iterator argument, because the `request_thread` - // iterator may be invalid if the connection is closed - // during this function call. More specifically, the - // iterator may be invalid because SetThread adds a - // cleanup callback to the Connection destructor that - // erases the thread from the map, and also because the - // ProxyServer destructor calls - // request_threads.clear(). - request_threads.erase(server.m_context.connection); + // Erase the request_threads entry on the event loop + // thread with loop->sync(), so if the connection is + // broken there is not a race between this thread and + // the disconnect handler trying to destroy the thread + // client object. + server.m_context.loop->sync([&] { + // Look up the thread again without using existing + // iterator since entry may no longer be there after + // a disconnect. Destroy node after releasing + // Waiter::m_mutex, so the ProxyClient + // destructor is able to use EventLoop::mutex + // without violating lock order. + ConnThreads::node_type removed; + { + Lock lock(thread_context.waiter->m_mutex); + removed = request_threads.extract(server.m_context.connection); + } + }); }); fn.invoke(server_context, args...); } diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 989f992..d1e1c33 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -12,6 +12,7 @@ #include #include +#include // IWYU pragma: keep #include #include #include @@ -80,6 +81,11 @@ ProxyContext::ProxyContext(Connection* connection) : connection(connection), loo Connection::~Connection() { + // Connection destructor is always called on the event loop thread. If this + // is a local disconnect, it will trigger I/O, so this needs to run on the + // event loop thread, and if there was a remote disconnect, this is called + // by an onDisconnect callback directly from the event loop thread. + assert(std::this_thread::get_id() == m_loop->m_thread_id); // Shut down RPC system first, since this will garbage collect any // ProxyServer objects that were not freed before the connection was closed. // Typically all ProxyServer objects associated with this connection will be @@ -155,6 +161,9 @@ CleanupIt Connection::addSyncCleanup(std::function fn) void Connection::removeSyncCleanup(CleanupIt it) { + // Require cleanup functions to be removed on the event loop thread to avoid + // needing to deal with them being removed in the middle of a disconnect. 
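// (Reviewer sketch, not part of the patch:) the "must run on the event loop
// thread" check repeated in this commit could be expressed once as a small
// helper, e.g. a hypothetical
//
//     void AssertEventLoopThread(const EventLoop& loop)
//     {
//         assert(std::this_thread::get_id() == loop.m_thread_id);
//     }
//
// assuming EventLoop::m_thread_id remains accessible to callers, as it is in
// the asserts added below.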
+ assert(std::this_thread::get_id() == m_loop->m_thread_id); const Lock lock(m_loop->m_mutex); m_sync_cleanup_fns.erase(it); } @@ -306,6 +315,7 @@ bool EventLoop::done() const std::tuple SetThread(GuardedRef threads, Connection* connection, const std::function& make_thread) { + assert(std::this_thread::get_id() == connection->m_loop->m_thread_id); ConnThread thread; bool inserted; { @@ -314,7 +324,7 @@ std::tuple SetThread(GuardedRef threads, Connecti } if (inserted) { thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false); - thread->second->setDisconnectCallback([threads, thread] { + thread->second->m_disconnect_cb = connection->addSyncCleanup([threads, thread] { // Note: it is safe to use the `thread` iterator in this cleanup // function, because the iterator would only be invalid if the map entry // was removed, and if the map entry is removed the ProxyClient @@ -323,9 +333,10 @@ std::tuple SetThread(GuardedRef threads, Connecti // Connection is being destroyed before thread client is, so reset // thread client m_disconnect_cb member so thread client destructor does not // try to unregister this callback after connection is destroyed. + thread->second->m_disconnect_cb.reset(); + // Remove connection pointer about to be destroyed from the map const Lock lock(threads.mutex); - thread->second->m_disconnect_cb.reset(); threads.ref.erase(thread); }); } @@ -338,17 +349,18 @@ ProxyClient::~ProxyClient() // cleanup callback that was registered to handle the connection being // destroyed before the thread being destroyed. if (m_disconnect_cb) { - m_context.connection->removeSyncCleanup(*m_disconnect_cb); + // Remove disconnect callback on the event loop thread with + // loop->sync(), so if the connection is broken there is not a race + // between this thread trying to remove the callback and the disconnect + // handler attempting to call it. + m_context.loop->sync([&]() { + if (m_disconnect_cb) { + m_context.connection->removeSyncCleanup(*m_disconnect_cb); + } + }); } } -void ProxyClient::setDisconnectCallback(const std::function& fn) -{ - assert(fn); - assert(!m_disconnect_cb); - m_disconnect_cb = m_context.connection->addSyncCleanup(fn); -} - ProxyServer::ProxyServer(ThreadContext& thread_context, std::thread&& thread) : m_thread_context(thread_context), m_thread(std::move(thread)) {
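// (Reviewer note, not part of the patch:) the MP_GUARDED_BY, MP_REQUIRES and
// MP_SCOPED_CAPABILITY annotations relied on throughout the series are not
// defined in these hunks; presumably they wrap the standard Clang
// thread-safety attributes, roughly:
//
//     #define MP_SCOPED_CAPABILITY __attribute__((scoped_lockable))
//     #define MP_GUARDED_BY(x)     __attribute__((guarded_by(x)))
//     #define MP_REQUIRES(...)     __attribute__((requires_capability(__VA_ARGS__)))
//
// which is what the -Wthread-safety flag enabled in PATCH 1/6 checks against.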