@@ -81,6 +81,11 @@ ProxyContext::ProxyContext(Connection* connection) : connection(connection), loo
81
81
82
82
Connection::~Connection ()
83
83
{
84
+ // Connection destructor is always called on the event loop thread. If this
85
+ // is a local disconnect, it will trigger I/O, so this needs to run on the
86
+ // event loop thread, and if there was a remote disconnect this, is called
87
+ // by an onDisconnect callback directly from the event loop thrread.
88
+ assert (std::this_thread::get_id () == m_loop->m_thread_id );
84
89
// Shut down RPC system first, since this will garbage collect any
85
90
// ProxyServer objects that were not freed before the connection was closed.
86
91
// Typically all ProxyServer objects associated with this connection will be
@@ -156,6 +161,9 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
156
161
157
162
void Connection::removeSyncCleanup (CleanupIt it)
158
163
{
164
+ // Require cleanup functions to be removed on the event loop thread to avoid
165
+ // needing to deal with them being removed in the middle of a disconnect.
166
+ assert (std::this_thread::get_id () == m_loop->m_thread_id );
159
167
const Lock lock (m_loop->m_mutex );
160
168
m_sync_cleanup_fns.erase (it);
161
169
}
@@ -307,27 +315,32 @@ bool EventLoop::done() const
307
315
308
316
std::tuple<ConnThread, bool > SetThread (ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
309
317
{
310
- const std::unique_lock<std::mutex> lock (mutex);
311
- auto thread = threads.find (connection);
312
- if (thread != threads.end ()) return {thread, false };
313
- thread = threads.emplace (
314
- std::piecewise_construct, std::forward_as_tuple (connection),
315
- std::forward_as_tuple (make_thread (), connection, /* destroy_connection= */ false )).first ;
316
- thread->second .setDisconnectCallback ([&threads, &mutex, thread] {
317
- // Note: it is safe to use the `thread` iterator in this cleanup
318
- // function, because the iterator would only be invalid if the map entry
319
- // was removed, and if the map entry is removed the ProxyClient<Thread>
320
- // destructor unregisters the cleanup.
321
-
322
- // Connection is being destroyed before thread client is, so reset
323
- // thread client m_disconnect_cb member so thread client destructor does not
324
- // try to unregister this callback after connection is destroyed.
325
- // Remove connection pointer about to be destroyed from the map
318
+ assert (std::this_thread::get_id () == connection->m_loop ->m_thread_id );
319
+ ConnThread thread;
320
+ bool inserted;
321
+ {
326
322
const std::unique_lock<std::mutex> lock (mutex);
327
- thread->second .m_disconnect_cb .reset ();
328
- threads.erase (thread);
329
- });
330
- return {thread, true };
323
+ std::tie (thread, inserted) = threads.try_emplace (connection);
324
+ }
325
+ if (inserted) {
326
+ thread->second .emplace (make_thread (), connection, /* destroy_connection= */ false );
327
+ thread->second ->m_disconnect_cb = connection->addSyncCleanup ([&threads, &mutex, thread] {
328
+ // Note: it is safe to use the `thread` iterator in this cleanup
329
+ // function, because the iterator would only be invalid if the map entry
330
+ // was removed, and if the map entry is removed the ProxyClient<Thread>
331
+ // destructor unregisters the cleanup.
332
+
333
+ // Connection is being destroyed before thread client is, so reset
334
+ // thread client m_disconnect_cb member so thread client destructor does not
335
+ // try to unregister this callback after connection is destroyed.
336
+ thread->second ->m_disconnect_cb .reset ();
337
+
338
+ // Remove connection pointer about to be destroyed from the map
339
+ const std::unique_lock<std::mutex> lock (mutex);
340
+ threads.erase (thread);
341
+ });
342
+ }
343
+ return {thread, inserted};
331
344
}
332
345
333
346
ProxyClient<Thread>::~ProxyClient ()
@@ -336,17 +349,18 @@ ProxyClient<Thread>::~ProxyClient()
336
349
// cleanup callback that was registered to handle the connection being
337
350
// destroyed before the thread being destroyed.
338
351
if (m_disconnect_cb) {
339
- m_context.connection ->removeSyncCleanup (*m_disconnect_cb);
352
+ // Remove disconnect callback on the event loop thread with
353
+ // loop->sync(), so if the connection is broken there is not a race
354
+ // between this thread trying to remove the callback and the disconnect
355
+ // handler attempting to call it.
356
+ m_context.loop ->sync ([&]() {
357
+ if (m_disconnect_cb) {
358
+ m_context.connection ->removeSyncCleanup (*m_disconnect_cb);
359
+ }
360
+ });
340
361
}
341
362
}
342
363
343
- void ProxyClient<Thread>::setDisconnectCallback(const std::function<void ()>& fn)
344
- {
345
- assert (fn);
346
- assert (!m_disconnect_cb);
347
- m_disconnect_cb = m_context.connection ->addSyncCleanup (fn);
348
- }
349
-
350
364
ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
351
365
: m_thread_context(thread_context), m_thread(std::move(thread))
352
366
{
0 commit comments