From 90d4a42a39fe0e6062e0a06ce784c582ac99cb3f Mon Sep 17 00:00:00 2001
From: Ryan Ofsky
Date: Tue, 2 Sep 2025 15:35:30 -0400
Subject: [PATCH 1/3] doc: describe ThreadContext struct and synchronization requirements

---
 include/mp/proxy-io.h | 52 +++++++++++++++++++++++++++++++++++++++----
 1 file changed, 48 insertions(+), 4 deletions(-)

diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h
index 367a9be..a9c9027 100644
--- a/include/mp/proxy-io.h
+++ b/include/mp/proxy-io.h
@@ -534,6 +534,10 @@ void ProxyServerBase<Interface, Impl>::invokeDestroy()
     CleanupRun(m_context.cleanup_fns);
 }
 
+//! Map from Connection to local or remote thread handle which will be used over
+//! that connection. This map will typically only contain one entry, but can
+//! contain multiple if a single thread makes IPC calls over multiple
+//! connections.
 using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
 using ConnThread = ConnThreads::iterator;
 
@@ -542,21 +546,59 @@ using ConnThread = ConnThreads::iterator;
 // inserted bool.
 std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
 
+//! The thread_local ThreadContext g_thread_context struct provides information
+//! about individual threads and a way of communicating between them. Because
+//! it's a thread local struct, each ThreadContext instance is initialized by
+//! the thread that owns it.
+//!
+//! ThreadContext is used for any client threads created externally which make
+//! IPC calls, and for server threads created by
+//! ProxyServer<ThreadMap>::makeThread() which execute IPC calls for clients.
+//!
+//! In both cases, the struct holds information like the thread name, and a
+//! Waiter object where the EventLoop can post incoming IPC requests to execute
+//! on the thread. The struct also holds ConnThreads maps associating the thread
+//! with local and remote ProxyClient objects.
 struct ThreadContext
 {
     //! Identifying string for debug.
     std::string thread_name;
 
-    //! Waiter object used to allow client threads blocked waiting for a server
-    //! response to execute callbacks made from the client's corresponding
-    //! server thread.
+    //! Waiter object used to allow remote clients to execute code on this
+    //! thread. For server threads created by
+    //! ProxyServer<ThreadMap>::makeThread(), this is initialized in that
+    //! function. Otherwise, for client threads created externally, this is
+    //! initialized the first time the thread tries to make an IPC call. Having
+    //! a waiter is necessary for threads making IPC calls in case a server they
+    //! are calling expects them to execute a callback during the call, before
+    //! it sends a response.
+    //!
+    //! For IPC client threads, the Waiter pointer is never cleared and the Waiter
+    //! just gets destroyed when the thread does. For server threads created by
+    //! makeThread(), this pointer is set to null in the ~ProxyServer<Thread> destructor
+    //! as a signal for the thread to exit and destroy itself. In both cases, the
+    //! same Waiter object is used across different calls and only created and
+    //! destroyed once for the lifetime of the thread.
     std::unique_ptr<Waiter> waiter = nullptr;
 
     //! When client is making a request to a server, this is the
     //! `callbackThread` argument it passes in the request, used by the server
     //! in case it needs to make callbacks into the client that need to execute
     //! while the client is waiting. This will be set to a local thread object.
-    ConnThreads callback_threads;
+    //!
+    //! Synchronization note: The callback_threads and request_threads maps are
+    //! only ever accessed internally by this thread's destructor and externally
+    //! by Cap'n Proto event loop threads. Since it's possible for IPC client
+    //! threads to make calls over different connections that could have
+    //! different event loops, these maps are guarded by Waiter::m_mutex in case
+    //! different event loop threads add or remove map entries simultaneously.
+    //! However, individual ProxyClient objects in the maps will only be
+    //! associated with one event loop and guarded by EventLoop::m_mutex. So
+    //! Waiter::m_mutex does not need to be held while accessing individual
+    //! ProxyClient instances, and may even need to be released to
+    //! respect lock order and avoid locking Waiter::m_mutex before
+    //! EventLoop::m_mutex.
+    ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);
 
     //! When client is making a request to a server, this is the `thread`
     //! argument it passes in the request, used to control which thread on
@@ -565,6 +607,8 @@ struct ThreadContext
     //! by makeThread. If a client call is being made from a thread currently
     //! handling a server request, this will be set to the `callbackThread`
     //! request thread argument passed in that request.
+    //!
+    //! Synchronization note: \ref callback_threads note applies here as well.
     ConnThreads request_threads;
 
     //! Whether this thread is a capnp event loop thread. Not really used except
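The "Synchronization note" above describes a lock-order rule: Waiter::m_mutex only guards map membership, while the mapped ProxyClient objects take EventLoop::m_mutex in their destructors, so the waiter mutex has to be released before an entry is destroyed. The following is a minimal, self-contained sketch of that rule under assumed names (g_event_loop_mutex, FakeThreadClient, FakeThreadContext and EraseEntry are hypothetical stand-ins, not libmultiprocess types):

    #include <map>
    #include <mutex>
    #include <string>

    std::mutex g_event_loop_mutex; // stand-in for EventLoop::m_mutex

    struct FakeThreadClient
    {
        explicit FakeThreadClient(std::string name_) : name(std::move(name_)) {}
        ~FakeThreadClient()
        {
            // The destructor needs the "event loop" mutex, like a ProxyClient
            // unregistering its disconnect callback.
            std::lock_guard<std::mutex> lock(g_event_loop_mutex);
            // ... unregister cleanup handlers here ...
        }
        std::string name;
    };

    struct FakeThreadContext
    {
        std::mutex waiter_mutex;                 // stand-in for Waiter::m_mutex
        std::map<int, FakeThreadClient> threads; // stand-in for ConnThreads, keyed by connection id
    };

    void EraseEntry(FakeThreadContext& ctx, int connection_id)
    {
        std::map<int, FakeThreadClient>::node_type removed;
        {
            // Hold the waiter mutex only while changing map membership.
            std::lock_guard<std::mutex> lock(ctx.waiter_mutex);
            removed = ctx.threads.extract(connection_id);
        }
        // The extracted node, and the FakeThreadClient it owns, is destroyed
        // here, after the waiter mutex is released, so the destructor can take
        // g_event_loop_mutex without ever nesting it inside waiter_mutex.
    }

    int main()
    {
        FakeThreadContext ctx;
        {
            std::lock_guard<std::mutex> lock(ctx.waiter_mutex);
            ctx.threads.try_emplace(1, "client-thread");
        }
        EraseEntry(ctx, 1);
    }

Construction and lookup happen under waiter_mutex; only destruction is deferred, which is exactly the ordering the note requires.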
From 7a8a99c50feca10be4e42a1cdb135864ce376f71 Mon Sep 17 00:00:00 2001
From: Ryan Ofsky
Date: Tue, 2 Sep 2025 15:39:27 -0400
Subject: [PATCH 2/3] bug: fix ProxyClient deadlock if disconnected as IPC call is returning

This bug is currently causing mptest "disconnecting and blocking" test to
occasionally hang as reported by maflcko in
https://github.com/bitcoin/bitcoin/issues/33244.

The bug was actually first reported by Sjors in
https://github.com/Sjors/bitcoin/pull/90#issuecomment-3024942087 and there are
more details about it in
https://github.com/bitcoin-core/libmultiprocess/issues/189.

The bug is caused by the "disconnecting and blocking" test triggering a
disconnect right before a server IPC call returns. This results in a race
between the IPC server thread and the onDisconnect handler in the event loop
thread both trying to destroy the server's request_threads ProxyClient object
when the IPC call is done.

There was a lack of synchronization in this case, fixed here by adding
loop->sync() calls in various places. There were also lock order issues where
Waiter::m_mutex could be incorrectly locked before EventLoop::m_mutex,
resulting in a deadlock.
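The core of the fix is funneling all teardown through the event loop thread so that an IPC worker thread and a disconnect handler can never destroy the same state concurrently. The sketch below shows that pattern generically with a tiny serial executor; MiniLoop, its sync() method, and the shared `entry` are hypothetical illustrations, not the libmultiprocess EventLoop API:

    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <thread>

    class MiniLoop
    {
    public:
        MiniLoop() : m_thread([this] { Run(); }) {}
        ~MiniLoop()
        {
            sync([this] { m_done = true; });
            m_thread.join();
        }
        // Run fn on the loop thread and wait for it to finish. Work posted from
        // different threads is executed one task at a time, so it cannot race.
        void sync(std::function<void()> fn)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            bool finished{false};
            m_queue.push([&] { fn(); finished = true; });
            m_cv.notify_all();
            m_cv.wait(lock, [&] { return finished; });
        }

    private:
        void Run()
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            while (!m_done) {
                if (m_queue.empty()) {
                    m_cv.wait(lock);
                    continue;
                }
                auto task = std::move(m_queue.front());
                m_queue.pop();
                task(); // run while holding m_mutex, like a simple serial executor
                m_cv.notify_all();
            }
        }
        std::mutex m_mutex;
        std::condition_variable m_cv;
        std::queue<std::function<void()>> m_queue;
        bool m_done{false};
        std::thread m_thread;
    };

    int main()
    {
        MiniLoop loop;
        // Two parties (think "IPC server thread" and "disconnect handler") both
        // want to release the same entry. Routing both through loop.sync() means
        // whichever runs second just sees the entry already gone instead of
        // racing or deadlocking with the first.
        auto entry = std::make_shared<int>(42);
        std::thread worker([&] { loop.sync([&] { entry.reset(); }); });
        loop.sync([&] { entry.reset(); });
        worker.join();
        std::cout << "entry released exactly once\n";
    }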
---
 include/mp/proxy-io.h     |  8 ++---
 include/mp/type-context.h | 44 ++++++++++++++++----------
 src/mp/proxy.cpp          | 70 +++++++++++++++++++++++++++--------------
 3 files changed, 73 insertions(+), 49 deletions(-)

diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h
index a9c9027..8e02590 100644
--- a/include/mp/proxy-io.h
+++ b/include/mp/proxy-io.h
@@ -66,8 +66,6 @@ struct ProxyClient : public ProxyClientBase
     ProxyClient(const ProxyClient&) = delete;
     ~ProxyClient();
 
-    void setDisconnectCallback(const std::function<void()>& fn);
-
     //! Reference to callback function that is run if there is a sudden
     //! disconnect and the Connection object is destroyed before this
     //! ProxyClient object. The callback will destroy this object and
@@ -537,8 +535,10 @@ void ProxyServerBase<Interface, Impl>::invokeDestroy()
 //! Map from Connection to local or remote thread handle which will be used over
 //! that connection. This map will typically only contain one entry, but can
 //! contain multiple if a single thread makes IPC calls over multiple
-//! connections.
-using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
+//! connections. A std::optional value type is used to avoid the map needing to
+//! be locked while ProxyClient objects are constructed, see
+//! ThreadContext "Synchronization note" below.
+using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
 using ConnThread = ConnThreads::iterator;
 
 // Retrieve ProxyClient object associated with this connection from a
diff --git a/include/mp/type-context.h b/include/mp/type-context.h
index 894daad..ee77f17 100644
--- a/include/mp/type-context.h
+++ b/include/mp/type-context.h
@@ -47,8 +47,8 @@ void CustomBuildField(TypeList<>,
         &connection, make_request_thread)};
 
     auto context = output.init();
-    context.setThread(request_thread->second.m_client);
-    context.setCallbackThread(callback_thread->second.m_client);
+    context.setThread(request_thread->second->m_client);
+    context.setCallbackThread(callback_thread->second->m_client);
 }
 
 //! PassField override for mp.Context arguments. Return asynchronously and call
@@ -89,10 +89,13 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
             // need to update the map.
             auto& thread_context = g_thread_context;
             auto& request_threads = thread_context.request_threads;
-            auto [request_thread, inserted]{SetThread(
-                request_threads, thread_context.waiter->m_mutex,
-                server.m_context.connection,
-                [&] { return context_arg.getCallbackThread(); })};
+            ConnThread request_thread;
+            bool inserted;
+            server.m_context.loop->sync([&] {
+                std::tie(request_thread, inserted) = SetThread(
+                    request_threads, thread_context.waiter->m_mutex, server.m_context.connection,
+                    [&] { return context_arg.getCallbackThread(); });
+            });
             // If an entry was inserted into the requests_threads map,
             // remove it after calling fn.invoke. If an entry was not
             // inserted, one already existed, meaning this must be a
@@ -101,17 +104,24 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
             // makes another IPC call), so avoid modifying the map.
             const bool erase_thread{inserted};
             KJ_DEFER(if (erase_thread) {
-                std::unique_lock lock(thread_context.waiter->m_mutex);
-                // Call erase here with a Connection* argument instead
-                // of an iterator argument, because the `request_thread`
-                // iterator may be invalid if the connection is closed
-                // during this function call. More specifically, the
-                // iterator may be invalid because SetThread adds a
-                // cleanup callback to the Connection destructor that
-                // erases the thread from the map, and also because the
-                // ProxyServer destructor calls
-                // request_threads.clear().
-                request_threads.erase(server.m_context.connection);
+                // Erase the request_threads entry on the event loop
+                // thread with loop->sync(), so if the connection is
+                // broken there is not a race between this thread and
+                // the disconnect handler trying to destroy the thread
+                // client object.
+                server.m_context.loop->sync([&] {
+                    // Look up the thread again without using existing
+                    // iterator since entry may no longer be there after
+                    // a disconnect. Destroy node after releasing
+                    // Waiter::m_mutex, so the ProxyClient
+                    // destructor is able to use EventLoop::m_mutex
+                    // without violating lock order.
+                    ConnThreads::node_type removed;
+                    {
+                        std::unique_lock lock(thread_context.waiter->m_mutex);
+                        removed = request_threads.extract(server.m_context.connection);
+                    }
+                });
             });
             fn.invoke(server_context, args...);
         }
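The deferred block above leans on a specific std::map::extract() property: extracting a node only unlinks the element, and its destructor does not run until the returned node handle itself is destroyed, which here happens after Waiter::m_mutex is released. A small stand-alone demonstration of that timing (Noisy and the printed messages are illustrative only):

    #include <iostream>
    #include <map>

    struct Noisy
    {
        int id{0};
        ~Noisy() { std::cout << "destroyed " << id << '\n'; }
    };

    int main()
    {
        std::map<int, Noisy> m;
        m[1].id = 1;
        {
            std::map<int, Noisy>::node_type removed;
            {
                // Pretend this inner scope holds Waiter::m_mutex: extract()
                // only unlinks the entry, it does not run ~Noisy yet.
                removed = m.extract(1);
                std::cout << "extracted, map size = " << m.size() << '\n';
            }
            std::cout << "lock released, node handle still owns the element\n";
        } // ~Noisy runs here, when the node handle goes out of scope
        std::cout << "done\n";
    }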
diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp
index c9fecf5..6b13682 100644
--- a/src/mp/proxy.cpp
+++ b/src/mp/proxy.cpp
@@ -81,6 +81,11 @@ ProxyContext::ProxyContext(Connection* connection) : connection(connection), loo
 
 Connection::~Connection()
 {
+    // Connection destructor is always called on the event loop thread. If this
+    // is a local disconnect, it will trigger I/O, so this needs to run on the
+    // event loop thread, and if there was a remote disconnect, this is called
+    // by an onDisconnect callback directly from the event loop thread.
+    assert(std::this_thread::get_id() == m_loop->m_thread_id);
     // Shut down RPC system first, since this will garbage collect any
     // ProxyServer objects that were not freed before the connection was closed.
     // Typically all ProxyServer objects associated with this connection will be
@@ -156,6 +161,9 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 
 void Connection::removeSyncCleanup(CleanupIt it)
 {
+    // Require cleanup functions to be removed on the event loop thread to avoid
+    // needing to deal with them being removed in the middle of a disconnect.
+    assert(std::this_thread::get_id() == m_loop->m_thread_id);
     const Lock lock(m_loop->m_mutex);
     m_sync_cleanup_fns.erase(it);
 }
@@ -307,27 +315,32 @@ bool EventLoop::done() const
 
 std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
 {
-    const std::unique_lock lock(mutex);
-    auto thread = threads.find(connection);
-    if (thread != threads.end()) return {thread, false};
-    thread = threads.emplace(
-        std::piecewise_construct, std::forward_as_tuple(connection),
-        std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
-    thread->second.setDisconnectCallback([&threads, &mutex, thread] {
-        // Note: it is safe to use the `thread` iterator in this cleanup
-        // function, because the iterator would only be invalid if the map entry
-        // was removed, and if the map entry is removed the ProxyClient
-        // destructor unregisters the cleanup.
-
-        // Connection is being destroyed before thread client is, so reset
-        // thread client m_disconnect_cb member so thread client destructor does not
-        // try to unregister this callback after connection is destroyed.
-        // Remove connection pointer about to be destroyed from the map
+    assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
+    ConnThread thread;
+    bool inserted;
+    {
         const std::unique_lock lock(mutex);
-        thread->second.m_disconnect_cb.reset();
-        threads.erase(thread);
-    });
-    return {thread, true};
+        std::tie(thread, inserted) = threads.try_emplace(connection);
+    }
+    if (inserted) {
+        thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false);
+        thread->second->m_disconnect_cb = connection->addSyncCleanup([&threads, &mutex, thread] {
+            // Note: it is safe to use the `thread` iterator in this cleanup
+            // function, because the iterator would only be invalid if the map entry
+            // was removed, and if the map entry is removed the ProxyClient
+            // destructor unregisters the cleanup.
+
+            // Connection is being destroyed before thread client is, so reset
+            // thread client m_disconnect_cb member so thread client destructor does not
+            // try to unregister this callback after connection is destroyed.
+            thread->second->m_disconnect_cb.reset();
+
+            // Remove connection pointer about to be destroyed from the map
+            const std::unique_lock lock(mutex);
+            threads.erase(thread);
+        });
+    }
+    return {thread, inserted};
 }
 
 ProxyClient<Thread>::~ProxyClient()
@@ -336,17 +349,18 @@ ProxyClient<Thread>::~ProxyClient()
     // cleanup callback that was registered to handle the connection being
    // destroyed before the thread being destroyed.
     if (m_disconnect_cb) {
-        m_context.connection->removeSyncCleanup(*m_disconnect_cb);
+        // Remove disconnect callback on the event loop thread with
+        // loop->sync(), so if the connection is broken there is not a race
+        // between this thread trying to remove the callback and the disconnect
+        // handler attempting to call it.
+        m_context.loop->sync([&]() {
+            if (m_disconnect_cb) {
+                m_context.connection->removeSyncCleanup(*m_disconnect_cb);
+            }
+        });
     }
 }
 
-void ProxyClient<Thread>::setDisconnectCallback(const std::function<void()>& fn)
-{
-    assert(fn);
-    assert(!m_disconnect_cb);
-    m_disconnect_cb = m_context.connection->addSyncCleanup(fn);
-}
-
 ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
     : m_thread_context(thread_context), m_thread(std::move(thread))
 {
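The reworked SetThread first reserves an empty std::optional slot with try_emplace() while holding the map mutex, then constructs the value into that slot after the lock is released, so construction never happens under the map lock. Here is a generic sketch of the same idiom under assumed names (Registry, Expensive and GetOrCreate are hypothetical, not libmultiprocess code):

    #include <iostream>
    #include <map>
    #include <mutex>
    #include <optional>
    #include <string>
    #include <tuple>

    struct Expensive
    {
        explicit Expensive(std::string v) : value(std::move(v))
        {
            // Imagine this constructor blocks on I/O or takes unrelated locks.
        }
        std::string value;
    };

    struct Registry
    {
        std::mutex mutex;
        std::map<int, std::optional<Expensive>> entries;
    };

    Expensive& GetOrCreate(Registry& reg, int key)
    {
        std::map<int, std::optional<Expensive>>::iterator it;
        bool inserted;
        {
            // Short critical section: only map membership is protected here.
            std::lock_guard<std::mutex> lock(reg.mutex);
            std::tie(it, inserted) = reg.entries.try_emplace(key); // empty optional
        }
        if (inserted) {
            // Slow construction happens outside the map lock. Concurrent callers
            // for the same key would still need coordination, which SetThread
            // gets from always running on the event loop thread.
            it->second.emplace("constructed for key " + std::to_string(key));
        }
        return *it->second;
    }

    int main()
    {
        Registry reg;
        std::cout << GetOrCreate(reg, 7).value << '\n';
    }

Map iterators stay valid across other insertions, which is what makes it safe to keep `it` after the lock is dropped.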
From a5056a45b43b550f6a8f526f33e33fc2027e6e62 Mon Sep 17 00:00:00 2001
From: Sjors Provoost
Date: Wed, 10 Sep 2025 11:20:24 +0200
Subject: [PATCH 3/3] Replace KJ_DEFER with kj::defer

Review hint: git show --color-moved=dimmed-zebra --color-moved-ws=ignore-space-change
---
 include/mp/type-context.h | 40 ++++++++++++++++++++-------------------
 src/mp/proxy.cpp          |  4 +++-
 2 files changed, 24 insertions(+), 20 deletions(-)

diff --git a/include/mp/type-context.h b/include/mp/type-context.h
index ee77f17..ad2f3d6 100644
--- a/include/mp/type-context.h
+++ b/include/mp/type-context.h
@@ -103,25 +103,27 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
             // recursive call (IPC call calling back to the caller which
             // makes another IPC call), so avoid modifying the map.
             const bool erase_thread{inserted};
-            KJ_DEFER(if (erase_thread) {
-                // Erase the request_threads entry on the event loop
-                // thread with loop->sync(), so if the connection is
-                // broken there is not a race between this thread and
-                // the disconnect handler trying to destroy the thread
-                // client object.
-                server.m_context.loop->sync([&] {
-                    // Look up the thread again without using existing
-                    // iterator since entry may no longer be there after
-                    // a disconnect. Destroy node after releasing
-                    // Waiter::m_mutex, so the ProxyClient
-                    // destructor is able to use EventLoop::m_mutex
-                    // without violating lock order.
-                    ConnThreads::node_type removed;
-                    {
-                        std::unique_lock lock(thread_context.waiter->m_mutex);
-                        removed = request_threads.extract(server.m_context.connection);
-                    }
-                });
+            [[maybe_unused]] const auto _cleanup = kj::defer([&] {
+                if (erase_thread) {
+                    // Erase the request_threads entry on the event loop
+                    // thread with loop->sync(), so if the connection is
+                    // broken there is not a race between this thread and
+                    // the disconnect handler trying to destroy the thread
+                    // client object.
+                    server.m_context.loop->sync([&] {
+                        // Look up the thread again without using existing
+                        // iterator since entry may no longer be there after
+                        // a disconnect. Destroy node after releasing
+                        // Waiter::m_mutex, so the ProxyClient
+                        // destructor is able to use EventLoop::m_mutex
+                        // without violating lock order.
+                        ConnThreads::node_type removed;
+                        {
+                            std::unique_lock lock(thread_context.waiter->m_mutex);
+                            removed = request_threads.extract(server.m_context.connection);
+                        }
+                    });
+                }
             });
             fn.invoke(server_context, args...);
         }
diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp
index 6b13682..ea07f70 100644
--- a/src/mp/proxy.cpp
+++ b/src/mp/proxy.cpp
@@ -223,7 +223,9 @@ void EventLoop::loop()
 {
     assert(!g_thread_context.loop_thread);
     g_thread_context.loop_thread = true;
-    KJ_DEFER(g_thread_context.loop_thread = false);
+    [[maybe_unused]] const auto _cleanup = kj::defer([&] {
+        g_thread_context.loop_thread = false;
+    });
 
     {
         const Lock lock(m_mutex);
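This patch swaps the KJ_DEFER macro for a direct call to kj::defer(), which returns an object that runs the given lambda from its destructor; the returned object has to be kept alive in a named local, which is why the diff introduces the [[maybe_unused]] const auto _cleanup variable. A dependency-free C++ equivalent of the idiom, with a hypothetical ScopeGuard type standing in for the kj object:

    #include <iostream>
    #include <utility>

    template <typename F>
    class ScopeGuard
    {
    public:
        explicit ScopeGuard(F fn) : m_fn(std::move(fn)) {}
        ~ScopeGuard() { m_fn(); }
        ScopeGuard(const ScopeGuard&) = delete;
        ScopeGuard& operator=(const ScopeGuard&) = delete;

    private:
        F m_fn;
    };

    template <typename F>
    ScopeGuard<F> MakeGuard(F fn) { return ScopeGuard<F>(std::move(fn)); }

    int main()
    {
        bool loop_thread{true};
        {
            // Equivalent of: [[maybe_unused]] const auto _cleanup =
            //     kj::defer([&] { loop_thread = false; });
            [[maybe_unused]] const auto cleanup = MakeGuard([&] { loop_thread = false; });
            std::cout << "inside scope: " << std::boolalpha << loop_thread << '\n';
        } // cleanup's destructor runs here and resets the flag
        std::cout << "after scope: " << std::boolalpha << loop_thread << '\n';
    }

Like any other local, the guard fires at scope exit in reverse declaration order, which matches the behavior of the macro it replaces.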