Skip to content

Commit 6d6096a

Browse files
committed
proxy-io.h: add Waiter::m_mutex thread safety annotations
This is just a refactoring; no behavior is changing. It only adds annotations and switches from unannotated to annotated types. In SetThread, the first two parameters are combined into a single GuardedRef parameter to avoid -Wthread-safety-reference warnings at call sites, such as: "warning: passing variable 'callback_threads' by reference requires holding mutex 'thread_context.waiter->m_mutex'".
1 parent 4e365b0 commit 6d6096a

File tree

5 files changed

+33
-27
lines changed

5 files changed

+33
-27
lines changed

include/mp/proxy-io.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -275,16 +275,16 @@ struct Waiter
275275
template <typename Fn>
276276
void post(Fn&& fn)
277277
{
278-
const std::unique_lock<std::mutex> lock(m_mutex);
278+
const Lock lock(m_mutex);
279279
assert(!m_fn);
280280
m_fn = std::forward<Fn>(fn);
281281
m_cv.notify_all();
282282
}
283283

284284
template <class Predicate>
285-
void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
285+
void wait(Lock& lock, Predicate pred)
286286
{
287-
m_cv.wait(lock, [&] {
287+
m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
288288
// Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
289289
// a lost-wakeup bug. A new m_fn and m_cv notification might be sent
290290
// after the fn() call and before the lock.lock() call in this loop
@@ -307,9 +307,9 @@ struct Waiter
307307
//! mutexes than necessary. This mutex can be held at the same time as
308308
//! EventLoop::m_mutex as long as Waiter::m_mutex is locked first and
309309
//! EventLoop::m_mutex is locked second.
310-
std::mutex m_mutex;
310+
Mutex m_mutex;
311311
std::condition_variable m_cv;
312-
std::optional<kj::Function<void()>> m_fn;
312+
std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
313313
};
314314

315315
//! Object holding network & rpc state associated with either an incoming server
@@ -540,7 +540,7 @@ using ConnThread = ConnThreads::iterator;
540540
// Retrieve ProxyClient<Thread> object associated with this connection from a
541541
// map, or create a new one and insert it into the map. Return map iterator and
542542
// inserted bool.
543-
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
543+
std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);
544544

545545
struct ThreadContext
546546
{
@@ -556,7 +556,7 @@ struct ThreadContext
556556
//! `callbackThread` argument it passes in the request, used by the server
557557
//! in case it needs to make callbacks into the client that need to execute
558558
//! while the client is waiting. This will be set to a local thread object.
559-
ConnThreads callback_threads;
559+
ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);
560560

561561
//! When client is making a request to a server, this is the `thread`
562562
//! argument it passes in the request, used to control which thread on
@@ -565,7 +565,7 @@ struct ThreadContext
565565
//! by makeThread. If a client call is being made from a thread currently
566566
//! handling a server request, this will be set to the `callbackThread`
567567
//! request thread argument passed in that request.
568-
ConnThreads request_threads;
568+
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
569569

570570
//! Whether this thread is a capnp event loop thread. Not really used except
571571
//! to assert false if there's an attempt to execute a blocking operation

include/mp/proxy-types.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
617617
const char* disconnected = nullptr;
618618
proxy_client.m_context.loop->sync([&]() {
619619
if (!proxy_client.m_context.connection) {
620-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
620+
const Lock lock(thread_context.waiter->m_mutex);
621621
done = true;
622622
disconnected = "IPC client method called after disconnect.";
623623
thread_context.waiter->m_cv.notify_all();
@@ -644,7 +644,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
644644
} catch (...) {
645645
exception = std::current_exception();
646646
}
647-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
647+
const Lock lock(thread_context.waiter->m_mutex);
648648
done = true;
649649
thread_context.waiter->m_cv.notify_all();
650650
},
@@ -656,13 +656,13 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
656656
proxy_client.m_context.loop->logPlain()
657657
<< "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
658658
}
659-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
659+
const Lock lock(thread_context.waiter->m_mutex);
660660
done = true;
661661
thread_context.waiter->m_cv.notify_all();
662662
}));
663663
});
664664

665-
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
665+
Lock lock(thread_context.waiter->m_mutex);
666666
thread_context.waiter->wait(lock, [&done]() { return done; });
667667
if (exception) std::rethrow_exception(exception);
668668
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;

include/mp/type-context.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ void CustomBuildField(TypeList<>,
2525
// Also store the Thread::Client reference in the callback_threads map so
2626
// future calls over this connection can reuse it.
2727
auto [callback_thread, _]{SetThread(
28-
thread_context.callback_threads, thread_context.waiter->m_mutex, &connection,
28+
GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
2929
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
3030

3131
// Call remote ThreadMap.makeThread function so server will create a
@@ -43,7 +43,7 @@ void CustomBuildField(TypeList<>,
4343
return request.send().getResult(); // Nonblocking due to capnp request pipelining.
4444
}};
4545
auto [request_thread, _1]{SetThread(
46-
thread_context.request_threads, thread_context.waiter->m_mutex,
46+
GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
4747
&connection, make_request_thread)};
4848

4949
auto context = output.init();
@@ -90,7 +90,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
9090
auto& thread_context = g_thread_context;
9191
auto& request_threads = thread_context.request_threads;
9292
auto [request_thread, inserted]{SetThread(
93-
request_threads, thread_context.waiter->m_mutex,
93+
GuardedRef{thread_context.waiter->m_mutex, request_threads},
9494
server.m_context.connection,
9595
[&] { return context_arg.getCallbackThread(); })};
9696

@@ -101,7 +101,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
101101
// makes another IPC call), so avoid modifying the map.
102102
const bool erase_thread{inserted};
103103
KJ_DEFER(if (erase_thread) {
104-
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
104+
Lock lock(thread_context.waiter->m_mutex);
105105
// Call erase here with a Connection* argument instead
106106
// of an iterator argument, because the `request_thread`
107107
// iterator may be invalid if the connection is closed

include/mp/util.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,13 @@ class MP_SCOPED_CAPABILITY Lock {
182182
std::unique_lock<std::mutex> m_lock;
183183
};
184184

185+
template<typename T>
186+
struct GuardedRef
187+
{
188+
Mutex& mutex;
189+
T& ref MP_GUARDED_BY(mutex);
190+
};
191+
185192
//! Analog to std::lock_guard that unlocks instead of locks.
186193
template <typename Lock>
187194
struct UnlockGuard

src/mp/proxy.cpp

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <kj/memory.h>
2626
#include <map>
2727
#include <memory>
28-
#include <mutex>
2928
#include <optional>
3029
#include <stdexcept>
3130
#include <string>
@@ -305,15 +304,15 @@ bool EventLoop::done() const
305304
return m_num_clients == 0 && m_async_fns->empty();
306305
}
307306

308-
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
307+
std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread)
309308
{
310-
const std::unique_lock<std::mutex> lock(mutex);
311-
auto thread = threads.find(connection);
312-
if (thread != threads.end()) return {thread, false};
313-
thread = threads.emplace(
309+
const Lock lock(threads.mutex);
310+
auto thread = threads.ref.find(connection);
311+
if (thread != threads.ref.end()) return {thread, false};
312+
thread = threads.ref.emplace(
314313
std::piecewise_construct, std::forward_as_tuple(connection),
315314
std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
316-
thread->second.setDisconnectCallback([&threads, &mutex, thread] {
315+
thread->second.setDisconnectCallback([threads, thread] {
317316
// Note: it is safe to use the `thread` iterator in this cleanup
318317
// function, because the iterator would only be invalid if the map entry
319318
// was removed, and if the map entry is removed the ProxyClient<Thread>
@@ -323,9 +322,9 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
323322
// thread client m_disconnect_cb member so thread client destructor does not
324323
// try to unregister this callback after connection is destroyed.
325324
// Remove connection pointer about to be destroyed from the map
326-
const std::unique_lock<std::mutex> lock(mutex);
325+
const Lock lock(threads.mutex);
327326
thread->second.m_disconnect_cb.reset();
328-
threads.erase(thread);
327+
threads.ref.erase(thread);
329328
});
330329
return {thread, true};
331330
}
@@ -364,7 +363,7 @@ ProxyServer<Thread>::~ProxyServer()
364363
assert(m_thread_context.waiter.get());
365364
std::unique_ptr<Waiter> waiter;
366365
{
367-
const std::unique_lock<std::mutex> lock(m_thread_context.waiter->m_mutex);
366+
const Lock lock(m_thread_context.waiter->m_mutex);
368367
//! Reset thread context waiter pointer, as shutdown signal for done
369368
//! lambda passed as waiter->wait() argument in makeThread code below.
370369
waiter = std::move(m_thread_context.waiter);
@@ -398,7 +397,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
398397
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
399398
g_thread_context.waiter = std::make_unique<Waiter>();
400399
thread_context.set_value(&g_thread_context);
401-
std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
400+
Lock lock(g_thread_context.waiter->m_mutex);
402401
// Wait for shutdown signal from ProxyServer<Thread> destructor (signal
403402
// is just waiter getting set to null.)
404403
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });

0 commit comments

Comments (0)