Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions misc/python/materialize/checks/scenarios_zero_downtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def actions(self) -> list[Action]:
deploy_generation=1,
mz_service="mz_2",
system_parameter_defaults=system_parameter_defaults,
force_migrations="all",
force_migrations="replacement",
),
Manipulate(self, phase=1, mz_service="mz_1"),
*wait_ready_and_promote("mz_2"),
Expand All @@ -105,7 +105,7 @@ def actions(self) -> list[Action]:
deploy_generation=2,
mz_service="mz_3",
system_parameter_defaults=system_parameter_defaults,
force_migrations="all",
force_migrations="replacement",
),
Manipulate(self, phase=2, mz_service="mz_2"),
*wait_ready_and_promote("mz_3"),
Expand All @@ -114,7 +114,7 @@ def actions(self) -> list[Action]:
deploy_generation=3,
mz_service="mz_4",
system_parameter_defaults=system_parameter_defaults,
force_migrations="all",
force_migrations="replacement",
),
Validate(self, mz_service="mz_3"),
*wait_ready_and_promote("mz_4"),
Expand Down
1 change: 0 additions & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,6 @@ def get_default_system_parameters(
"compute_peek_stash_batch_size",
"storage_statistics_retention_duration",
"enable_paused_cluster_readhold_downgrade",
"enable_builtin_migration_schema_evolution",
]


Expand Down
2 changes: 1 addition & 1 deletion misc/python/materialize/mzcompose/services/materialized.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def __init__(

if force_migrations is not None and image is None:
command += [
f"--unsafe-builtin-table-fingerprint-whitespace={force_migrations}",
f"--unsafe-force-builtin-schema-migration={force_migrations}",
]
if not unsafe_mode:
command += ["--unsafe-mode"]
Expand Down
1 change: 0 additions & 1 deletion misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1512,7 +1512,6 @@ def __init__(
"storage_statistics_retention_duration",
"enable_paused_cluster_readhold_downgrade",
"enable_with_ordinality_legacy_fallback",
"enable_builtin_migration_schema_evolution",
]

def run(self, exe: Executor) -> bool:
Expand Down
7 changes: 0 additions & 7 deletions src/adapter-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,6 @@ pub const PERSIST_FAST_PATH_ORDER: Config<bool> = Config::new(
"If set, send queries with a compatible literal constraint or ordering clause down the Persist fast path.",
);

pub const ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION: Config<bool> = Config::new(
"enable_builtin_migration_schema_evolution",
true,
"Whether to attempt persist schema evolution for migration of builtin storage collections.",
);

/// Adds the full set of all adapter `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
Expand All @@ -157,5 +151,4 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&ENABLE_PASSWORD_AUTH)
.add(&CONSTRAINT_BASED_TIMESTAMP_SELECTION)
.add(&PERSIST_FAST_PATH_ORDER)
.add(&ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION)
}
2 changes: 2 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ impl Catalog {
unsafe_mode: true,
all_features: false,
build_info,
deploy_generation: 0,
environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
read_only,
now,
Expand Down Expand Up @@ -721,6 +722,7 @@ impl Catalog {
builtin_item_migration_config: BuiltinItemMigrationConfig {
persist_client: persist_client.clone(),
read_only,
force_migration: None,
},
persist_client,
enable_expression_cache_override,
Expand Down
83 changes: 26 additions & 57 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

//! Logic related to opening a [`Catalog`].

mod builtin_item_migration;
mod builtin_schema_migration;

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
Expand All @@ -23,7 +23,7 @@ use mz_auth::hash::scram256_hash;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
Fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
};
use mz_catalog::config::StateConfig;
use mz_catalog::durable::objects::{
Expand Down Expand Up @@ -61,9 +61,6 @@ use uuid::Uuid;
// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::AdapterError;
use crate::catalog::migrate::{self, get_migration_version, set_migration_version};
use crate::catalog::open::builtin_item_migration::{
BuiltinItemMigrationResult, migrate_builtin_items,
};
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{
BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name,
Expand Down Expand Up @@ -184,12 +181,14 @@ impl Catalog {
license_key: config.license_key,
};

let deploy_generation = storage.get_deployment_generation().await?;

let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
assert!(!updates.is_empty(), "initial catalog snapshot is missing");
let mut txn = storage.transaction().await?;

// Migrate/update durable data before we start loading the in-memory catalog.
let (migrated_builtins, new_builtin_collections) = {
let new_builtin_collections = {
migrate::durable_migrate(
&mut txn,
state.config.environment_id.organization_id(),
Expand All @@ -204,7 +203,7 @@ impl Catalog {
txn.set_system_config_synced_once()?;
}
// Add any new builtin objects and remove old ones.
let (migrated_builtins, new_builtin_collections) =
let new_builtin_collections =
add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
system_cluster: config.builtin_system_cluster_config,
Expand All @@ -224,9 +223,10 @@ impl Catalog {
)?;
add_new_remove_old_builtin_roles_migration(&mut txn)?;
remove_invalid_config_param_role_defaults_migration(&mut txn)?;
(migrated_builtins, new_builtin_collections)
remove_pending_cluster_replicas_migration(&mut txn)?;

new_builtin_collections
};
remove_pending_cluster_replicas_migration(&mut txn)?;

let op_updates = txn.get_and_commit_op_updates();
updates.extend(op_updates);
Expand Down Expand Up @@ -431,7 +431,7 @@ impl Catalog {
)
.await
.map_err(|e| {
Error::new(ErrorKind::FailedMigration {
Error::new(ErrorKind::FailedCatalogMigration {
last_seen_version: last_seen_version.clone(),
this_version: config.build_info.version,
cause: e.to_string(),
Expand Down Expand Up @@ -481,31 +481,32 @@ impl Catalog {
}

// Migrate builtin items.
let BuiltinItemMigrationResult {
builtin_table_updates: builtin_table_update,
migrated_storage_collections_0dt,
cleanup_action,
} = migrate_builtin_items(
&mut state,
let schema_migration_result = builtin_schema_migration::run(
config.build_info,
deploy_generation,
&mut txn,
&mut local_expr_cache,
migrated_builtins,
config.builtin_item_migration_config,
)
.await?;
builtin_table_updates.extend(builtin_table_update);

let state_updates = txn.get_and_commit_op_updates();
let table_updates = state
.apply_updates_for_bootstrap(state_updates, &mut local_expr_cache)
.await;
builtin_table_updates.extend(table_updates);
let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);

// Bump the migration version immediately before committing.
set_migration_version(&mut txn, config.build_info.semver_version())?;

txn.commit(config.boot_ts).await?;

cleanup_action.await;
// Now that the migration is durable, run any requested deferred cleanup.
schema_migration_result.cleanup_action.await;

Ok(InitializeStateResult {
state,
migrated_storage_collections_0dt,
migrated_storage_collections_0dt: schema_migration_result.replaced_items,
new_builtin_collections: new_builtin_collections.into_iter().collect(),
builtin_table_updates,
last_seen_version,
Expand Down Expand Up @@ -711,19 +712,17 @@ impl CatalogState {

/// Updates the catalog with new and removed builtin items.
///
/// Returns the list of builtin [`GlobalId`]s that need to be migrated, and the list of new builtin
/// [`GlobalId`]s.
/// Returns the list of new builtin [`GlobalId`]s.
fn add_new_remove_old_builtin_items_migration(
builtins_cfg: &BuiltinsConfig,
txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<(Vec<CatalogItemId>, Vec<GlobalId>), mz_catalog::durable::CatalogError> {
) -> Result<Vec<GlobalId>, mz_catalog::durable::CatalogError> {
let mut new_builtin_mappings = Vec::new();
let mut migrated_builtin_ids = Vec::new();
// Used to validate unique descriptions.
let mut builtin_descs = HashSet::new();

// We compare the builtin items that are compiled into the binary with the builtin items that
// are persisted in the catalog to discover new, deleted, and migrated builtin items.
// are persisted in the catalog to discover new and deleted builtin items.
let mut builtins = Vec::new();
for builtin in BUILTINS::iter(builtins_cfg) {
let desc = SystemObjectDescription {
Expand Down Expand Up @@ -775,36 +774,6 @@ fn add_new_remove_old_builtin_items_migration(
.zip_eq(new_builtin_ids.clone())
.collect();

// Look for migrated builtins.
for (builtin, system_object_mapping, fingerprint) in existing_builtins.iter().cloned() {
if system_object_mapping.unique_identifier.fingerprint != fingerprint {
// `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly,
// it was cause the table to be truncated because the contents are not also
// stored in the durable catalog. Secondly, we prune `mz_storage_usage_by_shard`
// of old events in the background on startup. The correctness of that pruning
// relies on there being no other retractions to `mz_storage_usage_by_shard`.
assert_ne!(
*MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, system_object_mapping.description,
"mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
);
assert_ne!(
builtin.catalog_item_type(),
CatalogItemType::Type,
"types cannot be migrated"
);
assert_ne!(
system_object_mapping.unique_identifier.fingerprint,
RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
"clearing the runtime alterable flag on an existing object is not permitted",
);
assert!(
!builtin.runtime_alterable(),
"setting the runtime alterable flag on an existing object is not permitted"
);
migrated_builtin_ids.push(system_object_mapping.unique_identifier.catalog_id);
}
}

// Add new builtin items to catalog.
for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
new_builtin_mappings.push(SystemObjectMapping {
Expand Down Expand Up @@ -953,7 +922,7 @@ fn add_new_remove_old_builtin_items_migration(
.map(|(_catalog_id, global_id)| global_id)
.collect();

Ok((migrated_builtin_ids, new_builtin_collections))
Ok(new_builtin_collections)
}

fn add_new_remove_old_builtin_clusters_migration(
Expand Down
Loading