Skip to content

Commit a3ae265

Browse files
committed
Extract some duplicated code for sync triggers and timers
1 parent 6a9a087 commit a3ae265

File tree

7 files changed

+101
-162
lines changed

7 files changed

+101
-162
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
-----------
1818

1919
### Internals
20-
* None.
20+
* Refactor the implementation of sync triggers and timers to eliminate some duplicated code. ([PR #7912](https://github.com/realm/realm-core/pull/7912))
2121

2222
----------------------------------------------
2323

src/realm/sync/client.cpp

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -502,10 +502,9 @@ void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
502502
return;
503503
}
504504

505-
REALM_ASSERT(m_actualize_and_finalize);
506505
m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
507506
}
508-
m_actualize_and_finalize->trigger();
507+
m_actualize_and_finalize.trigger();
509508
}
510509

511510

@@ -514,7 +513,6 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrappe
514513
// Thread safety required.
515514
{
516515
util::CheckedLockGuard lock{m_mutex};
517-
REALM_ASSERT(m_actualize_and_finalize);
518516
// The wrapper may have already been finalized before being abandoned
519517
// if we were stopped when it was created.
520518
if (wrapper->mark_abandoned())
@@ -530,7 +528,7 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrappe
530528
}
531529
m_abandoned_session_wrappers.push(std::move(wrapper));
532530
}
533-
m_actualize_and_finalize->trigger();
531+
m_actualize_and_finalize.trigger();
534532
}
535533

536534

@@ -1841,23 +1839,12 @@ ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ide
18411839
, m_ssl_verify_callback{std::move(ssl_verify_callback)} // DEPRECATED
18421840
, m_proxy_config{std::move(proxy_config)} // DEPRECATED
18431841
, m_reconnect_info{reconnect_info}
1842+
, m_on_idle{m_client, &Connection::on_idle, this}
18441843
, m_ident{ident}
18451844
, m_server_endpoint{std::move(endpoint)}
18461845
, m_authorization_header_name{authorization_header_name} // DEPRECATED
18471846
, m_custom_http_headers{custom_http_headers} // DEPRECATED
18481847
{
1849-
m_on_idle = m_client.create_trigger([this](Status status) {
1850-
if (status == ErrorCodes::OperationAborted)
1851-
return;
1852-
else if (!status.is_ok())
1853-
throw Exception(status);
1854-
1855-
REALM_ASSERT(m_activated);
1856-
if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
1857-
on_idle(); // Throws
1858-
// Connection object may be destroyed now.
1859-
}
1860-
});
18611848
}
18621849

18631850
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
@@ -1889,6 +1876,10 @@ void ClientImpl::Connection::resume_active_sessions()
18891876

18901877
void ClientImpl::Connection::on_idle()
18911878
{
1879+
REALM_ASSERT(m_activated);
1880+
if (m_state != ConnectionState::disconnected || m_num_active_sessions != 0)
1881+
return;
1882+
18921883
logger.debug(util::LogCategory::session, "Destroying connection object");
18931884
ClientImpl& client = get_client();
18941885
client.remove_connection(*this);

src/realm/sync/noinst/client_impl_base.cpp

Lines changed: 26 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,8 @@ ClientImpl::ClientImpl(ClientConfig config)
154154
, m_fix_up_object_ids{config.fix_up_object_ids}
155155
, m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
156156
, m_socket_provider{std::move(config.socket_provider)}
157-
, m_client_protocol{} // Throws
158157
, m_one_connection_per_session{config.one_connection_per_session}
159-
, m_random{}
158+
, m_actualize_and_finalize{*this, &ClientImpl::actualize_and_finalize_session_wrappers, this}
160159
{
161160
// FIXME: Would be better if seeding was up to the application.
162161
util::seed_prng_nondeterministically(m_random); // Throws
@@ -220,14 +219,6 @@ ClientImpl::ClientImpl(ClientConfig config)
220219
logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
221220
"never do this in production");
222221
}
223-
224-
m_actualize_and_finalize = create_trigger([this](Status status) {
225-
if (status == ErrorCodes::OperationAborted)
226-
return;
227-
else if (!status.is_ok())
228-
throw Exception(status);
229-
actualize_and_finalize_session_wrappers(); // Throws
230-
});
231222
}
232223

233224
void ClientImpl::incr_outstanding_posts()
@@ -297,25 +288,23 @@ void ClientImpl::drain_connections()
297288

298289

299290
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
300-
SyncSocketProvider::FunctionHandler&& handler)
291+
util::UniqueFunction<void()>&& handler)
301292
{
302293
REALM_ASSERT(m_socket_provider);
303294
incr_outstanding_posts();
304295
return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
305-
auto decr_guard = util::make_scope_exit([&]() noexcept {
296+
ScopeExit decr_guard([&]() noexcept {
306297
decr_outstanding_posts();
307298
});
308-
handler(status);
299+
if (status == ErrorCodes::OperationAborted)
300+
return;
301+
if (!status.is_ok())
302+
throw Exception(status);
303+
handler();
309304
});
310305
}
311306

312307

313-
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
314-
{
315-
REALM_ASSERT(m_socket_provider);
316-
return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
317-
}
318-
319308
Connection::~Connection()
320309
{
321310
if (m_websocket_sentinel) {
@@ -326,10 +315,9 @@ Connection::~Connection()
326315

327316
void Connection::activate()
328317
{
329-
REALM_ASSERT(m_on_idle);
330318
m_activated = true;
331319
if (m_num_active_sessions == 0)
332-
m_on_idle->trigger();
320+
m_on_idle.trigger();
333321
// We cannot in general connect immediately, because a prior failure to
334322
// connect may require a delay before reconnecting (see `m_reconnect_info`).
335323
initiate_reconnect_wait(); // Throws
@@ -371,7 +359,7 @@ void Connection::initiate_session_deactivation(Session* sess)
371359
}
372360
if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
373361
if (m_activated && m_state == ConnectionState::disconnected)
374-
m_on_idle->trigger();
362+
m_on_idle.trigger();
375363
}
376364
}
377365

@@ -695,22 +683,14 @@ void Connection::initiate_reconnect_wait()
695683
// We create a timer for the reconnect_disconnect timer even if the delay is zero because
696684
// we need it to be cancelable in case the connection is terminated before the timer
697685
// callback is run.
698-
m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
699-
// If the operation is aborted, the connection object may have been
700-
// destroyed.
701-
if (status != ErrorCodes::OperationAborted)
702-
handle_reconnect_wait(status); // Throws
703-
}); // Throws
686+
m_reconnect_disconnect_timer = m_client.create_timer(delay, [this] {
687+
handle_reconnect_wait(); // Throws
688+
}); // Throws
704689
}
705690

706691

707-
void Connection::handle_reconnect_wait(Status status)
692+
void Connection::handle_reconnect_wait()
708693
{
709-
if (!status.is_ok()) {
710-
REALM_ASSERT(status != ErrorCodes::OperationAborted);
711-
throw Exception(status);
712-
}
713-
714694
REALM_ASSERT(m_reconnect_delay_in_progress);
715695
m_reconnect_delay_in_progress = false;
716696

@@ -822,24 +802,15 @@ void Connection::initiate_connect_wait()
822802
// fully establish the connection (including SSL and WebSocket
823803
// handshakes). Without such a watchdog, connect operations could take very
824804
// long, or even indefinite time.
825-
milliseconds_type time = m_client.m_connect_timeout;
826-
827-
m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
828-
// If the operation is aborted, the connection object may have been
829-
// destroyed.
830-
if (status != ErrorCodes::OperationAborted)
831-
handle_connect_wait(status); // Throws
832-
}); // Throws
805+
std::chrono::milliseconds time(m_client.m_connect_timeout);
806+
m_connect_timer = m_client.create_timer(time, [this] {
807+
handle_connect_wait(); // Throws
808+
}); // Throws
833809
}
834810

835811

836-
void Connection::handle_connect_wait(Status status)
812+
void Connection::handle_connect_wait()
837813
{
838-
if (!status.is_ok()) {
839-
REALM_ASSERT(status != ErrorCodes::OperationAborted);
840-
throw Exception(status);
841-
}
842-
843814
REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
844815
logger.info("Connect timeout"); // Throws
845816
SessionErrorInfo error_info({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
@@ -933,12 +904,7 @@ void Connection::initiate_ping_delay(milliseconds_type now)
933904

934905
m_ping_delay_in_progress = true;
935906

936-
m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
937-
if (status == ErrorCodes::OperationAborted)
938-
return;
939-
else if (!status.is_ok())
940-
throw Exception(status);
941-
907+
m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this] {
942908
handle_ping_delay(); // Throws
943909
}); // Throws
944910
logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
@@ -968,12 +934,7 @@ void Connection::initiate_pong_timeout()
968934
m_pong_wait_started_at = monotonic_clock_now();
969935

970936
milliseconds_type time = m_client.m_pong_keepalive_timeout;
971-
m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
972-
if (status == ErrorCodes::OperationAborted)
973-
return;
974-
else if (!status.is_ok())
975-
throw Exception(status);
976-
937+
m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] {
977938
handle_pong_timeout(); // Throws
978939
}); // Throws
979940
}
@@ -1124,23 +1085,15 @@ void Connection::initiate_disconnect_wait()
11241085

11251086
milliseconds_type time = m_client.m_connection_linger_time;
11261087

1127-
m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
1128-
// If the operation is aborted, the connection object may have been
1129-
// destroyed.
1130-
if (status != ErrorCodes::OperationAborted)
1131-
handle_disconnect_wait(status); // Throws
1132-
}); // Throws
1088+
m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] {
1089+
handle_disconnect_wait(); // Throws
1090+
}); // Throws
11331091
m_disconnect_delay_in_progress = true;
11341092
}
11351093

11361094

1137-
void Connection::handle_disconnect_wait(Status status)
1095+
void Connection::handle_disconnect_wait()
11381096
{
1139-
if (!status.is_ok()) {
1140-
REALM_ASSERT(status != ErrorCodes::OperationAborted);
1141-
throw Exception(status);
1142-
}
1143-
11441097
m_disconnect_delay_in_progress = false;
11451098

11461099
REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
@@ -2704,12 +2657,7 @@ void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
27042657
try_again_interval = std::chrono::milliseconds{1000};
27052658
}
27062659
logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
2707-
m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
2708-
if (status == ErrorCodes::OperationAborted)
2709-
return;
2710-
else if (!status.is_ok())
2711-
throw Exception(status);
2712-
2660+
m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this] {
27132661
m_try_again_activation_timer.reset();
27142662
cancel_resumption_delay();
27152663
});

src/realm/sync/noinst/client_impl_base.hpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -216,10 +216,8 @@ class ClientImpl {
216216
void post(SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex);
217217
void post(util::UniqueFunction<void()>&& handler) REQUIRES(!m_drain_mutex);
218218
SyncSocketProvider::SyncTimer create_timer(std::chrono::milliseconds delay,
219-
SyncSocketProvider::FunctionHandler&& handler)
220-
REQUIRES(!m_drain_mutex);
221-
using SyncTrigger = std::unique_ptr<Trigger<ClientImpl>>;
222-
SyncTrigger create_trigger(SyncSocketProvider::FunctionHandler&& handler);
219+
util::UniqueFunction<void()>&& handler) REQUIRES(!m_drain_mutex);
220+
using SyncTrigger = Trigger<ClientImpl>;
223221

224222
RandomEngine& get_random() noexcept;
225223

@@ -521,10 +519,10 @@ class ClientImpl::Connection {
521519
std::string get_http_request_path() const;
522520

523521
void initiate_reconnect_wait();
524-
void handle_reconnect_wait(Status status);
522+
void handle_reconnect_wait();
525523
void initiate_reconnect();
526524
void initiate_connect_wait();
527-
void handle_connect_wait(Status status);
525+
void handle_connect_wait();
528526

529527
void handle_connection_established();
530528
void schedule_urgent_ping();
@@ -540,7 +538,7 @@ class ClientImpl::Connection {
540538
void handle_write_ping();
541539
void handle_message_received(util::Span<const char> data);
542540
void initiate_disconnect_wait();
543-
void handle_disconnect_wait(Status status);
541+
void handle_disconnect_wait();
544542
void close_due_to_protocol_error(Status status);
545543
void close_due_to_client_side_error(Status, IsFatal is_fatal, ConnectionTerminationReason reason);
546544
void close_due_to_transient_error(Status status, ConnectionTerminationReason reason);
@@ -1221,12 +1219,11 @@ inline void ClientImpl::Connection::involuntary_disconnect(const SessionErrorInf
12211219

12221220
inline void ClientImpl::Connection::change_state_to_disconnected() noexcept
12231221
{
1224-
REALM_ASSERT(m_on_idle);
12251222
REALM_ASSERT(m_state != ConnectionState::disconnected);
12261223
m_state = ConnectionState::disconnected;
12271224

12281225
if (m_num_active_sessions == 0)
1229-
m_on_idle->trigger();
1226+
m_on_idle.trigger();
12301227

12311228
REALM_ASSERT(!m_reconnect_delay_in_progress);
12321229
if (m_disconnect_delay_in_progress) {

0 commit comments

Comments
 (0)