Skip to content

Commit 172d4bb

Browse files
committed
adapter,test: support forcing builtin schema migrations
This commit adds support for forcing builtin schema migrations during tests, through a --force-builtin-schema-migration envd flag, which gets wired through to the builtin schema migration code. If the flag is set, all builtin collections are force-migrated using the selected mechanism. The existing mechanism to force builtin item migrations worked by modifying the fingerprints of builtins. This doesn't work anymore because fingerprint mismatches are no longer used to inform which migrations to run, only to verify all items have been correctly migrated. Thus, this commit also removes support for adding whitespace to builtin fingerprints.
1 parent cdee4f3 commit 172d4bb

File tree

13 files changed

+57
-79
lines changed

13 files changed

+57
-79
lines changed

misc/python/materialize/checks/scenarios_zero_downtime.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def actions(self) -> list[Action]:
9393
deploy_generation=1,
9494
mz_service="mz_2",
9595
system_parameter_defaults=system_parameter_defaults,
96-
force_migrations="all",
96+
force_migrations="replacement",
9797
),
9898
Manipulate(self, phase=1, mz_service="mz_1"),
9999
*wait_ready_and_promote("mz_2"),
@@ -102,7 +102,7 @@ def actions(self) -> list[Action]:
102102
deploy_generation=2,
103103
mz_service="mz_3",
104104
system_parameter_defaults=system_parameter_defaults,
105-
force_migrations="all",
105+
force_migrations="replacement",
106106
),
107107
Manipulate(self, phase=2, mz_service="mz_2"),
108108
*wait_ready_and_promote("mz_3"),
@@ -111,7 +111,7 @@ def actions(self) -> list[Action]:
111111
deploy_generation=3,
112112
mz_service="mz_4",
113113
system_parameter_defaults=system_parameter_defaults,
114-
force_migrations="all",
114+
force_migrations="replacement",
115115
),
116116
Validate(self, mz_service="mz_3"),
117117
*wait_ready_and_promote("mz_4"),

misc/python/materialize/mzcompose/services/materialized.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ def __init__(
220220

221221
if force_migrations is not None and image is None:
222222
command += [
223-
f"--unsafe-builtin-table-fingerprint-whitespace={force_migrations}",
223+
f"--unsafe-force-builtin-schema-migration={force_migrations}",
224224
]
225225
if not unsafe_mode:
226226
command += ["--unsafe-mode"]

src/adapter/src/catalog.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,7 @@ impl Catalog {
721721
builtin_item_migration_config: BuiltinItemMigrationConfig {
722722
persist_client: persist_client.clone(),
723723
read_only,
724+
force_migration: None,
724725
},
725726
persist_client,
726727
enable_expression_cache_override,

src/adapter/src/catalog/open/builtin_schema_migration.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,20 @@ impl<'a, 'b> Migration<'a, 'b> {
240240

241241
self.validate_migration_steps(steps);
242242

243-
if self.source_version == self.target_version {
243+
let (force, plan) = match self.config.force_migration.as_deref() {
244+
None => (false, self.plan_migration(steps)),
245+
Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)),
246+
Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)),
247+
Some(other) => panic!("unknown force migration mechanism: {other}"),
248+
};
249+
250+
if self.source_version == self.target_version && !force {
244251
info!("skipping migration: already at target version");
245252
return Ok(Default::default());
246253
} else if self.source_version > self.target_version {
247254
bail!("downgrade not supported");
248255
}
249256

250-
let plan = self.plan_migration(steps);
251257
info!("executing migration plan: {plan:?}");
252258

253259
self.migrate_evolve(&plan.evolve).await?;
@@ -346,6 +352,24 @@ impl<'a, 'b> Migration<'a, 'b> {
346352
plan
347353
}
348354

355+
/// Plan a forced migration of all objects using the given mechanism.
356+
fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan {
357+
let objects = self
358+
.builtins
359+
.iter()
360+
.filter(|(_, builtin)| matches!(builtin, Builtin::Table(..) | Builtin::Source(..)))
361+
.map(|(object, _)| object.clone())
362+
.collect();
363+
364+
let mut plan = Plan::default();
365+
match mechanism {
366+
Mechanism::Evolution => plan.evolve = objects,
367+
Mechanism::Replacement => plan.replace = objects,
368+
}
369+
370+
plan
371+
}
372+
349373
/// Migrate the given objects using the `Evolution` mechanism.
350374
async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> {
351375
for object in objects {

src/adapter/src/coord.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,6 +1066,7 @@ pub struct Config {
10661066
pub helm_chart_version: Option<String>,
10671067
pub license_key: ValidatedLicenseKey,
10681068
pub external_login_password_mz_system: Option<Password>,
1069+
pub force_builtin_schema_migration: Option<String>,
10691070
}
10701071

10711072
/// Metadata about an active connection.
@@ -4014,6 +4015,7 @@ pub fn serve(
40144015
helm_chart_version,
40154016
license_key,
40164017
external_login_password_mz_system,
4018+
force_builtin_schema_migration,
40174019
}: Config,
40184020
) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
40194021
async move {
@@ -4121,6 +4123,7 @@ pub fn serve(
41214123
BuiltinItemMigrationConfig {
41224124
persist_client: persist_client.clone(),
41234125
read_only: read_only_controllers,
4126+
force_migration: force_builtin_schema_migration,
41244127
}
41254128
;
41264129
let OpenCatalogResult {

src/catalog-debug/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,7 @@ async fn upgrade_check(
651651
// client.
652652
persist_client: PersistClient::new_for_tests().await,
653653
read_only,
654+
force_migration: None,
654655
},
655656
persist_client: persist_client.clone(),
656657
enable_expression_cache_override: None,

src/catalog/src/builtin.rs

Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ use std::collections::BTreeMap;
2828
use std::hash::Hash;
2929
use std::string::ToString;
3030
use std::sync::LazyLock;
31-
use std::sync::Mutex;
3231

33-
use clap::clap_derive::ValueEnum;
3432
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
3533
use mz_ore::collections::HashMap;
3634
use mz_pgrepr::oid;
@@ -61,7 +59,6 @@ use mz_storage_client::healthcheck::{
6159
REPLICA_STATUS_HISTORY_DESC, WALLCLOCK_LAG_HISTORY_DESC,
6260
};
6361
use mz_storage_client::statistics::{MZ_SINK_STATISTICS_RAW_DESC, MZ_SOURCE_STATISTICS_RAW_DESC};
64-
use rand::Rng;
6562
use serde::Serialize;
6663

6764
use crate::durable::objects::SystemObjectDescription;
@@ -350,48 +347,9 @@ impl Fingerprint for &BuiltinLog {
350347
}
351348
}
352349

353-
/// Allows tests to inject arbitrary amounts of whitespace to forcibly change the fingerprint and
354-
/// trigger a builtin migration.
355-
#[derive(Debug, Clone, ValueEnum)]
356-
pub enum UnsafeBuiltinTableFingerprintWhitespace {
357-
/// Inject whitespace into all builtin table fingerprints.
358-
All,
359-
/// Inject whitespace into half of the builtin table fingerprints,
360-
/// which are randomly selected.
361-
Half,
362-
}
363-
pub static UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE: Mutex<
364-
Option<(UnsafeBuiltinTableFingerprintWhitespace, String)>,
365-
> = Mutex::new(None);
366-
367350
impl Fingerprint for &BuiltinTable {
368351
fn fingerprint(&self) -> String {
369-
// This is only called during bootstrapping, so it's not that big of a deal to lock a mutex,
370-
// though it's not great.
371-
let guard = UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE
372-
.lock()
373-
.expect("lock poisoned");
374-
match &*guard {
375-
// `mz_storage_usage_by_shard` can never be migrated.
376-
_ if self.schema == MZ_STORAGE_USAGE_BY_SHARD.schema
377-
&& self.name == MZ_STORAGE_USAGE_BY_SHARD.name =>
378-
{
379-
self.desc.fingerprint()
380-
}
381-
Some((UnsafeBuiltinTableFingerprintWhitespace::All, whitespace)) => {
382-
format!("{}{}", self.desc.fingerprint(), whitespace)
383-
}
384-
Some((UnsafeBuiltinTableFingerprintWhitespace::Half, whitespace)) => {
385-
let mut rng = rand::thread_rng();
386-
let migrate: bool = rng.r#gen();
387-
if migrate {
388-
format!("{}{}", self.desc.fingerprint(), whitespace)
389-
} else {
390-
self.desc.fingerprint()
391-
}
392-
}
393-
None => self.desc.fingerprint(),
394-
}
352+
self.desc.fingerprint()
395353
}
396354
}
397355

src/catalog/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ pub struct StateConfig {
104104
pub struct BuiltinItemMigrationConfig {
105105
pub persist_client: PersistClient,
106106
pub read_only: bool,
107+
pub force_migration: Option<String>,
107108
}
108109

109110
#[derive(Debug, Clone, Serialize)]

src/environmentd/src/environmentd/main.rs

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,6 @@ use mz_adapter_types::bootstrap_builtin_cluster_config::{
3838
use mz_auth::password::Password;
3939
use mz_aws_secrets_controller::AwsSecretsController;
4040
use mz_build_info::BuildInfo;
41-
use mz_catalog::builtin::{
42-
UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE,
43-
UnsafeBuiltinTableFingerprintWhitespace,
44-
};
4541
use mz_catalog::config::ClusterReplicaSizeMap;
4642
use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
4743
use mz_controller::ControllerConfig;
@@ -617,21 +613,13 @@ pub struct Args {
617613
tracing: TracingCliArgs,
618614

619615
// === Testing options. ===
620-
/// Injects arbitrary whitespace into builtin table fingerprints, which can
621-
/// trigger builtin item migrations. The amount of whitespace is determined
622-
/// by
623-
/// `unsafe_builtin_table_fingerprint_whitespace_version`.
616+
/// Forces the migration of all builtin storage collections using the
617+
/// specified migration mechanism (either "evolution" or "replacement").
618+
///
624619
/// This argument is meant for testing only and as the names suggests
625620
/// should not be set in production.
626621
#[clap(long, value_enum, requires = "unsafe_mode")]
627-
unsafe_builtin_table_fingerprint_whitespace: Option<UnsafeBuiltinTableFingerprintWhitespace>,
628-
/// Controls the amount of whitespace injected by
629-
/// `unsafe_builtin_table_fingerprint_whitespace`.
630-
/// Incrementing this value can allow triggering multiple builtin
631-
/// migrations from a single test. This argument is meant for testing only
632-
/// and as the names suggests should not be set in production.
633-
#[clap(long, requires = "unsafe_mode", default_value = "1")]
634-
unsafe_builtin_table_fingerprint_whitespace_version: usize,
622+
unsafe_force_builtin_schema_migration: Option<String>,
635623
}
636624

637625
#[derive(ValueEnum, Debug, Clone)]
@@ -695,13 +683,9 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {
695683
};
696684

697685
// Configure testing options.
698-
if let Some(fingerprint_whitespace) = args.unsafe_builtin_table_fingerprint_whitespace {
699-
assert!(args.unsafe_mode);
700-
let whitespace = "\n".repeat(args.unsafe_builtin_table_fingerprint_whitespace_version);
701-
*UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE
702-
.lock()
703-
.expect("lock poisoned") = Some((fingerprint_whitespace, whitespace));
704-
}
686+
let force_builtin_schema_migration = args
687+
.unsafe_force_builtin_schema_migration
688+
.inspect(|_mechanism| assert!(args.unsafe_mode));
705689

706690
// Start Tokio runtime.
707691

@@ -1168,6 +1152,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {
11681152
tracing_handle,
11691153
// Testing options.
11701154
now,
1155+
force_builtin_schema_migration,
11711156
})
11721157
.await
11731158
.maybe_terminate("booting server")?;

src/environmentd/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ pub struct Config {
209209
// === Testing options. ===
210210
/// A now generation function for mocking time.
211211
pub now: NowFn,
212+
/// If `Some`, force running builtin schema migration using the specified
213+
/// migration mechanism ("evolution" or "replacement").
214+
pub force_builtin_schema_migration: Option<String>,
212215
}
213216

214217
/// Configuration for the Catalog.
@@ -786,6 +789,7 @@ impl Listeners {
786789
helm_chart_version: config.helm_chart_version.clone(),
787790
license_key: config.license_key,
788791
external_login_password_mz_system: config.external_login_password_mz_system,
792+
force_builtin_schema_migration: config.force_builtin_schema_migration,
789793
})
790794
.instrument(info_span!("adapter::serve"))
791795
.await?;

0 commit comments

Comments
 (0)