Skip to content

Commit f219f1b

Browse files
committed
proxy-io.h: add Waiter::m_mutex thread safety annotations
1 parent 7a8a99c commit f219f1b

File tree

4 files changed

+16
-17
lines changed

4 files changed

+16
-17
lines changed

include/mp/proxy-io.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -273,16 +273,16 @@ struct Waiter
273273
template <typename Fn>
274274
void post(Fn&& fn)
275275
{
276-
const std::unique_lock<std::mutex> lock(m_mutex);
276+
const Lock lock(m_mutex);
277277
assert(!m_fn);
278278
m_fn = std::forward<Fn>(fn);
279279
m_cv.notify_all();
280280
}
281281

282282
template <class Predicate>
283-
void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
283+
void wait(Lock& lock, Predicate pred)
284284
{
285-
m_cv.wait(lock, [&] {
285+
m_cv.wait(lock.m_lock, [&] {
286286
// Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
287287
// a lost-wakeup bug. A new m_fn and m_cv notification might be sent
288288
// after the fn() call and before the lock.lock() call in this loop
@@ -305,7 +305,7 @@ struct Waiter
305305
//! mutexes than necessary. This mutex can be held at the same time as
306306
//! EventLoop::m_mutex as long as Waiter::mutex is locked first and
307307
//! EventLoop::m_mutex is locked second.
308-
std::mutex m_mutex;
308+
Mutex m_mutex;
309309
std::condition_variable m_cv;
310310
std::optional<kj::Function<void()>> m_fn;
311311
};
@@ -544,7 +544,7 @@ using ConnThread = ConnThreads::iterator;
544544
// Retrieve ProxyClient<Thread> object associated with this connection from a
545545
// map, or create a new one and insert it into the map. Return map iterator and
546546
// inserted bool.
547-
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
547+
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, Mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
548548

549549
//! The thread_local ThreadContext g_thread_context struct provides information
550550
//! about individual threads and a way of communicating between them. Because
@@ -609,7 +609,7 @@ struct ThreadContext
609609
//! request thread argument passed in that request.
610610
//!
611611
//! Synchronization note: \ref callback_threads note applies here as well.
612-
ConnThreads request_threads;
612+
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
613613

614614
//! Whether this thread is a capnp event loop thread. Not really used except
615615
//! to assert false if there's an attempt to execute a blocking operation

include/mp/proxy-types.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
617617
const char* disconnected = nullptr;
618618
proxy_client.m_context.loop->sync([&]() {
619619
if (!proxy_client.m_context.connection) {
620-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
620+
const Lock lock(thread_context.waiter->m_mutex);
621621
done = true;
622622
disconnected = "IPC client method called after disconnect.";
623623
thread_context.waiter->m_cv.notify_all();
@@ -644,7 +644,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
644644
} catch (...) {
645645
exception = std::current_exception();
646646
}
647-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
647+
const Lock lock(thread_context.waiter->m_mutex);
648648
done = true;
649649
thread_context.waiter->m_cv.notify_all();
650650
},
@@ -656,13 +656,13 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
656656
proxy_client.m_context.loop->logPlain()
657657
<< "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
658658
}
659-
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
659+
const Lock lock(thread_context.waiter->m_mutex);
660660
done = true;
661661
thread_context.waiter->m_cv.notify_all();
662662
}));
663663
});
664664

665-
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
665+
Lock lock(thread_context.waiter->m_mutex);
666666
thread_context.waiter->wait(lock, [&done]() { return done; });
667667
if (exception) std::rethrow_exception(exception);
668668
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;

include/mp/type-context.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
118118
// without violating lock order.
119119
ConnThreads::node_type removed;
120120
{
121-
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
121+
Lock lock(thread_context.waiter->m_mutex);
122122
removed = request_threads.extract(server.m_context.connection);
123123
}
124124
});

src/mp/proxy.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <kj/memory.h>
2626
#include <map>
2727
#include <memory>
28-
#include <mutex>
2928
#include <optional>
3029
#include <stdexcept>
3130
#include <string>
@@ -313,13 +312,13 @@ bool EventLoop::done() const
313312
return m_num_clients == 0 && m_async_fns->empty();
314313
}
315314

316-
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
315+
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, Mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
317316
{
318317
assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
319318
ConnThread thread;
320319
bool inserted;
321320
{
322-
const std::unique_lock<std::mutex> lock(mutex);
321+
const Lock lock(mutex);
323322
std::tie(thread, inserted) = threads.try_emplace(connection);
324323
}
325324
if (inserted) {
@@ -336,7 +335,7 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
336335
thread->second->m_disconnect_cb.reset();
337336

338337
// Remove connection pointer about to be destroyed from the map
339-
const std::unique_lock<std::mutex> lock(mutex);
338+
const Lock lock(mutex);
340339
threads.erase(thread);
341340
});
342341
}
@@ -378,7 +377,7 @@ ProxyServer<Thread>::~ProxyServer()
378377
assert(m_thread_context.waiter.get());
379378
std::unique_ptr<Waiter> waiter;
380379
{
381-
const std::unique_lock<std::mutex> lock(m_thread_context.waiter->m_mutex);
380+
const Lock lock(m_thread_context.waiter->m_mutex);
382381
//! Reset thread context waiter pointer, as shutdown signal for done
383382
//! lambda passed as waiter->wait() argument in makeThread code below.
384383
waiter = std::move(m_thread_context.waiter);
@@ -412,7 +411,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
412411
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
413412
g_thread_context.waiter = std::make_unique<Waiter>();
414413
thread_context.set_value(&g_thread_context);
415-
std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
414+
Lock lock(g_thread_context.waiter->m_mutex);
416415
// Wait for shutdown signal from ProxyServer<Thread> destructor (signal
417416
// is just waiter getting set to null.)
418417
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });

0 commit comments

Comments
 (0)