From 4c96a9e278e2350d5278a72c56cddffe85afb986 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 8 Aug 2025 16:54:02 +0200 Subject: [PATCH 1/4] smoketests: Smoketest enable replication on existing database --- smoketests/__init__.py | 5 +++-- smoketests/tests/replication.py | 38 +++++++++++++++++++++++++++++---- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/smoketests/__init__.py b/smoketests/__init__.py index 58e61ffa72d..8b7196c3531 100644 --- a/smoketests/__init__.py +++ b/smoketests/__init__.py @@ -216,7 +216,7 @@ def log_records(self, n): logs = self.spacetime("logs", "--format=json", "-n", str(n), "--", self.database_identity) return list(map(json.loads, logs.splitlines())) - def publish_module(self, domain=None, *, clear=True, capture_stderr=True): + def publish_module(self, domain=None, *, clear=True, capture_stderr=True, num_replicas=None): print("publishing module", self.publish_module) publish_output = self.spacetime( "publish", @@ -227,10 +227,11 @@ def publish_module(self, domain=None, *, clear=True, capture_stderr=True): # because the server address is `node` which doesn't look like `localhost` or `127.0.0.1` # and so the publish step prompts for confirmation. "--yes", + *["--num-replicas", f"{num_replicas}"] if num_replicas is not None else [], capture_stderr=capture_stderr, ) self.resolved_identity = re.search(r"identity: ([0-9a-fA-F]+)", publish_output)[1] - self.database_identity = domain if domain is not None else self.resolved_identity + self.database_identity = self.resolved_identity @classmethod def reset_config(cls): diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index 1936a6f0c42..dfb1c6619f0 100644 --- a/smoketests/tests/replication.py +++ b/smoketests/tests/replication.py @@ -1,9 +1,10 @@ -from .. 
import COMPOSE_FILE, Smoketest, requires_docker, spacetime, parse_sql_result -from ..docker import DockerManager - import time -from typing import Callable import unittest +from typing import Callable +import json + +from .. import COMPOSE_FILE, Smoketest, requires_docker, spacetime, parse_sql_result +from ..docker import DockerManager def retry(func: Callable, max_retries: int = 3, retry_delay: int = 2): """Retry a function on failure with delay.""" @@ -240,6 +241,9 @@ def start(self, id: int, count: int): def collect_counter_rows(self): return int_vals(self.cluster.sql("select * from counter")) + def call_control(self, reducer, *args): + self.spacetime("call", "spacetime-control", reducer, *map(json.dumps, args)) + class LeaderElection(ReplicationTest): def test_leader_election_in_loop(self): @@ -393,3 +397,29 @@ def test_quorum_loss(self): with self.assertRaises(Exception): for i in range(1001): self.call("send_message", "terminal") + + +class EnableReplication(ReplicationTest): + AUTOPUBLISH = False + + def test_enable_replication(self): + """Tests enabling replication on an un-replicated database""" + + name = random_string() + + self.publish_module(name, num_replicas = 1) + leader = self.cluster.wait_for_leader_change(None) + + n1 = 100 + n2 = 100 + self.start(1, n1) + + self.call_control("enable_replication", {"Name": name}, 3) + + self.cluster.wait_for_leader_change(leader) + self.start(2, n2) + + time.sleep(3) + + rows = self.collect_counter_rows() + self.assertEqual([{"id": 1, "value": n1}, {"id": 2, "value": n2}], rows) From 518e63fc9d6da82b8fe7b8c773c83167c56586d8 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Sat, 9 Aug 2025 17:21:28 +0200 Subject: [PATCH 2/4] smoketests: Wait for the counter reaching the expected values, instead of sleeping --- smoketests/tests/replication.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index 
dfb1c6619f0..85a94a62d88 100644 --- a/smoketests/tests/replication.py +++ b/smoketests/tests/replication.py @@ -114,6 +114,18 @@ def ensure_leader_health(self, id): # TODO: Replace with confirmed read. time.sleep(0.6) + def wait_counter_value(self, id, value, max_attempts=10, delay=1): + """Wait for the value for `id` in the counter table to reach `value`""" + + for _ in range(max_attempts): + rows = self.sql(f"select * from counter where id={id}") + if len(rows) >= 1 and int(rows[0]['value']) >= value: + return + else: + time.sleep(delay) + + raise ValueError(f"Counter {id} below {value}") + def fail_leader(self, action='kill'): """Force leader failure through either killing or network disconnect.""" @@ -410,16 +422,18 @@ def test_enable_replication(self): self.publish_module(name, num_replicas = 1) leader = self.cluster.wait_for_leader_change(None) - n1 = 100 + n1 = 1_000 n2 = 100 self.start(1, n1) + self.cluster.wait_counter_value(1, n1, max_attempts=10, delay=10) + self.call_control("enable_replication", {"Name": name}, 3) self.cluster.wait_for_leader_change(leader) self.start(2, n2) - time.sleep(3) + self.cluster.wait_counter_value(2, n2) rows = self.collect_counter_rows() self.assertEqual([{"id": 1, "value": n1}, {"id": 2, "value": n2}], rows) From 5c91881c3fac9e23340295597acc54f3c461e2a7 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 13 Aug 2025 08:15:34 +0200 Subject: [PATCH 3/4] Add durable_tx_offset getter to module host --- crates/core/src/host/module_host.rs | 5 ++++ smoketests/tests/replication.py | 42 ++++++++++++++++++----------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 44645bd3a39..7bbcf2e6fdb 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -32,6 +32,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::execution_context::{ExecutionContext, 
ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; +use spacetimedb_durability::DurableOffset; use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::identity::{AuthCtx, RequestId}; @@ -1233,6 +1234,10 @@ impl ModuleHost { &self.replica_ctx().database } + pub fn durable_tx_offset(&self) -> Option { + self.replica_ctx().relational_db.durable_tx_offset() + } + pub(crate) fn replica_ctx(&self) -> &ReplicaContext { self.module.replica_ctx() } diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index 85a94a62d88..9cdfd3cb3c1 100644 --- a/smoketests/tests/replication.py +++ b/smoketests/tests/replication.py @@ -3,7 +3,7 @@ from typing import Callable import json -from .. import COMPOSE_FILE, Smoketest, requires_docker, spacetime, parse_sql_result +from .. import COMPOSE_FILE, Smoketest, random_string, requires_docker, spacetime, parse_sql_result from ..docker import DockerManager def retry(func: Callable, max_retries: int = 3, retry_delay: int = 2): @@ -414,26 +414,36 @@ def test_quorum_loss(self): class EnableReplication(ReplicationTest): AUTOPUBLISH = False + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.expected_counter_rows = [] + + def run_counter(self, id, n = 100): + self.start(id, n) + self.cluster.wait_counter_value(id, n) + self.expected_counter_rows.append({"id": id, "value": n}) + self.assertEqual(self.collect_counter_rows(), self.expected_counter_rows) + def test_enable_replication(self): - """Tests enabling replication on an un-replicated database""" + """Tests enabling and disabling replication""" name = random_string() self.publish_module(name, num_replicas = 1) - leader = self.cluster.wait_for_leader_change(None) - - n1 = 1_000 - n2 = 100 - self.start(1, n1) + self.cluster.wait_for_leader_change(None) - 
self.cluster.wait_counter_value(1, n1, max_attempts=10, delay=10) + self.add_me_as_admin() + n = 100 + # start un-replicated + self.run_counter(1, n) + # enable replication self.call_control("enable_replication", {"Name": name}, 3) - - self.cluster.wait_for_leader_change(leader) - self.start(2, n2) - - self.cluster.wait_counter_value(2, n2) - - rows = self.collect_counter_rows() - self.assertEqual([{"id": 1, "value": n1}, {"id": 2, "value": n2}], rows) + self.run_counter(2, n) + # disable replication + self.call_control("disable_replication", {"Name": name }) + self.run_counter(3, n) + # enable it one more time + self.call_control("enable_replication", {"Name": name}, 3) + self.run_counter(4, n) From 1bfe3dfc9d5bd147372b21eaa37a170a0b3eeb7f Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Thu, 11 Sep 2025 12:04:50 +0200 Subject: [PATCH 4/4] smoketest: Exercise suspend->enable replication->unsuspend --- smoketests/tests/replication.py | 44 ++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index 9cdfd3cb3c1..5fd1e24a03b 100644 --- a/smoketests/tests/replication.py +++ b/smoketests/tests/replication.py @@ -428,14 +428,13 @@ def run_counter(self, id, n = 100): def test_enable_replication(self): """Tests enabling and disabling replication""" + self.add_me_as_admin() name = random_string() + n = 100 self.publish_module(name, num_replicas = 1) self.cluster.wait_for_leader_change(None) - self.add_me_as_admin() - n = 100 - # start un-replicated self.run_counter(1, n) # enable replication @@ -447,3 +446,42 @@ def test_enable_replication(self): # enable it one more time self.call_control("enable_replication", {"Name": name}, 3) self.run_counter(4, n) + + +class EnableReplicationSuspended(ReplicationTest): + AUTOPUBLISH = False + + def test_enable_replication_on_suspended_database(self): + """Tests that we can enable replication on a suspended database""" + + 
self.add_me_as_admin()
+        name = random_string()
+
+        self.publish_module(name, num_replicas = 1)
+        self.cluster.wait_for_leader_change(None)
+        self.cluster.ensure_leader_health(1)
+
+        id = self.cluster.get_db_id()
+
+        self.call_control("suspend_database", {"Name": name})
+        # Database is now unreachable.
+        with self.assertRaises(Exception):
+            self.call("send_message", "hi")
+
+        self.call_control("enable_replication", {"Name": name}, 3)
+        # Still unreachable until we call unsuspend.
+        with self.assertRaises(Exception):
+            self.call("send_message", "hi")
+
+        self.call_control("unsuspend_database", {"Name": name})
+        self.cluster.wait_for_leader_change(None)
+        self.cluster.ensure_leader_health(2)
+
+        # We can't directly observe that there are indeed three replicas running,
+        # so as a sanity check inspect the event log.
+        rows = self.cluster.read_controldb(
+            f"select message from staged_enable_replication_event where database_id={id}")
+        self.assertEqual(rows, [
+            {'message': '"bootstrap requested"'},
+            {'message': '"bootstrap complete"'},
+        ])