Skip to content

Commit 1d6d854

Browse files
committed
proxy-io.h: add Waiter::m_mutex thread safety annotations
1 parent cd6d95a commit 1d6d854

File tree

4 files changed

+16
-17
lines changed

4 files changed

+16
-17
lines changed

include/mp/proxy-io.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -273,16 +273,16 @@ struct Waiter
273273
template <typename Fn>
274274
void post(Fn&& fn)
275275
{
276-
const std::unique_lock<std::mutex> lock(m_mutex);
276+
const Lock lock(m_mutex);
277277
assert(!m_fn);
278278
m_fn = std::forward<Fn>(fn);
279279
m_cv.notify_all();
280280
}
281281

282282
template <class Predicate>
283-
void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
283+
void wait(Lock& lock, Predicate pred)
284284
{
285-
m_cv.wait(lock, [&] {
285+
m_cv.wait(lock.m_lock, [&] {
286286
// Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
287287
// a lost-wakeup bug. A new m_fn and m_cv notification might be sent
288288
// after the fn() call and before the lock.lock() call in this loop
@@ -305,7 +305,7 @@ struct Waiter
305305
//! mutexes than necessary. This mutex can be held at the same time as
306306
//! EventLoop::m_mutex as long as Waiter::mutex is locked first and
307307
//! EventLoop::m_mutex is locked second.
308-
std::mutex m_mutex;
308+
Mutex m_mutex;
309309
std::condition_variable m_cv;
310310
std::optional<kj::Function<void()>> m_fn;
311311
};
@@ -544,7 +544,7 @@ using ConnThread = ConnThreads::iterator;
544544
// Retrieve ProxyClient<Thread> object associated with this connection from a
545545
// map, or create a new one and insert it into the map. Return map iterator and
546546
// inserted bool.
547-
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
547+
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, Mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
548548

549549
//! The thread_local ThreadContext g_thread_context struct provides information
550550
//! about individual threads and a way of communicating between them. Because
@@ -609,7 +609,7 @@ struct ThreadContext
609609
//! request thread argument passed in that request.
610610
//!
611611
//! Synchronization note: \ref callback_threads note applies here as well.
612-
ConnThreads request_threads;
612+
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
613613

614614
//! Whether this thread is a capnp event loop thread. Not really used except
615615
//! to assert false if there's an attempt to execute a blocking operation

include/mp/proxy-types.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
617617
const char* disconnected = nullptr;
618618
proxy_client.m_context.loop->sync([&]() {
619619
if (!proxy_client.m_context.connection) {
620-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
620+
const Lock lock(thread_context.waiter->m_mutex);
621621
done = true;
622622
disconnected = "IPC client method called after disconnect.";
623623
thread_context.waiter->m_cv.notify_all();
@@ -644,7 +644,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
644644
} catch (...) {
645645
exception = std::current_exception();
646646
}
647-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
647+
const Lock lock(thread_context.waiter->m_mutex);
648648
done = true;
649649
thread_context.waiter->m_cv.notify_all();
650650
},
@@ -656,13 +656,13 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
656656
proxy_client.m_context.loop->logPlain()
657657
<< "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
658658
}
659-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
659+
const Lock lock(thread_context.waiter->m_mutex);
660660
done = true;
661661
thread_context.waiter->m_cv.notify_all();
662662
}));
663663
});
664664

665-
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
665+
Lock lock(thread_context.waiter->m_mutex);
666666
thread_context.waiter->wait(lock, [&done]() { return done; });
667667
if (exception) std::rethrow_exception(exception);
668668
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;

include/mp/type-context.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
118118
// without violating lock order.
119119
ConnThreads::node_type removed;
120120
{
121-
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
121+
Lock lock(thread_context.waiter->m_mutex);
122122
removed = request_threads.extract(server.m_context.connection);
123123
}
124124
});

src/mp/proxy.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <kj/memory.h>
2626
#include <map>
2727
#include <memory>
28-
#include <mutex>
2928
#include <optional>
3029
#include <stdexcept>
3130
#include <string>
@@ -308,13 +307,13 @@ bool EventLoop::done() const
308307
return m_num_clients == 0 && m_async_fns->empty();
309308
}
310309

311-
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
310+
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, Mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
312311
{
313312
assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
314313
ConnThread thread;
315314
bool inserted;
316315
{
317-
const std::unique_lock<std::mutex> lock(mutex);
316+
const Lock lock(mutex);
318317
std::tie(thread, inserted) = threads.try_emplace(connection);
319318
}
320319
if (inserted) {
@@ -331,7 +330,7 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
331330
thread->second->m_disconnect_cb.reset();
332331

333332
// Remove connection pointer about to be destroyed from the map
334-
const std::unique_lock<std::mutex> lock(mutex);
333+
const Lock lock(mutex);
335334
threads.erase(thread);
336335
});
337336
}
@@ -373,7 +372,7 @@ ProxyServer<Thread>::~ProxyServer()
373372
assert(m_thread_context.waiter.get());
374373
std::unique_ptr<Waiter> waiter;
375374
{
376-
const std::unique_lock<std::mutex> lock(m_thread_context.waiter->m_mutex);
375+
const Lock lock(m_thread_context.waiter->m_mutex);
377376
//! Reset thread context waiter pointer, as shutdown signal for done
378377
//! lambda passed as waiter->wait() argument in makeThread code below.
379378
waiter = std::move(m_thread_context.waiter);
@@ -407,7 +406,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
407406
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
408407
g_thread_context.waiter = std::make_unique<Waiter>();
409408
thread_context.set_value(&g_thread_context);
410-
std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
409+
Lock lock(g_thread_context.waiter->m_mutex);
411410
// Wait for shutdown signal from ProxyServer<Thread> destructor (signal
412411
// is just waiter getting set to null.)
413412
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });

0 commit comments

Comments
 (0)