Skip to content

Commit fe1cbed

Browse files
committed
proxy-io.h: add Waiter::m_mutex thread safety annotations
1 parent 2b05ed7 commit fe1cbed

File tree

4 files changed

+16
-16
lines changed

4 files changed

+16
-16
lines changed

include/mp/proxy-io.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,16 +282,16 @@ struct Waiter
282282
template <typename Fn>
283283
void post(Fn&& fn)
284284
{
285-
const std::unique_lock<std::mutex> lock(m_mutex);
285+
const Lock lock(m_mutex);
286286
assert(!m_fn);
287287
m_fn = std::forward<Fn>(fn);
288288
m_cv.notify_all();
289289
}
290290

291291
template <class Predicate>
292-
void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
292+
void wait(Lock& lock, Predicate pred)
293293
{
294-
m_cv.wait(lock, [&] {
294+
m_cv.wait(lock.m_lock, [&] {
295295
// Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
296296
// a lost-wakeup bug. A new m_fn and m_cv notification might be sent
297297
// after the fn() call and before the lock.lock() call in this loop
@@ -314,7 +314,7 @@ struct Waiter
314314
//! mutexes than necessary. This mutex can be held at the same time as
315315
//! EventLoop::m_mutex as long as Waiter::mutex is locked first and
316316
//! EventLoop::m_mutex is locked second.
317-
std::mutex m_mutex;
317+
Mutex m_mutex;
318318
std::condition_variable m_cv;
319319
std::optional<kj::Function<void()>> m_fn;
320320
};
@@ -544,7 +544,7 @@ void ProxyServerBase<Interface, Impl>::invokeDestroy()
544544
// Retrieve ProxyClient<Thread> object associated with this connection from a
545545
// map, or create a new one and insert it into the map. Return map iterator and
546546
// inserted bool.
547-
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
547+
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, Mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
548548

549549
//! The thread_local ThreadContext g_thread_context struct provides information
550550
//! about individual threads and a way of communicating between them. Because
@@ -609,7 +609,7 @@ struct ThreadContext
609609
//! request thread argument passed in that request.
610610
//!
611611
//! Synchronization note: \ref callback_threads note applies here as well.
612-
ConnThreads request_threads;
612+
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
613613

614614
//! Whether this thread is a capnp event loop thread. Not really used except
615615
//! to assert false if there's an attempt to execute a blocking operation

include/mp/proxy-types.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
617617
const char* disconnected = nullptr;
618618
proxy_client.m_context.loop->sync([&]() {
619619
if (!proxy_client.m_context.connection) {
620-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
620+
const Lock lock(thread_context.waiter->m_mutex);
621621
done = true;
622622
disconnected = "IPC client method called after disconnect.";
623623
thread_context.waiter->m_cv.notify_all();
@@ -644,7 +644,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
644644
} catch (...) {
645645
exception = std::current_exception();
646646
}
647-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
647+
const Lock lock(thread_context.waiter->m_mutex);
648648
done = true;
649649
thread_context.waiter->m_cv.notify_all();
650650
},
@@ -656,13 +656,13 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
656656
proxy_client.m_context.loop->logPlain()
657657
<< "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
658658
}
659-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
659+
const Lock lock(thread_context.waiter->m_mutex);
660660
done = true;
661661
thread_context.waiter->m_cv.notify_all();
662662
}));
663663
});
664664

665-
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
665+
Lock lock(thread_context.waiter->m_mutex);
666666
thread_context.waiter->wait(lock, [&done]() { return done; });
667667
if (exception) std::rethrow_exception(exception);
668668
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;

include/mp/type-context.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
112112
server.m_context.loop->sync([&] {
113113
// Look up the thread again since it may no longer
114114
// be there after a disconnect.
115-
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
115+
Lock lock(thread_context.waiter->m_mutex);
116116
ConnThread thread = request_threads.find(server.m_context.connection);
117117
if (thread != request_threads.end()) {
118118
// Release Waiter::m_mutex while calling the

src/mp/proxy.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -308,13 +308,13 @@ bool EventLoop::done() const
308308
return m_num_clients == 0 && m_async_fns->empty();
309309
}
310310

311-
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
311+
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, Mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
312312
{
313313
assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
314314
ConnThread thread;
315315
bool inserted;
316316
{
317-
const std::unique_lock<std::mutex> lock(mutex);
317+
const Lock lock(mutex);
318318
std::tie(thread, inserted) = threads.try_emplace(connection);
319319
}
320320
if (inserted) {
@@ -331,7 +331,7 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
331331
thread->second->m_disconnect_cb.reset();
332332

333333
// Remove connection pointer about to be destroyed from the map
334-
const std::unique_lock<std::mutex> lock(mutex);
334+
const Lock lock(mutex);
335335
threads.erase(thread);
336336
});
337337
}
@@ -371,7 +371,7 @@ ProxyServer<Thread>::~ProxyServer()
371371
assert(m_thread_context.waiter.get());
372372
std::unique_ptr<Waiter> waiter;
373373
{
374-
const std::unique_lock<std::mutex> lock(m_thread_context.waiter->m_mutex);
374+
const Lock lock(m_thread_context.waiter->m_mutex);
375375
//! Reset thread context waiter pointer, as shutdown signal for done
376376
//! lambda passed as waiter->wait() argument in makeThread code below.
377377
waiter = std::move(m_thread_context.waiter);
@@ -405,7 +405,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
405405
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
406406
g_thread_context.waiter = std::make_unique<Waiter>();
407407
thread_context.set_value(&g_thread_context);
408-
std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
408+
Lock lock(g_thread_context.waiter->m_mutex);
409409
// Wait for shutdown signal from ProxyServer<Thread> destructor (signal
410410
// is just waiter getting set to null.)
411411
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });

0 commit comments

Comments
 (0)