diff --git a/misc/python/materialize/checks/scenarios_zero_downtime.py b/misc/python/materialize/checks/scenarios_zero_downtime.py index f5a50d0e66d6c..0acfb9c6a1d0c 100644 --- a/misc/python/materialize/checks/scenarios_zero_downtime.py +++ b/misc/python/materialize/checks/scenarios_zero_downtime.py @@ -96,7 +96,7 @@ def actions(self) -> list[Action]: deploy_generation=1, mz_service="mz_2", system_parameter_defaults=system_parameter_defaults, - force_migrations="all", + force_migrations="replacement", ), Manipulate(self, phase=1, mz_service="mz_1"), *wait_ready_and_promote("mz_2"), @@ -105,7 +105,7 @@ def actions(self) -> list[Action]: deploy_generation=2, mz_service="mz_3", system_parameter_defaults=system_parameter_defaults, - force_migrations="all", + force_migrations="replacement", ), Manipulate(self, phase=2, mz_service="mz_2"), *wait_ready_and_promote("mz_3"), @@ -114,7 +114,7 @@ def actions(self) -> list[Action]: deploy_generation=3, mz_service="mz_4", system_parameter_defaults=system_parameter_defaults, - force_migrations="all", + force_migrations="replacement", ), Validate(self, mz_service="mz_3"), *wait_ready_and_promote("mz_4"), diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 26158fbe58834..d2e38753530b7 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -577,7 +577,6 @@ def get_default_system_parameters( "compute_peek_stash_batch_size", "storage_statistics_retention_duration", "enable_paused_cluster_readhold_downgrade", - "enable_builtin_migration_schema_evolution", ] diff --git a/misc/python/materialize/mzcompose/services/materialized.py b/misc/python/materialize/mzcompose/services/materialized.py index f04523b0e18f9..9e943cac8b564 100644 --- a/misc/python/materialize/mzcompose/services/materialized.py +++ b/misc/python/materialize/mzcompose/services/materialized.py @@ -220,7 +220,7 @@ def __init__( if force_migrations is not 
None and image is None: command += [ - f"--unsafe-builtin-table-fingerprint-whitespace={force_migrations}", + f"--unsafe-force-builtin-schema-migration={force_migrations}", ] if not unsafe_mode: command += ["--unsafe-mode"] diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index 1f31dfe342f72..eb76fc7a05682 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -1512,7 +1512,6 @@ def __init__( "storage_statistics_retention_duration", "enable_paused_cluster_readhold_downgrade", "enable_with_ordinality_legacy_fallback", - "enable_builtin_migration_schema_evolution", ] def run(self, exe: Executor) -> bool: diff --git a/src/adapter-types/src/dyncfgs.rs b/src/adapter-types/src/dyncfgs.rs index 83c8448a30141..368e1c7588fb2 100644 --- a/src/adapter-types/src/dyncfgs.rs +++ b/src/adapter-types/src/dyncfgs.rs @@ -131,12 +131,6 @@ pub const PERSIST_FAST_PATH_ORDER: Config = Config::new( "If set, send queries with a compatible literal constraint or ordering clause down the Persist fast path.", ); -pub const ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION: Config = Config::new( - "enable_builtin_migration_schema_evolution", - true, - "Whether to attempt persist schema evolution for migration of builtin storage collections.", -); - /// Adds the full set of all adapter `Config`s. 
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs @@ -157,5 +151,4 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&ENABLE_PASSWORD_AUTH) .add(&CONSTRAINT_BASED_TIMESTAMP_SELECTION) .add(&PERSIST_FAST_PATH_ORDER) - .add(&ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION) } diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index d7251011706f8..26e90758c6afe 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -684,6 +684,7 @@ impl Catalog { unsafe_mode: true, all_features: false, build_info, + deploy_generation: 0, environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests), read_only, now, @@ -721,6 +722,7 @@ impl Catalog { builtin_item_migration_config: BuiltinItemMigrationConfig { persist_client: persist_client.clone(), read_only, + force_migration: None, }, persist_client, enable_expression_cache_override, diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index ad63ea51c717b..c74efddbefaac 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -9,7 +9,7 @@ //! Logic related to opening a [`Catalog`]. -mod builtin_item_migration; +mod builtin_schema_migration; use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; @@ -23,7 +23,7 @@ use mz_auth::hash::scram256_hash; use mz_catalog::SYSTEM_CONN_ID; use mz_catalog::builtin::{ BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin, - Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL, + Fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL, }; use mz_catalog::config::StateConfig; use mz_catalog::durable::objects::{ @@ -61,9 +61,6 @@ use uuid::Uuid; // DO NOT add any more imports from `crate` outside of `crate::catalog`. 
use crate::AdapterError; use crate::catalog::migrate::{self, get_migration_version, set_migration_version}; -use crate::catalog::open::builtin_item_migration::{ - BuiltinItemMigrationResult, migrate_builtin_items, -}; use crate::catalog::state::LocalExpressionCache; use crate::catalog::{ BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name, @@ -184,12 +181,14 @@ impl Catalog { license_key: config.license_key, }; + let deploy_generation = storage.get_deployment_generation().await?; + let mut updates: Vec<_> = storage.sync_to_current_updates().await?; assert!(!updates.is_empty(), "initial catalog snapshot is missing"); let mut txn = storage.transaction().await?; // Migrate/update durable data before we start loading the in-memory catalog. - let (migrated_builtins, new_builtin_collections) = { + let new_builtin_collections = { migrate::durable_migrate( &mut txn, state.config.environment_id.organization_id(), @@ -204,7 +203,7 @@ impl Catalog { txn.set_system_config_synced_once()?; } // Add any new builtin objects and remove old ones. 
- let (migrated_builtins, new_builtin_collections) = + let new_builtin_collections = add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?; let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap { system_cluster: config.builtin_system_cluster_config, @@ -224,9 +223,10 @@ impl Catalog { )?; add_new_remove_old_builtin_roles_migration(&mut txn)?; remove_invalid_config_param_role_defaults_migration(&mut txn)?; - (migrated_builtins, new_builtin_collections) + remove_pending_cluster_replicas_migration(&mut txn)?; + + new_builtin_collections }; - remove_pending_cluster_replicas_migration(&mut txn)?; let op_updates = txn.get_and_commit_op_updates(); updates.extend(op_updates); @@ -431,7 +431,7 @@ impl Catalog { ) .await .map_err(|e| { - Error::new(ErrorKind::FailedMigration { + Error::new(ErrorKind::FailedCatalogMigration { last_seen_version: last_seen_version.clone(), this_version: config.build_info.version, cause: e.to_string(), @@ -481,19 +481,19 @@ impl Catalog { } // Migrate builtin items. - let BuiltinItemMigrationResult { - builtin_table_updates: builtin_table_update, - migrated_storage_collections_0dt, - cleanup_action, - } = migrate_builtin_items( - &mut state, + let schema_migration_result = builtin_schema_migration::run( + config.build_info, + deploy_generation, &mut txn, - &mut local_expr_cache, - migrated_builtins, config.builtin_item_migration_config, ) .await?; - builtin_table_updates.extend(builtin_table_update); + + let state_updates = txn.get_and_commit_op_updates(); + let table_updates = state + .apply_updates_for_bootstrap(state_updates, &mut local_expr_cache) + .await; + builtin_table_updates.extend(table_updates); let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates); // Bump the migration version immediately before committing. 
@@ -501,11 +501,12 @@ impl Catalog { txn.commit(config.boot_ts).await?; - cleanup_action.await; + // Now that the migration is durable, run any requested deferred cleanup. + schema_migration_result.cleanup_action.await; Ok(InitializeStateResult { state, - migrated_storage_collections_0dt, + migrated_storage_collections_0dt: schema_migration_result.replaced_items, new_builtin_collections: new_builtin_collections.into_iter().collect(), builtin_table_updates, last_seen_version, @@ -711,19 +712,17 @@ impl CatalogState { /// Updates the catalog with new and removed builtin items. /// -/// Returns the list of builtin [`GlobalId`]s that need to be migrated, and the list of new builtin -/// [`GlobalId`]s. +/// Returns the list of new builtin [`GlobalId`]s. fn add_new_remove_old_builtin_items_migration( builtins_cfg: &BuiltinsConfig, txn: &mut mz_catalog::durable::Transaction<'_>, -) -> Result<(Vec, Vec), mz_catalog::durable::CatalogError> { +) -> Result, mz_catalog::durable::CatalogError> { let mut new_builtin_mappings = Vec::new(); - let mut migrated_builtin_ids = Vec::new(); // Used to validate unique descriptions. let mut builtin_descs = HashSet::new(); // We compare the builtin items that are compiled into the binary with the builtin items that - // are persisted in the catalog to discover new, deleted, and migrated builtin items. + // are persisted in the catalog to discover new and deleted builtin items. let mut builtins = Vec::new(); for builtin in BUILTINS::iter(builtins_cfg) { let desc = SystemObjectDescription { @@ -775,36 +774,6 @@ fn add_new_remove_old_builtin_items_migration( .zip_eq(new_builtin_ids.clone()) .collect(); - // Look for migrated builtins. - for (builtin, system_object_mapping, fingerprint) in existing_builtins.iter().cloned() { - if system_object_mapping.unique_identifier.fingerprint != fingerprint { - // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. 
Firstly, - // it was cause the table to be truncated because the contents are not also - // stored in the durable catalog. Secondly, we prune `mz_storage_usage_by_shard` - // of old events in the background on startup. The correctness of that pruning - // relies on there being no other retractions to `mz_storage_usage_by_shard`. - assert_ne!( - *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, system_object_mapping.description, - "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated" - ); - assert_ne!( - builtin.catalog_item_type(), - CatalogItemType::Type, - "types cannot be migrated" - ); - assert_ne!( - system_object_mapping.unique_identifier.fingerprint, - RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL, - "clearing the runtime alterable flag on an existing object is not permitted", - ); - assert!( - !builtin.runtime_alterable(), - "setting the runtime alterable flag on an existing object is not permitted" - ); - migrated_builtin_ids.push(system_object_mapping.unique_identifier.catalog_id); - } - } - // Add new builtin items to catalog. for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() { new_builtin_mappings.push(SystemObjectMapping { @@ -953,7 +922,7 @@ fn add_new_remove_old_builtin_items_migration( .map(|(_catalog_id, global_id)| global_id) .collect(); - Ok((migrated_builtin_ids, new_builtin_collections)) + Ok(new_builtin_collections) } fn add_new_remove_old_builtin_clusters_migration( diff --git a/src/adapter/src/catalog/open/builtin_item_migration.rs b/src/adapter/src/catalog/open/builtin_item_migration.rs deleted file mode 100644 index f98c0395ca745..0000000000000 --- a/src/adapter/src/catalog/open/builtin_item_migration.rs +++ /dev/null @@ -1,748 +0,0 @@ -// Copyright Materialize, Inc. and contributors. All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. 
-// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -//! Migrations for builtin items. - -use std::collections::{BTreeMap, BTreeSet}; -use std::sync::Arc; - -use futures::FutureExt; -use futures::future::BoxFuture; -use mz_adapter_types::dyncfgs::ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION; -use mz_catalog::SYSTEM_CONN_ID; -use mz_catalog::builtin::{BUILTINS, BuiltinTable, Fingerprint}; -use mz_catalog::config::BuiltinItemMigrationConfig; -use mz_catalog::durable::objects::SystemObjectUniqueIdentifier; -use mz_catalog::durable::{ - DurableCatalogError, FenceError, SystemObjectDescription, SystemObjectMapping, Transaction, -}; -use mz_catalog::memory::error::{Error, ErrorKind}; -use mz_catalog::memory::objects::CatalogItem; -use mz_ore::collections::CollectionExt; -use mz_ore::{halt, soft_assert_or_log, soft_panic_or_log}; -use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG; -use mz_persist_client::critical::SinceHandle; -use mz_persist_client::read::ReadHandle; -use mz_persist_client::schema::CaESchema; -use mz_persist_client::write::WriteHandle; -use mz_persist_client::{Diagnostics, PersistClient}; -use mz_persist_types::ShardId; -use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema}; -use mz_repr::{CatalogItemId, GlobalId, Timestamp}; -use mz_sql::catalog::CatalogItem as _; -use mz_storage_client::controller::StorageTxn; -use mz_storage_types::StorageDiff; -use mz_storage_types::sources::SourceData; -use timely::progress::{Antichain, Timestamp as TimelyTimestamp}; -use tracing::{debug, error, info, warn}; - -use crate::catalog::open::builtin_item_migration::persist_schema::{TableKey, TableKeySchema}; -use crate::catalog::state::LocalExpressionCache; -use crate::catalog::{BuiltinTableUpdate, CatalogState}; - -/// The results of a builtin item migration. 
-pub(crate) struct BuiltinItemMigrationResult { - /// A vec of updates to apply to the builtin tables. - pub(crate) builtin_table_updates: Vec>, - /// A set of new shards that may need to be initialized. - pub(crate) migrated_storage_collections_0dt: BTreeSet, - /// Some cleanup action to take once the migration has been made durable. - pub(crate) cleanup_action: BoxFuture<'static, ()>, -} - -/// Perform migrations for any builtin items that may have changed between versions. -/// -/// We only need to do anything for items that have an associated storage collection. Others -/// (views, indexes) don't have any durable state that requires migration. -/// -/// We have the ability to handle some backward-compatible schema changes through persist schema -/// evolution, and we do so when possible. For changes that schema evolution doesn't support, we -/// instead "migrate" the affected storage collections by creating new persist shards with the new -/// schemas and dropping the old ones. See [`migrate_builtin_collections_incompatible`] for -/// details. -pub(crate) async fn migrate_builtin_items( - state: &mut CatalogState, - txn: &mut Transaction<'_>, - local_expr_cache: &mut LocalExpressionCache, - migrated_builtins: Vec, - BuiltinItemMigrationConfig { - persist_client, - read_only, - }: BuiltinItemMigrationConfig, -) -> Result { - assert_eq!( - read_only, - txn.is_savepoint(), - "txn must be in savepoint mode when read_only is true, and in writable mode otherwise", - ); - - update_catalog_fingerprints(state, txn, &migrated_builtins)?; - - // Collect GlobalIds of storage collections we need to migrate. 
- let mut collections_to_migrate: Vec<_> = migrated_builtins - .into_iter() - .filter_map(|id| { - use CatalogItem::*; - match &state.get_entry(&id).item() { - Table(table) => Some(table.global_ids().into_element()), - Source(source) => Some(source.global_id()), - MaterializedView(mv) => Some(mv.global_id()), - ContinualTask(ct) => Some(ct.global_id()), - Log(_) | Sink(_) | View(_) | Index(_) | Type(_) | Func(_) | Secret(_) - | Connection(_) => None, - } - }) - .collect(); - - // Attempt to perform schema evolution. - // - // If we run into an unexpected error while trying to perform schema evolution, we abort the - // migration process, rather than automatically falling back to replacing the persist shards. - // We do this to avoid accidentally losing data due to a bug. This gives us the option to - // decide if we'd rather fix the bug or skip schema evolution using the dyncfg flag. - if ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION.get(state.system_config().dyncfgs()) { - collections_to_migrate = - try_evolve_persist_schemas(state, txn, collections_to_migrate, &persist_client).await?; - } else { - info!("skipping builtin migration by schema evolution"); - } - - // For collections whose schemas we couldn't evolve, perform the replacement process. - // Note that we need to invoke this process even if `collections_to_migrate` is empty because - // it also cleans up any leftovers of previous migrations from the migration shard. - migrate_builtin_collections_incompatible( - state, - txn, - local_expr_cache, - persist_client, - collections_to_migrate, - read_only, - ) - .await -} - -/// Update the durably stored fingerprints of `migrated_builtins`. 
-fn update_catalog_fingerprints( - state: &CatalogState, - txn: &mut Transaction<'_>, - migrated_builtins: &[CatalogItemId], -) -> Result<(), Error> { - let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg) - .map(|builtin| { - let id = state.resolve_builtin_object(builtin); - let fingerprint = builtin.fingerprint(); - (id, fingerprint) - }) - .collect(); - let mut migrated_system_object_mappings = BTreeMap::new(); - for item_id in migrated_builtins { - let fingerprint = id_fingerprint_map - .get(item_id) - .expect("missing fingerprint"); - let entry = state.get_entry(item_id); - let schema_name = state - .get_schema( - &entry.name().qualifiers.database_spec, - &entry.name().qualifiers.schema_spec, - entry.conn_id().unwrap_or(&SYSTEM_CONN_ID), - ) - .name - .schema - .as_str(); - // Builtin Items can only be referenced by a single GlobalId. - let global_id = state.get_entry(item_id).global_ids().into_element(); - - migrated_system_object_mappings.insert( - *item_id, - SystemObjectMapping { - description: SystemObjectDescription { - schema_name: schema_name.to_string(), - object_type: entry.item_type(), - object_name: entry.name().item.clone(), - }, - unique_identifier: SystemObjectUniqueIdentifier { - catalog_id: *item_id, - global_id, - fingerprint: fingerprint.clone(), - }, - }, - ); - } - txn.update_system_object_mappings(migrated_system_object_mappings)?; - - Ok(()) -} - -/// Attempt to migrate the given builtin collections using persist schema evolution. -/// -/// Returns the IDs of collections for which schema evolution did not succeed, due to an "expected" -/// error. At the moment, there are two expected reasons for schema evolution to fail: (a) the -/// existing shard doesn't have a schema and (b) the new schema is incompatible with the existing -/// schema. -/// -/// If this method encounters an unexpected error, it returns an `Error` instead. The caller is -/// expected to abort the migration process in response. 
-async fn try_evolve_persist_schemas( - state: &CatalogState, - txn: &Transaction<'_>, - migrated_storage_collections: Vec, - persist_client: &PersistClient, -) -> Result, Error> { - let collection_metadata = txn.get_collection_metadata(); - - let mut failed = Vec::new(); - for id in migrated_storage_collections { - let Some(&shard_id) = collection_metadata.get(&id) else { - return Err(Error::new(ErrorKind::Internal(format!( - "builtin migration: missing metadata for builtin collection {id}" - )))); - }; - - let diagnostics = Diagnostics { - shard_name: id.to_string(), - handle_purpose: "migrate builtin schema".to_string(), - }; - let Some((old_schema_id, old_schema, _)) = persist_client - .latest_schema::(shard_id, diagnostics.clone()) - .await - .expect("invalid usage") - else { - // There are two known cases where it is expected to not find a latest schema: - // * The existing shard has never been written to previously. - // * We are running inside an upgrade check, with an in-memory `persist_client`. - info!(%id, "builtin schema evolution failed: missing latest schema"); - failed.push(id); - continue; - }; - - let entry = state.get_entry_by_global_id(&id); - let Some(new_schema) = entry.desc_opt_latest() else { - return Err(Error::new(ErrorKind::Internal(format!( - "builtin migration: missing new schema for builtin collection {id}" - )))); - }; - - info!(%id, ?old_schema, ?new_schema, "attempting builtin schema evolution"); - - let result = persist_client - .compare_and_evolve_schema::( - shard_id, - old_schema_id, - &new_schema, - &UnitSchema, - diagnostics, - ) - .await - .expect("invalid usage"); - - match result { - CaESchema::Ok(_) => { - info!("builtin schema evolution succeeded"); - } - CaESchema::Incompatible => { - info!("builtin schema evolution failed: incompatible"); - failed.push(id); - } - CaESchema::ExpectedMismatch { schema_id, .. 
} => { - return Err(Error::new(ErrorKind::Internal(format!( - "builtin migration: unexpected schema mismatch ({} != {})", - schema_id, old_schema_id, - )))); - } - } - } - - Ok(failed) -} - -/// Migrate builtin collections that are not supported by persist schema evolution. -/// -/// The high level description of this approach is that we create new shards for each migrated -/// builtin collection with the new schema, without changing the global ID. Dependent objects are -/// not modified but now read from the new shards. -/// -/// A detailed description of this approach follows. It's important that all of these steps are -/// idempotent, so that we can safely crash at any point and non-upgrades turn into a no-op. -/// -/// 1. Each environment has a dedicated persist shard, called the migration shard, that allows -/// environments to durably write down metadata while in read-only mode. The shard is a -/// mapping of `(GlobalId, build_version)` to `ShardId`. -/// 2. Read in the current contents of the migration shard. -/// 3. Collect all the `ShardId`s from the migration shard that are not at the current -/// `build_version` or are not in the set of migrated collections. -/// a. If they ARE NOT mapped to a `GlobalId` in the storage metadata then they are shards -/// from an incomplete migration. Finalize them and remove them from the migration shard. -/// Note: care must be taken to not remove the shard from the migration shard until we are -/// sure that they will be finalized, otherwise the shard will leak. -/// b. If they ARE mapped to a `GlobalId` in the storage metadata then they are shards from a -/// complete migration. Remove them from the migration shard. -/// 4. Collect all the `GlobalId`s of collections that are migrated, but not in the migration -/// shard for the current build version. Generate new `ShardId`s and add them to the -/// migration shard. -/// 5. 
At this point the migration shard should only logically contain a mapping of migrated -/// collection `GlobalId`s to new `ShardId`s for the current build version. For each of these -/// `GlobalId`s such that the `ShardId` isn't already in the storage metadata: -/// a. Remove the current `GlobalId` to `ShardId` mapping from the storage metadata. -/// b. Finalize the removed `ShardId`s. -/// c. Insert the new `GlobalId` to `ShardId` mapping into the storage metadata. -/// -/// This approach breaks the abstraction boundary between the catalog and the storage metadata, but -/// these types of rare, but extremely useful, abstraction breaks is the exact reason they are -/// co-located. -/// -/// Since the new shards are created in read-only mode, they will be left empty and all dependent -/// items will fail to hydrate. -/// -/// While in read-only mode we write the migration changes to `txn`, which will update the -/// in-memory catalog, which will cause the new shards to be created in storage. However, we don't -/// have to worry about the catalog changes becoming durable because the `txn` is in savepoint -/// mode. When we re-execute this migration as the leader (i.e. outside of read-only mode), `txn` -/// will be writable and the migration will be made durable in the catalog. We always write -/// directly to the migration shard, regardless of read-only mode. So we have to be careful not to -/// remove anything from the migration shard until we're sure that its results have been made -/// durable elsewhere. -async fn migrate_builtin_collections_incompatible( - state: &mut CatalogState, - txn: &mut Transaction<'_>, - local_expr_cache: &mut LocalExpressionCache, - persist_client: PersistClient, - migrated_storage_collections: Vec, - read_only: bool, -) -> Result { - let build_version = state.config.build_info.semver_version(); - - // The migration shard only stores raw GlobalIds, so it's more convenient to keep the list of - // migrated collections in that form. 
- let migrated_storage_collections: Vec<_> = migrated_storage_collections - .into_iter() - .map(|gid| match gid { - GlobalId::System(raw) => raw, - _ => panic!("builtins must have system IDs"), - }) - .collect(); - - // 1. Open migration shard. - let organization_id = state.config.environment_id.organization_id(); - let shard_id = txn - .get_builtin_migration_shard() - .expect("builtin migration shard should exist for opened catalogs"); - let diagnostics = Diagnostics { - shard_name: "builtin_migration".to_string(), - handle_purpose: format!( - "builtin table migration shard for org {organization_id:?} version {build_version:?}" - ), - }; - let mut since_handle: SinceHandle = - persist_client - .open_critical_since( - shard_id, - // TODO: We may need to use a different critical reader - // id for this if we want to be able to introspect it via SQL. - PersistClient::CONTROLLER_CRITICAL_SINCE, - diagnostics.clone(), - ) - .await - .expect("invalid usage"); - let (mut write_handle, mut read_handle): ( - WriteHandle, - ReadHandle, - ) = persist_client - .open( - shard_id, - Arc::new(TableKeySchema), - Arc::new(ShardIdSchema), - diagnostics.clone(), - USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()), - ) - .await - .expect("invalid usage"); - // Commit an empty write at the minimum timestamp so the shard is always readable. - const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, StorageDiff)] = &[]; - let res = write_handle - .compare_and_append( - EMPTY_UPDATES, - Antichain::from_elem(Timestamp::minimum()), - Antichain::from_elem(Timestamp::minimum().step_forward()), - ) - .await - .expect("invalid usage"); - if let Err(e) = res { - debug!("migration shard already initialized: {e:?}"); - } - - // 2. Read in the current contents of the migration shard. - // We intentionally fetch the upper AFTER opening the read handle to address races between - // the upper and since moving forward in some other process. 
- let upper = fetch_upper(&mut write_handle).await; - // The empty write above should ensure that the upper is at least 1. - let as_of = upper.checked_sub(1).ok_or_else(|| { - Error::new(ErrorKind::Internal(format!( - "builtin migration failed, unexpected upper: {upper:?}" - ))) - })?; - let since = read_handle.since(); - assert!( - since.less_equal(&as_of), - "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of" - ); - let as_of = Antichain::from_elem(as_of); - let snapshot = read_handle - .snapshot_and_fetch(as_of) - .await - .expect("we have advanced the as_of by the since"); - soft_assert_or_log!( - snapshot.iter().all(|(_, _, diff)| *diff == 1), - "snapshot_and_fetch guarantees a consolidated result: {snapshot:?}" - ); - let mut global_id_shards: BTreeMap<_, _> = snapshot - .into_iter() - .filter_map(|(data, _ts, _diff)| { - if let (Ok(table_key), Ok(shard_id)) = data { - Some((table_key, shard_id)) - } else { - // If we can't decode the data, it has likely been written by a newer version, so - // we ignore it. - warn!("skipping unreadable migration shard entry: {data:?}"); - None - } - }) - .collect(); - - // 4. Clean up contents of migration shard. - let mut migrated_shard_updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)> = Vec::new(); - let mut migration_shards_to_finalize = BTreeSet::new(); - let storage_collection_metadata = txn.get_collection_metadata(); - for (table_key, shard_id) in global_id_shards.clone() { - if table_key.build_version > build_version { - if read_only { - halt!( - "saw build version {}, which is greater than current build version {}", - table_key.build_version, - build_version - ); - } else { - // If we are in leader mode, and a newer (read-only) version has started a - // migration, we must not allow ourselves to get fenced out! Continuing here might - // confuse any read-only process running the migrations concurrently, but it's - // better for the read-only env to crash than the leader. 
- // TODO(#9755): handle this in a more principled way - warn!( - %table_key.build_version, %build_version, - "saw build version which is greater than current build version", - ); - global_id_shards.remove(&table_key); - continue; - } - } - - if !migrated_storage_collections.contains(&table_key.global_id) - || table_key.build_version < build_version - { - global_id_shards.remove(&table_key); - if storage_collection_metadata.get(&GlobalId::System(table_key.global_id)) - == Some(&shard_id) - { - migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -1)); - } else { - migration_shards_to_finalize.insert((table_key, shard_id)); - } - } - } - - // 5. Add migrated tables to migration shard for current build version. - let mut global_id_shards: BTreeMap<_, _> = global_id_shards - .into_iter() - .map(|(table_key, shard_id)| (table_key.global_id, shard_id)) - .collect(); - for global_id in migrated_storage_collections { - if !global_id_shards.contains_key(&global_id) { - let shard_id = ShardId::new(); - global_id_shards.insert(global_id, shard_id); - let table_key = TableKey { - global_id, - build_version: build_version.clone(), - }; - migrated_shard_updates.push(((table_key, shard_id), upper, 1)); - } - } - - // It's very important that we use the same `upper` that was used to read in a snapshot of the - // shard. If someone updated the shard after we read then this write will fail. - let upper = if !migrated_shard_updates.is_empty() { - write_to_migration_shard( - migrated_shard_updates, - upper, - &mut write_handle, - &mut since_handle, - ) - .await? - } else { - upper - }; - - // 6. Update `GlobalId` to `ShardId` mapping and register old `ShardId`s for finalization. We don't do the finalization here and instead rely on the background finalization task. 
- let migrated_storage_collections_0dt = { - let txn: &mut dyn StorageTxn = txn; - let storage_collection_metadata = txn.get_collection_metadata(); - let global_id_shards: BTreeMap<_, _> = global_id_shards - .into_iter() - .map(|(global_id, shard_id)| (GlobalId::System(global_id), shard_id)) - .filter(|(global_id, shard_id)| { - storage_collection_metadata.get(global_id) != Some(shard_id) - }) - .collect(); - let global_ids: BTreeSet<_> = global_id_shards.keys().cloned().collect(); - let mut old_shard_ids: BTreeSet<_> = txn - .delete_collection_metadata(global_ids.clone()) - .into_iter() - .map(|(_, shard_id)| shard_id) - .collect(); - old_shard_ids.extend( - migration_shards_to_finalize - .iter() - .map(|(_, shard_id)| shard_id), - ); - txn.insert_unfinalized_shards(old_shard_ids).map_err(|e| { - Error::new(ErrorKind::Internal(format!( - "builtin migration failed: {e}" - ))) - })?; - txn.insert_collection_metadata(global_id_shards) - .map_err(|e| { - Error::new(ErrorKind::Internal(format!( - "builtin migration failed: {e}" - ))) - })?; - global_ids - }; - - // 7. Map the migrated `GlobalId`s to their corresponding `CatalogItemId`. - let migrated_storage_collections_0dt = migrated_storage_collections_0dt - .into_iter() - .map(|gid| state.get_entry_by_global_id(&gid).id()) - .collect(); - - let updates = txn.get_and_commit_op_updates(); - let builtin_table_updates = state - .apply_updates_for_bootstrap(updates, local_expr_cache) - .await; - - let cleanup_action = async move { - if !read_only { - let updates: Vec<_> = migration_shards_to_finalize - .into_iter() - .map(|(table_key, shard_id)| ((table_key, shard_id), upper, -1)) - .collect(); - if !updates.is_empty() { - // Ignore any errors, these shards will get cleaned up in the next upgrade. - // It's important to use `upper` here. If there was another concurrent write at - // `upper`, then `updates` are no longer valid. 
- let res = - write_to_migration_shard(updates, upper, &mut write_handle, &mut since_handle) - .await; - if let Err(e) = res { - error!("Unable to remove old entries from migration shard: {e:?}"); - } - } - persist_client - .upgrade_version::(shard_id, diagnostics) - .await - .expect("valid usage"); - } - } - .boxed(); - - Ok(BuiltinItemMigrationResult { - builtin_table_updates, - migrated_storage_collections_0dt, - cleanup_action, - }) -} - -async fn fetch_upper( - write_handle: &mut WriteHandle, -) -> Timestamp { - write_handle - .fetch_recent_upper() - .await - .as_option() - .cloned() - .expect("we use a totally ordered time and never finalize the shard") -} - -async fn write_to_migration_shard( - updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)>, - upper: Timestamp, - write_handle: &mut WriteHandle, - since_handle: &mut SinceHandle, -) -> Result { - let next_upper = upper.step_forward(); - // Lag the shard's upper by 1 to keep it readable. - let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1)); - let next_upper_antichain = Antichain::from_elem(next_upper); - - if let Err(err) = write_handle - .compare_and_append(updates, Antichain::from_elem(upper), next_upper_antichain) - .await - .expect("invalid usage") - { - return Err(Error::new(ErrorKind::Durable(DurableCatalogError::Fence( - FenceError::migration(err), - )))); - } - - // The since handle gives us the ability to fence out other downgraders using an opaque token. - // (See the method documentation for details.) - // That's not needed here, so we use the since handle's opaque token to avoid any comparison - // failures. 
- let opaque = *since_handle.opaque(); - let downgrade = since_handle - .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to)) - .await; - match downgrade { - None => {} - Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"), - Some(Ok(updated)) => soft_assert_or_log!( - updated == downgrade_to, - "updated bound ({updated:?}) should match expected ({downgrade_to:?})" - ), - } - - Ok(next_upper) -} - -mod persist_schema { - use std::num::ParseIntError; - - use arrow::array::{StringArray, StringBuilder}; - use bytes::{BufMut, Bytes}; - use mz_persist_types::Codec; - use mz_persist_types::codec_impls::{ - SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder, - }; - use mz_persist_types::columnar::Schema; - use mz_persist_types::stats::NoneStats; - - #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)] - pub(super) struct TableKey { - pub(super) global_id: u64, - pub(super) build_version: semver::Version, - } - - impl std::fmt::Display for TableKey { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}-{}", self.global_id, self.build_version) - } - } - - impl std::str::FromStr for TableKey { - type Err = String; - - fn from_str(s: &str) -> Result { - let parts: Vec<_> = s.splitn(2, '-').collect(); - let &[global_id, build_version] = parts.as_slice() else { - return Err(format!("invalid TableKey '{s}'")); - }; - let global_id = global_id - .parse() - .map_err(|e: ParseIntError| e.to_string())?; - let build_version = build_version - .parse() - .map_err(|e: semver::Error| e.to_string())?; - Ok(TableKey { - global_id, - build_version, - }) - } - } - - impl From for String { - fn from(table_key: TableKey) -> Self { - table_key.to_string() - } - } - - impl TryFrom for TableKey { - type Error = String; - - fn try_from(s: String) -> Result { - s.parse() - } - } - - impl Default for TableKey { - fn default() -> Self { - Self { - global_id: Default::default(), - build_version: 
semver::Version::new(0, 0, 0), - } - } - } - - impl Codec for TableKey { - type Storage = (); - type Schema = TableKeySchema; - fn codec_name() -> String { - "TableKey".into() - } - fn encode(&self, buf: &mut B) { - buf.put(self.to_string().as_bytes()) - } - fn decode<'a>(buf: &'a [u8], _schema: &TableKeySchema) -> Result { - let table_key = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?; - table_key.parse() - } - fn encode_schema(_schema: &Self::Schema) -> Bytes { - Bytes::new() - } - fn decode_schema(buf: &Bytes) -> Self::Schema { - assert_eq!(*buf, Bytes::new()); - TableKeySchema - } - } - - impl SimpleColumnarData for TableKey { - type ArrowBuilder = StringBuilder; - type ArrowColumn = StringArray; - - fn goodbytes(builder: &Self::ArrowBuilder) -> usize { - builder.values_slice().len() - } - - fn push(&self, builder: &mut Self::ArrowBuilder) { - builder.append_value(&self.to_string()); - } - fn push_null(builder: &mut Self::ArrowBuilder) { - builder.append_null(); - } - fn read(&mut self, idx: usize, column: &Self::ArrowColumn) { - *self = column.value(idx).parse().expect("should be valid TableKey"); - } - } - - /// An implementation of [Schema] for [TableKey]. - #[derive(Debug, PartialEq)] - pub(super) struct TableKeySchema; - - impl Schema for TableKeySchema { - type ArrowColumn = StringArray; - type Statistics = NoneStats; - - type Decoder = SimpleColumnarDecoder; - type Encoder = SimpleColumnarEncoder; - - fn encoder(&self) -> Result { - Ok(SimpleColumnarEncoder::default()) - } - - fn decoder(&self, col: Self::ArrowColumn) -> Result { - Ok(SimpleColumnarDecoder::new(col)) - } - } -} diff --git a/src/adapter/src/catalog/open/builtin_schema_migration.rs b/src/adapter/src/catalog/open/builtin_schema_migration.rs new file mode 100644 index 0000000000000..2b79cefa87f48 --- /dev/null +++ b/src/adapter/src/catalog/open/builtin_schema_migration.rs @@ -0,0 +1,1106 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. 
+// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Support for migrating the schemas of builtin storage collections. +//! +//! If a version upgrade changes the schema of a builtin collection that's made durable in persist, +//! that persist shard's schema must be migrated accordingly. The migration must happen in a way +//! that's compatible with 0dt upgrades: Read-only environments need to be able to read the +//! collections with the new schema, without interfering with the leader environment's continued +//! use of the old schema. +//! +//! Two migration mechanisms are provided: +//! +//! * [`Mechanism::Evolution`] uses persist's schema evolution support to evolve the persist +//! shard's schema in-place. Only works for backward-compatible changes. +//! * [`Mechanism::Replacement`] creates a new shard to serve the builtin collection in the new +//! version. Works for all schema changes but discards existing data. +//! +//! Which mechanism to use is selected through entries in the `MIGRATIONS` list. In general, the +//! `Evolution` mechanism should be used when possible, as it avoids data loss. +//! +//! For more context and details on the implementation, see +//! `doc/developer/design/20251015_builtin_schema_migration.md`. 
+ +use std::collections::{BTreeMap, BTreeSet}; +use std::sync::Arc; + +use anyhow::bail; +use futures::FutureExt; +use futures::future::BoxFuture; +use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO}; +use mz_catalog::builtin::{ + BUILTINS_STATIC, Builtin, Fingerprint, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, + RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL, +}; +use mz_catalog::config::BuiltinItemMigrationConfig; +use mz_catalog::durable::objects::SystemObjectUniqueIdentifier; +use mz_catalog::durable::{SystemObjectDescription, SystemObjectMapping, Transaction}; +use mz_catalog::memory::error::{Error, ErrorKind}; +use mz_ore::soft_assert_or_log; +use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG; +use mz_persist_client::critical::SinceHandle; +use mz_persist_client::read::ReadHandle; +use mz_persist_client::schema::CaESchema; +use mz_persist_client::write::WriteHandle; +use mz_persist_client::{Diagnostics, PersistClient}; +use mz_persist_types::ShardId; +use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema}; +use mz_persist_types::schema::backward_compatible; +use mz_repr::namespaces::{MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA}; +use mz_repr::{CatalogItemId, GlobalId, Timestamp}; +use mz_sql::catalog::{CatalogItemType, NameReference}; +use mz_storage_client::controller::StorageTxn; +use mz_storage_types::StorageDiff; +use mz_storage_types::sources::SourceData; +use semver::Version; +use timely::progress::Antichain; +use tracing::{debug, info}; + +use crate::catalog::migrate::get_migration_version; + +/// Builtin schema migrations required to upgrade to the current build version. +/// +/// Migration steps for old versions must be retained around according to the upgrade policy. For +/// example, if we support upgrading one major version at a time, the release of version `N.0.0` +/// can delete all migration steps with versions before `(N-1).0.0`. 
+/// +/// Smallest supported version: 0.147.0 +const MIGRATIONS: &[MigrationStep] = &[ + MigrationStep { + version: Version::new(0, 149, 0), + object: Object { + type_: CatalogItemType::Source, + schema: MZ_INTERNAL_SCHEMA, + name: "mz_sink_statistics_raw", + }, + mechanism: Mechanism::Replacement, + }, + MigrationStep { + version: Version::new(0, 149, 0), + object: Object { + type_: CatalogItemType::Source, + schema: MZ_INTERNAL_SCHEMA, + name: "mz_source_statistics_raw", + }, + mechanism: Mechanism::Replacement, + }, + MigrationStep { + version: Version::new(0, 159, 0), + object: Object { + type_: CatalogItemType::Source, + schema: MZ_INTERNAL_SCHEMA, + name: "mz_cluster_replica_metrics_history", + }, + mechanism: Mechanism::Evolution, + }, + MigrationStep { + version: Version::new(0, 160, 0), + object: Object { + type_: CatalogItemType::Table, + schema: MZ_CATALOG_SCHEMA, + name: "mz_roles", + }, + mechanism: Mechanism::Replacement, + }, + MigrationStep { + version: Version::new(0, 160, 0), + object: Object { + type_: CatalogItemType::Table, + schema: MZ_CATALOG_SCHEMA, + name: "mz_sinks", + }, + mechanism: Mechanism::Replacement, + }, +]; + +/// A migration required to upgrade past a specific version. +#[derive(Clone, Debug)] +struct MigrationStep { + /// The build version that requires this migration. + version: Version, + /// The object that requires migration. + object: Object, + /// The migration mechanism to be used. + mechanism: Mechanism, +} + +/// The mechanism to use to migrate the schema of a builtin collection. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[allow(dead_code)] +enum Mechanism { + /// Persist schema evolution. + /// + /// Keeps existing contents but only works for schema changes that are backward compatible + /// according to [`backward_compatible`]. + Evolution, + /// Shard replacement. + /// + /// Works for arbitrary schema changes but loses existing contents. + Replacement, +} + +/// The object of a migration. 
+/// +/// This has the same information as [`SystemObjectDescription`] but can be constructed in `const` +/// contexts, like the `MIGRATIONS` list. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +struct Object { + type_: CatalogItemType, + schema: &'static str, + name: &'static str, +} + +impl From for SystemObjectDescription { + fn from(object: Object) -> Self { + SystemObjectDescription { + schema_name: object.schema.into(), + object_type: object.type_, + object_name: object.name.into(), + } + } +} + +/// The result of a builtin schema migration. +pub(super) struct MigrationResult { + /// IDs of items whose shards have been replaced using the `Replacement` mechanism. + pub replaced_items: BTreeSet, + /// A cleanup action to take once the migration has been made durable. + pub cleanup_action: BoxFuture<'static, ()>, +} + +impl Default for MigrationResult { + fn default() -> Self { + Self { + replaced_items: Default::default(), + cleanup_action: async {}.boxed(), + } + } +} + +/// Run builtin schema migrations. +/// +/// This is the entry point used by adapter when opening the catalog. It uses the hardcoded +/// `BUILTINS_STATIC` and `MIGRATIONS` lists to initialize the lists of available builtins and +/// required migrations, respectively. +pub(super) async fn run( + build_info: &BuildInfo, + deploy_generation: u64, + txn: &mut Transaction<'_>, + config: BuiltinItemMigrationConfig, +) -> Result { + // Sanity check to ensure we're not touching durable state in read-only mode. + assert_eq!(config.read_only, txn.is_savepoint()); + + // Tests may provide a dummy build info that confuses the migration step selection logic. Skip + // migrations if we observe this build info. + if *build_info == DUMMY_BUILD_INFO { + return Ok(MigrationResult::default()); + } + + let Some(durable_version) = get_migration_version(txn) else { + // New catalog; nothing to do. 
+ return Ok(MigrationResult::default()); + }; + let build_version = build_info.semver_version(); + + let builtins = BUILTINS_STATIC + .iter() + .map(|builtin| { + let object = SystemObjectDescription { + schema_name: builtin.schema().to_string(), + object_type: builtin.catalog_item_type(), + object_name: builtin.name().to_string(), + }; + (object, builtin) + }) + .collect(); + + let migration = Migration::new( + durable_version.clone(), + build_version.clone(), + deploy_generation, + txn, + builtins, + config, + ); + + let result = migration.run(MIGRATIONS).await.map_err(|e| { + Error::new(ErrorKind::FailedBuiltinSchemaMigration { + last_seen_version: durable_version.to_string(), + this_version: build_version.to_string(), + cause: e.to_string(), + }) + })?; + + Ok(result) +} + +/// Context of a builtin schema migration. +struct Migration<'a, 'b> { + /// The version we are migrating from. + /// + /// Same as the build version of the most recent leader process that successfully performed + /// migrations. + source_version: Version, + /// The version we are migrating to. + /// + /// Same as the build version of this process.
+ target_version: Version, + deploy_generation: u64, + txn: &'a mut Transaction<'b>, + builtins: BTreeMap>, + object_ids: BTreeMap, + config: BuiltinItemMigrationConfig, +} + +impl<'a, 'b> Migration<'a, 'b> { + fn new( + source_version: Version, + target_version: Version, + deploy_generation: u64, + txn: &'a mut Transaction<'b>, + builtins: BTreeMap>, + config: BuiltinItemMigrationConfig, + ) -> Self { + let object_ids = txn + .get_system_object_mappings() + .map(|m| (m.description, m.unique_identifier)) + .collect(); + + Self { + source_version, + target_version, + deploy_generation, + txn, + builtins, + object_ids, + config, + } + } + + async fn run(mut self, steps: &[MigrationStep]) -> anyhow::Result { + info!( + "running builtin schema migration: {} -> {}", + self.source_version, self.target_version + ); + + self.validate_migration_steps(steps); + + let (force, plan) = match self.config.force_migration.as_deref() { + None => (false, self.plan_migration(steps)), + Some("evolution") => (true, self.plan_forced_migration(Mechanism::Evolution)), + Some("replacement") => (true, self.plan_forced_migration(Mechanism::Replacement)), + Some(other) => panic!("unknown force migration mechanism: {other}"), + }; + + if self.source_version == self.target_version && !force { + info!("skipping migration: already at target version"); + return Ok(Default::default()); + } else if self.source_version > self.target_version { + bail!("downgrade not supported"); + } + + // In leader mode, upgrade the version of the migration shard to the target version. + // This fences out any readers at lower versions. 
+ if !self.config.read_only { + self.upgrade_migration_shard_version().await; + } + + info!("executing migration plan: {plan:?}"); + + self.migrate_evolve(&plan.evolve).await?; + self.migrate_replace(&plan.replace).await?; + + let mut migrated_items = BTreeSet::new(); + let mut replaced_items = BTreeSet::new(); + for object in &plan.evolve { + let id = self.object_ids[object].catalog_id; + migrated_items.insert(id); + } + for object in &plan.replace { + let id = self.object_ids[object].catalog_id; + migrated_items.insert(id); + replaced_items.insert(id); + } + + self.update_fingerprints(&migrated_items)?; + + let cleanup_action = self.cleanup().await?; + + Ok(MigrationResult { + replaced_items, + cleanup_action, + }) + } + + /// Sanity check the given migration steps. + /// + /// If any of these checks fail, that's a bug in Materialize, and we panic immediately. + fn validate_migration_steps(&self, steps: &[MigrationStep]) { + for step in steps { + assert!( + step.version <= self.target_version, + "migration step version greater than target version: {} > {}", + step.version, + self.target_version, + ); + + let object = SystemObjectDescription::from(step.object.clone()); + + // `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly, it would + // cause the table to be truncated because the contents are not also stored in the durable + // catalog. Secondly, we prune `mz_storage_usage_by_shard` of old events in the background + // on startup. The correctness of that pruning relies on there being no other retractions + // to `mz_storage_usage_by_shard`. + // + // TODO: Confirm the above reasoning, it might be outdated? 
+ assert_ne!( + *MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, object, + "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated" + ); + + let Some(builtin) = self.builtins.get(&object) else { + panic!("migration step for non-existent builtin: {object:?}"); + }; + + use Builtin::*; + assert!( + matches!(builtin, Table(..) | Source(..) | ContinualTask(..)), + "schema migration not supported for builtin: {builtin:?}", + ); + } + } + + /// Select for each object to migrate the appropriate migration mechanism. + fn plan_migration(&self, steps: &[MigrationStep]) -> Plan { + // Ignore any steps at versions before `source_version`. + let steps = steps.iter().filter(|s| s.version > self.source_version); + + // Select a mechanism for each object, according to the requested migrations: + // * If any `Replacement` was requested, use `Replacement`. + // * Otherwise, (i.e. only `Evolution` was requested), use `Evolution`. + let mut by_object = BTreeMap::new(); + for step in steps { + if let Some(entry) = by_object.get_mut(&step.object) { + *entry = match (step.mechanism, *entry) { + (Mechanism::Evolution, Mechanism::Evolution) => Mechanism::Evolution, + (Mechanism::Replacement, _) | (_, Mechanism::Replacement) => { + Mechanism::Replacement + } + }; + } else { + by_object.insert(step.object.clone(), step.mechanism); + } + } + + let mut plan = Plan::default(); + for (object, mechanism) in by_object { + match mechanism { + Mechanism::Evolution => plan.evolve.push(object.into()), + Mechanism::Replacement => plan.replace.push(object.into()), + } + } + + plan + } + + /// Plan a forced migration of all objects using the given mechanism. + fn plan_forced_migration(&self, mechanism: Mechanism) -> Plan { + let objects = self + .builtins + .iter() + .filter(|(_, builtin)| matches!(builtin, Builtin::Table(..) 
| Builtin::Source(..))) + .map(|(object, _)| object.clone()) + .collect(); + + let mut plan = Plan::default(); + match mechanism { + Mechanism::Evolution => plan.evolve = objects, + Mechanism::Replacement => plan.replace = objects, + } + + plan + } + + /// Upgrade the migration shard to the target version. + async fn upgrade_migration_shard_version(&self) { + let persist = &self.config.persist_client; + let shard_id = self.txn.get_builtin_migration_shard().expect("must exist"); + let diagnostics = Diagnostics { + shard_name: "builtin_migration".to_string(), + handle_purpose: format!("migration shard upgrade @ {}", self.target_version), + }; + + persist + .upgrade_version::( + shard_id, + diagnostics, + ) + .await + .expect("valid usage"); + } + + /// Migrate the given objects using the `Evolution` mechanism. + async fn migrate_evolve(&self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> { + for object in objects { + self.migrate_evolve_one(object).await?; + } + Ok(()) + } + + async fn migrate_evolve_one(&self, object: &SystemObjectDescription) -> anyhow::Result<()> { + let collection_metadata = self.txn.get_collection_metadata(); + let persist = &self.config.persist_client; + + let Some(builtin) = self.builtins.get(object) else { + bail!("missing builtin {object:?}"); + }; + let Some(id) = self.object_ids.get(object).map(|i| i.global_id) else { + bail!("missing id for builtin {object:?}"); + }; + let Some(&shard_id) = collection_metadata.get(&id) else { + // No shard is registered for this builtin. In leader mode, this is fine, we'll + // register the shard during bootstrap. In read-only mode, we might be racing with the + // leader to register the shard and it's unclear what sort of confusion can arise from + // that -- better to bail out in this case. 
+ if self.config.read_only { + bail!("missing collection metadata for builtin {object:?} ({id})"); + } else { + return Ok(()); + } + }; + + let target_desc = match builtin { + Builtin::Table(table) => &table.desc, + Builtin::Source(source) => &source.desc, + Builtin::ContinualTask(ct) => &ct.desc, + _ => bail!("not a storage collection: {builtin:?}"), + }; + + let diagnostics = Diagnostics { + shard_name: id.to_string(), + handle_purpose: format!("builtin schema migration @ {}", self.target_version), + }; + let source_schema = persist + .latest_schema::(shard_id, diagnostics.clone()) + .await + .expect("valid usage"); + + info!(?object, %id, %shard_id, ?source_schema, ?target_desc, "migrating by evolution"); + + if self.config.read_only { + // In read-only mode, only check that the new schema is backward compatible. + // We'll register it when/if we restart in leader mode. + if let Some((_, source_desc, _)) = &source_schema { + let old = mz_persist_types::columnar::data_type::(source_desc)?; + let new = mz_persist_types::columnar::data_type::(target_desc)?; + if backward_compatible(&old, &new).is_none() { + bail!( + "incompatible schema evolution for {object:?}: \ + {source_desc:?} -> {target_desc:?}" + ); + } + } + + return Ok(()); + } + + let (mut schema_id, mut source_desc) = match source_schema { + Some((schema_id, source_desc, _)) => (schema_id, source_desc), + None => { + // If no schema was previously registered, simply try to register the new one. This + // might fail due to a concurrent registration, in which case we'll fall back to + // `compare_and_evolve_schema`. 
+ + debug!(%id, %shard_id, "no previous schema found; registering initial one"); + let schema_id = persist + .register_schema::( + shard_id, + target_desc, + &UnitSchema, + diagnostics.clone(), + ) + .await + .expect("valid usage"); + if schema_id.is_some() { + return Ok(()); + } + + debug!(%id, %shard_id, "schema registration failed; falling back to CaES"); + let (schema_id, source_desc, _) = persist + .latest_schema::( + shard_id, + diagnostics.clone(), + ) + .await + .expect("valid usage") + .expect("known to exist"); + + (schema_id, source_desc) + } + }; + + loop { + // Evolving the schema might fail if another process evolved the schema concurrently, + // in which case we need to retry. Most likely the other process evolved the schema to + // our own target schema and the second try is a no-op. + + debug!(%id, %shard_id, %schema_id, ?source_desc, ?target_desc, "attempting CaES"); + let result = persist + .compare_and_evolve_schema::( + shard_id, + schema_id, + target_desc, + &UnitSchema, + diagnostics.clone(), + ) + .await + .expect("valid usage"); + + match result { + CaESchema::Ok(schema_id) => { + debug!(%id, %shard_id, %schema_id, "schema evolved successfully"); + break; + } + CaESchema::Incompatible => bail!( + "incompatible schema evolution for {object:?}: \ + {source_desc:?} -> {target_desc:?}" + ), + CaESchema::ExpectedMismatch { + schema_id: new_id, + key, + val: UnitSchema, + } => { + schema_id = new_id; + source_desc = key; + } + } + } + + Ok(()) + } + + /// Migrate the given objects using the `Replacement` mechanism. 
+ async fn migrate_replace(&mut self, objects: &[SystemObjectDescription]) -> anyhow::Result<()> { + if objects.is_empty() { + return Ok(Default::default()); + } + + let diagnostics = Diagnostics { + shard_name: "builtin_migration".to_string(), + handle_purpose: format!("builtin schema migration @ {}", self.target_version), + }; + let (mut persist_write, mut persist_read) = + self.open_migration_shard(diagnostics.clone()).await; + + let mut ids_to_replace = BTreeSet::new(); + for object in objects { + if let Some(ids) = self.object_ids.get(object) { + ids_to_replace.insert(ids.global_id); + } else { + bail!("missing id for builtin {object:?}"); + } + } + + info!(?objects, ?ids_to_replace, "migrating by replacement"); + + // Fetch replacement shard IDs from the migration shard, or insert new ones if none exist. + // This can fail due to writes by concurrent processes, so we need to retry. + let replaced_shards = loop { + if let Some(shards) = self + .try_get_or_insert_replacement_shards( + &ids_to_replace, + &mut persist_write, + &mut persist_read, + ) + .await? + { + break shards; + } + }; + + // Update the collection metadata in the transaction and enqueue old shards for + // finalization. + let old = self.txn.delete_collection_metadata(ids_to_replace); + let old_shards = old.into_iter().map(|(_, shard_id)| shard_id).collect(); + self.txn.insert_unfinalized_shards(old_shards)?; + self.txn.insert_collection_metadata(replaced_shards)?; + + Ok(()) + } + + /// Try to get or insert replacement shards for the given IDs into the migration shard, at + /// `target_version` and `deploy_generation`. + /// + /// This method looks for existing entries in the migration shards and returns those if they + /// are present. Otherwise it generates new shard IDs and tries to insert them. + /// + /// The result of this call is `None` if no existing entries were found and inserting new ones + /// failed because of a concurrent write to the migration shard. 
In this case, the caller is + /// expected to retry. + async fn try_get_or_insert_replacement_shards( + &self, + ids_to_replace: &BTreeSet, + persist_write: &mut WriteHandle, + persist_read: &mut ReadHandle, + ) -> anyhow::Result>> { + let upper = persist_write.fetch_recent_upper().await; + let write_ts = *upper.as_option().expect("migration shard not sealed"); + + // Another process might already have done a shard replacement at our version and + // generation, in which case we can directly reuse the replacement shards. + if let Some(read_ts) = write_ts.step_back() { + let pred = |key: &migration_shard::Key| { + key.build_version == self.target_version + && key.deploy_generation == Some(self.deploy_generation) + }; + if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await { + let replaced_shards: BTreeMap<_, _> = entries + .into_iter() + .map(|(key, shard_id)| (GlobalId::System(key.global_id), shard_id)) + .collect(); + + // Processes at the same build version are expected to migrate the same collections. + let replaced_ids: BTreeSet<_> = replaced_shards.keys().copied().collect(); + if replaced_ids != *ids_to_replace { + bail!("replaced ids mismatch: {replaced_ids:?} != {ids_to_replace:?}"); + } + + debug!( + %read_ts, ?replaced_shards, + "found existing entries in migration shard", + ); + return Ok(Some(replaced_shards)); + } + } + + // Generate new shard IDs and attempt to insert them into the migration shard. If we get a + // CaA failure at `write_ts` that means a concurrent process has inserted in the meantime + // and we need to re-check the migration shard contents. 
+ let mut replaced_shards = BTreeMap::new(); + let mut updates = Vec::new(); + for &id in ids_to_replace { + let shard_id = ShardId::new(); + replaced_shards.insert(id, shard_id); + + let GlobalId::System(global_id) = id else { + bail!("attempt to migrate a non-system collection: {id}"); + }; + let key = migration_shard::Key { + global_id, + build_version: self.target_version.clone(), + deploy_generation: Some(self.deploy_generation), + }; + updates.push(((key, shard_id), write_ts, 1)); + } + + let upper = Antichain::from_elem(write_ts); + let new_upper = Antichain::from_elem(write_ts.step_forward()); + debug!(%write_ts, "attempting insert into migration shard"); + let result = persist_write + .compare_and_append(updates, upper, new_upper) + .await + .expect("valid usage"); + + match result { + Ok(()) => { + debug!( + %write_ts, ?replaced_shards, + "successfully inserted into migration shard" + ); + Ok(Some(replaced_shards)) + } + Err(_mismatch) => Ok(None), + } + } + + /// Open writer and reader for the migration shard. + async fn open_migration_shard( + &self, + diagnostics: Diagnostics, + ) -> ( + WriteHandle, + ReadHandle, + ) { + let persist = &self.config.persist_client; + let shard_id = self.txn.get_builtin_migration_shard().expect("must exist"); + + persist + .open( + shard_id, + Arc::new(migration_shard::KeySchema), + Arc::new(ShardIdSchema), + diagnostics, + USE_CRITICAL_SINCE_CATALOG.get(persist.dyncfgs()), + ) + .await + .expect("valid usage") + } + + /// Open a [`SinceHandle`] for the migration shard. + async fn open_migration_shard_since( + &self, + diagnostics: Diagnostics, + ) -> SinceHandle { + let persist = &self.config.persist_client; + let shard_id = self.txn.get_builtin_migration_shard().expect("must exist"); + + persist + .open_critical_since( + shard_id, + // TODO: We may need to use a different critical reader + // id for this if we want to be able to introspect it via SQL. 
+ PersistClient::CONTROLLER_CRITICAL_SINCE, + diagnostics.clone(), + ) + .await + .expect("valid usage") + } + + /// Update the fingerprints stored for `migrated_items` in the catalog. + /// + /// Also asserts that the stored fingerprints of all other system items match their builtin + /// definitions. + fn update_fingerprints( + &mut self, + migrated_items: &BTreeSet, + ) -> anyhow::Result<()> { + let mut updates = BTreeMap::new(); + for (object, ids) in &self.object_ids { + let Some(builtin) = self.builtins.get(object) else { + bail!("missing builtin {object:?}"); + }; + + let id = ids.catalog_id; + let fingerprint = builtin.fingerprint(); + if fingerprint == ids.fingerprint { + continue; // fingerprint unchanged, nothing to do + } + + // Fingerprint mismatch is expected for a migrated item. + let migrated = migrated_items.contains(&id); + // Some builtin types have schemas but no durable state. No migration needed for those. + let ephemeral = matches!( + builtin, + Builtin::Log(_) | Builtin::View(_) | Builtin::Index(_), + ); + + if migrated || ephemeral { + let new_mapping = SystemObjectMapping { + description: object.clone(), + unique_identifier: SystemObjectUniqueIdentifier { + catalog_id: ids.catalog_id, + global_id: ids.global_id, + fingerprint, + }, + }; + updates.insert(id, new_mapping); + } else if builtin.runtime_alterable() { + // Runtime alterable builtins have no meaningful builtin fingerprint, and a + // sentinel value stored in the catalog. + assert_eq!( + ids.fingerprint, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL, + "fingerprint mismatch for runtime-alterable builtin {builtin:?} ({id})", + ); + } else { + panic!( + "fingerprint mismatch for builtin {builtin:?} ({id}): {} != {}", + fingerprint, ids.fingerprint, + ); + } + } + + self.txn.update_system_object_mappings(updates)?; + + Ok(()) + } + + /// Perform cleanup of migration state, i.e. the migration shard. 
+ /// + /// Returns a `Future` that must be run after the `txn` has been successfully committed to + /// durable state. This is used to remove entries from the migration shard only after we know + /// the respective shards will be finalized. Removing entries immediately would risk leaking + /// the shards. + /// + /// We only perform cleanup in leader mode, to keep the durable state changes made by read-only + /// processes as minimal as possible. Given that Materialize doesn't support version downgrades, + /// it is safe to assume that any state for versions below the `target_version` is not needed + /// anymore and can be cleaned up. + /// + /// Note that it is fine for cleanup to sometimes fail or be skipped. The size of the migration + /// shard should always be pretty small, so keeping migration state around for longer isn't a + /// concern. As a result, we can keep the logic simple here and skip doing cleanup in response + /// to transient failures, instead of retrying. + async fn cleanup(&mut self) -> anyhow::Result> { + let noop_action = async {}.boxed(); + + if self.config.read_only { + return Ok(noop_action); + } + + let collection_metadata = self.txn.get_collection_metadata(); + let diagnostics = Diagnostics { + shard_name: "builtin_migration".to_string(), + handle_purpose: "builtin schema migration cleanup".into(), + }; + let (mut persist_write, mut persist_read) = + self.open_migration_shard(diagnostics.clone()).await; + let mut persist_since = self.open_migration_shard_since(diagnostics.clone()).await; + + let upper = persist_write.fetch_recent_upper().await.clone(); + let write_ts = *upper.as_option().expect("migration shard not sealed"); + let Some(read_ts) = write_ts.step_back() else { + return Ok(noop_action); + }; + + // Collect old entries to remove.
+ let pred = |key: &migration_shard::Key| key.build_version < self.target_version; + let Some(stale_entries) = read_migration_shard(&mut persist_read, read_ts, pred).await + else { + return Ok(noop_action); + }; + + debug!( + ?stale_entries, + "cleaning migration shard up to version {}", self.target_version, + ); + + let mut unfinalized_shards = BTreeSet::new(); + let mut retractions = Vec::new(); + for (key, shard_id) in stale_entries { + // The migration shard contains both shards created during aborted upgrades and shards + // created during successful upgrades. The latter may still be in use, so we have to + // check and only finalize those that aren't anymore. + let gid = GlobalId::System(key.global_id); + if collection_metadata.get(&gid) != Some(&shard_id) { + unfinalized_shards.insert(shard_id); + } + + retractions.push(((key, shard_id), write_ts, -1)); + } + + // Arrange for shard finalization and subsequent removal of the migration shard entries. + self.txn.insert_unfinalized_shards(unfinalized_shards)?; + let cleanup_action = async move { + if !retractions.is_empty() { + let new_upper = Antichain::from_elem(write_ts.step_forward()); + let result = persist_write + .compare_and_append(retractions, upper, new_upper) + .await + .expect("valid usage"); + match result { + Ok(()) => debug!("cleaned up migration shard"), + Err(mismatch) => debug!(?mismatch, "migration shard cleanup failed"), + } + } + } + .boxed(); + + // Downgrade the since, to enable some compaction. + let o = *persist_since.opaque(); + let new_since = Antichain::from_elem(read_ts); + let result = persist_since + .maybe_compare_and_downgrade_since(&o, (&o, &new_since)) + .await; + soft_assert_or_log!(result.is_none_or(|r| r.is_ok()), "opaque mismatch"); + + Ok(cleanup_action) + } +} + +/// Read the migration shard at the given timestamp, returning all entries that match the given +/// predicate. 
+/// +/// Returns `None` if the migration shard contains no matching entries, or if it isn't readable at +/// `read_ts`. +/// +/// Entries whose key or value can't be decoded are logged and skipped. +/// +/// # Panics +/// +/// Panics if the snapshot contains an update whose diff isn't `1`. +async fn read_migration_shard

( + persist_read: &mut ReadHandle, + read_ts: Timestamp, + predicate: P, +) -> Option> +where + P: for<'a> Fn(&migration_shard::Key) -> bool, +{ + let as_of = Antichain::from_elem(read_ts); + let updates = persist_read.snapshot_and_fetch(as_of).await.ok()?; + + assert!( + updates.iter().all(|(_, _, diff)| *diff == 1), + "migration shard contains invalid diffs: {updates:?}", + ); + + let entries: Vec<_> = updates + .into_iter() + .filter_map(|(data, _, _)| { + if let (Ok(key), Ok(val)) = data { + Some((key, val)) + } else { + // If we can't decode the data, it has likely been written by a newer version, so + // we ignore it. + info!("skipping unreadable migration shard entry: {data:?}"); + None + } + }) + .filter(move |(key, _)| predicate(key)) + .collect(); + + (!entries.is_empty()).then_some(entries) +} + +/// A plan to migrate between two versions. +#[derive(Debug, Default)] +struct Plan { + /// Objects to migrate using the `Evolution` mechanism. + evolve: Vec, + /// Objects to migrate using the `Replacement` mechanism. + replace: Vec, +} + +/// Types and persist codec impls for the migration shard used by the `Replacement` mechanism. +mod migration_shard { + use std::fmt; + use std::str::FromStr; + + use arrow::array::{StringArray, StringBuilder}; + use bytes::{BufMut, Bytes}; + use mz_persist_types::Codec; + use mz_persist_types::codec_impls::{ + SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder, + }; + use mz_persist_types::columnar::Schema; + use mz_persist_types::stats::NoneStats; + use semver::Version; + use serde::{Deserialize, Serialize}; + + /// Key of a migration shard entry, consisting of a system global ID, a build version, and an + /// optional deploy generation. + #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)] + pub(super) struct Key { + pub(super) global_id: u64, + pub(super) build_version: Version, + // Versions < 26.0 didn't include the deploy generation. As long as we still might + // encounter migration shard entries that don't have it, we need to keep this an `Option` + // and keep supporting both key formats. 
+ pub(super) deploy_generation: Option, + } + + impl fmt::Display for Key { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.deploy_generation.is_some() { + // current format + let s = serde_json::to_string(self).expect("JSON serializable"); + f.write_str(&s) + } else { + // pre-26.0 format + write!(f, "{}-{}", self.global_id, self.build_version) + } + } + } + + impl FromStr for Key { + type Err = String; + + fn from_str(s: &str) -> Result { + // current format + if let Ok(key) = serde_json::from_str(s) { + return Ok(key); + }; + + // pre-26.0 format + // Split on the first '-' only, so any further '-' (e.g. in a semver pre-release suffix) + // stays part of the version string. + let parts: Vec<_> = s.splitn(2, '-').collect(); + let &[global_id, build_version] = parts.as_slice() else { + return Err(format!("invalid Key '{s}'")); + }; + let global_id = global_id.parse::().map_err(|e| e.to_string())?; + let build_version = build_version + .parse::() + .map_err(|e| e.to_string())?; + Ok(Key { + global_id, + build_version, + deploy_generation: None, + }) + } + } + + impl Default for Key { + fn default() -> Self { + Self { + global_id: Default::default(), + build_version: Version::new(0, 0, 0), + deploy_generation: Some(0), + } + } + } + + impl Codec for Key { + type Schema = KeySchema; + type Storage = (); + + fn codec_name() -> String { + "TableKey".into() + } + + fn encode(&self, buf: &mut B) { + buf.put(self.to_string().as_bytes()) + } + + fn decode<'a>(buf: &'a [u8], _schema: &KeySchema) -> Result { + let s = str::from_utf8(buf).map_err(|e| e.to_string())?; + s.parse() + } + + fn encode_schema(_schema: &KeySchema) -> Bytes { + Bytes::new() + } + + fn decode_schema(buf: &Bytes) -> Self::Schema { + assert_eq!(*buf, Bytes::new()); + KeySchema + } + } + + impl SimpleColumnarData for Key { + type ArrowBuilder = StringBuilder; + type ArrowColumn = StringArray; + + fn goodbytes(builder: &Self::ArrowBuilder) -> usize { + builder.values_slice().len() + } + + fn push(&self, builder: &mut Self::ArrowBuilder) { + builder.append_value(&self.to_string()); + } + + fn push_null(builder: 
&mut Self::ArrowBuilder) { + builder.append_null(); + } + + fn read(&mut self, idx: usize, column: &Self::ArrowColumn) { + *self = column.value(idx).parse().expect("valid Key"); + } + } + + #[derive(Debug, PartialEq)] + pub(super) struct KeySchema; + + impl Schema for KeySchema { + type ArrowColumn = StringArray; + type Statistics = NoneStats; + type Decoder = SimpleColumnarDecoder; + type Encoder = SimpleColumnarEncoder; + + fn encoder(&self) -> anyhow::Result> { + Ok(SimpleColumnarEncoder::default()) + } + + fn decoder(&self, col: StringArray) -> anyhow::Result> { + Ok(SimpleColumnarDecoder::new(col)) + } + } +} diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index f1ba888c1f065..8db6c580c4333 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -1066,6 +1066,7 @@ pub struct Config { pub helm_chart_version: Option, pub license_key: ValidatedLicenseKey, pub external_login_password_mz_system: Option, + pub force_builtin_schema_migration: Option, } /// Metadata about an active connection. 
@@ -4014,6 +4015,7 @@ pub fn serve( helm_chart_version, license_key, external_login_password_mz_system, + force_builtin_schema_migration, }: Config, ) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> { async move { @@ -4121,6 +4123,7 @@ pub fn serve( BuiltinItemMigrationConfig { persist_client: persist_client.clone(), read_only: read_only_controllers, + force_migration: force_builtin_schema_migration, } ; let OpenCatalogResult { @@ -4137,6 +4140,7 @@ pub fn serve( unsafe_mode, all_features, build_info, + deploy_generation: controller_config.deploy_generation, environment_id: environment_id.clone(), read_only: read_only_controllers, now: now.clone(), diff --git a/src/catalog-debug/src/main.rs b/src/catalog-debug/src/main.rs index 41822b03f2ae5..0833b05654c2d 100644 --- a/src/catalog-debug/src/main.rs +++ b/src/catalog-debug/src/main.rs @@ -605,6 +605,7 @@ async fn upgrade_check( unsafe_mode: true, all_features: false, build_info: &BUILD_INFO, + deploy_generation: args.deploy_generation.unwrap_or(0), environment_id: args.environment_id.clone(), read_only, now, @@ -651,6 +652,7 @@ async fn upgrade_check( // client. 
persist_client: PersistClient::new_for_tests().await, read_only, + force_migration: None, }, persist_client: persist_client.clone(), enable_expression_cache_override: None, diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs index 3ee415812fd05..2de813e3aa6dd 100644 --- a/src/catalog/src/builtin.rs +++ b/src/catalog/src/builtin.rs @@ -28,9 +28,7 @@ use std::collections::BTreeMap; use std::hash::Hash; use std::string::ToString; use std::sync::LazyLock; -use std::sync::Mutex; -use clap::clap_derive::ValueEnum; use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog}; use mz_ore::collections::HashMap; use mz_pgrepr::oid; @@ -61,7 +59,6 @@ use mz_storage_client::healthcheck::{ REPLICA_STATUS_HISTORY_DESC, WALLCLOCK_LAG_HISTORY_DESC, }; use mz_storage_client::statistics::{MZ_SINK_STATISTICS_RAW_DESC, MZ_SOURCE_STATISTICS_RAW_DESC}; -use rand::Rng; use serde::Serialize; use crate::durable::objects::SystemObjectDescription; @@ -350,48 +347,9 @@ impl Fingerprint for &BuiltinLog { } } -/// Allows tests to inject arbitrary amounts of whitespace to forcibly change the fingerprint and -/// trigger a builtin migration. -#[derive(Debug, Clone, ValueEnum)] -pub enum UnsafeBuiltinTableFingerprintWhitespace { - /// Inject whitespace into all builtin table fingerprints. - All, - /// Inject whitespace into half of the builtin table fingerprints, - /// which are randomly selected. - Half, -} -pub static UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE: Mutex< - Option<(UnsafeBuiltinTableFingerprintWhitespace, String)>, -> = Mutex::new(None); - impl Fingerprint for &BuiltinTable { fn fingerprint(&self) -> String { - // This is only called during bootstrapping, so it's not that big of a deal to lock a mutex, - // though it's not great. 
- let guard = UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE - .lock() - .expect("lock poisoned"); - match &*guard { - // `mz_storage_usage_by_shard` can never be migrated. - _ if self.schema == MZ_STORAGE_USAGE_BY_SHARD.schema - && self.name == MZ_STORAGE_USAGE_BY_SHARD.name => - { - self.desc.fingerprint() - } - Some((UnsafeBuiltinTableFingerprintWhitespace::All, whitespace)) => { - format!("{}{}", self.desc.fingerprint(), whitespace) - } - Some((UnsafeBuiltinTableFingerprintWhitespace::Half, whitespace)) => { - let mut rng = rand::thread_rng(); - let migrate: bool = rng.r#gen(); - if migrate { - format!("{}{}", self.desc.fingerprint(), whitespace) - } else { - self.desc.fingerprint() - } - } - None => self.desc.fingerprint(), - } + self.desc.fingerprint() } } diff --git a/src/catalog/src/config.rs b/src/catalog/src/config.rs index 389959ff902d6..7c333da851159 100644 --- a/src/catalog/src/config.rs +++ b/src/catalog/src/config.rs @@ -50,6 +50,8 @@ pub struct StateConfig { pub all_features: bool, /// Information about this build of Materialize. pub build_info: &'static BuildInfo, + /// The deploy generation with which the process was started. + pub deploy_generation: u64, /// A persistent ID associated with the environment. pub environment_id: EnvironmentId, /// Whether to start Materialize in read-only mode. 
@@ -104,6 +106,7 @@ pub struct StateConfig { pub struct BuiltinItemMigrationConfig { pub persist_client: PersistClient, pub read_only: bool, + pub force_migration: Option, } #[derive(Debug, Clone, Serialize)] diff --git a/src/catalog/src/memory/error.rs b/src/catalog/src/memory/error.rs index 9aa982bd84e59..34203c0b5064c 100644 --- a/src/catalog/src/memory/error.rs +++ b/src/catalog/src/memory/error.rs @@ -78,11 +78,19 @@ pub enum ErrorKind { #[error( "cannot migrate from catalog version {last_seen_version} to version {this_version} (earlier versions might still work): {cause}" )] - FailedMigration { + FailedCatalogMigration { last_seen_version: String, this_version: &'static str, cause: String, }, + #[error( + "cannot migrate builtin schemas from version {last_seen_version} to version {this_version}: {cause}" + )] + FailedBuiltinSchemaMigration { + last_seen_version: String, + this_version: String, + cause: String, + }, #[error("failpoint {0} reached)")] FailpointReached(String), #[error("{0}")] diff --git a/src/environmentd/src/environmentd/main.rs b/src/environmentd/src/environmentd/main.rs index 5404afbba4255..4ef178aec5aee 100644 --- a/src/environmentd/src/environmentd/main.rs +++ b/src/environmentd/src/environmentd/main.rs @@ -38,10 +38,6 @@ use mz_adapter_types::bootstrap_builtin_cluster_config::{ use mz_auth::password::Password; use mz_aws_secrets_controller::AwsSecretsController; use mz_build_info::BuildInfo; -use mz_catalog::builtin::{ - UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE, - UnsafeBuiltinTableFingerprintWhitespace, -}; use mz_catalog::config::ClusterReplicaSizeMap; use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController}; use mz_controller::ControllerConfig; @@ -617,21 +613,13 @@ pub struct Args { tracing: TracingCliArgs, // === Testing options. === - /// Injects arbitrary whitespace into builtin table fingerprints, which can - /// trigger builtin item migrations. 
The amount of whitespace is determined - /// by - /// `unsafe_builtin_table_fingerprint_whitespace_version`. + /// Forces the migration of all builtin storage collections using the + /// specified migration mechanism (either "evolution" or "replacement"). + /// /// This argument is meant for testing only and as the names suggests /// should not be set in production. #[clap(long, value_enum, requires = "unsafe_mode")] - unsafe_builtin_table_fingerprint_whitespace: Option, - /// Controls the amount of whitespace injected by - /// `unsafe_builtin_table_fingerprint_whitespace`. - /// Incrementing this value can allow triggering multiple builtin - /// migrations from a single test. This argument is meant for testing only - /// and as the names suggests should not be set in production. - #[clap(long, requires = "unsafe_mode", default_value = "1")] - unsafe_builtin_table_fingerprint_whitespace_version: usize, + unsafe_force_builtin_schema_migration: Option, } #[derive(ValueEnum, Debug, Clone)] @@ -695,13 +683,9 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> { }; // Configure testing options. - if let Some(fingerprint_whitespace) = args.unsafe_builtin_table_fingerprint_whitespace { - assert!(args.unsafe_mode); - let whitespace = "\n".repeat(args.unsafe_builtin_table_fingerprint_whitespace_version); - *UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE - .lock() - .expect("lock poisoned") = Some((fingerprint_whitespace, whitespace)); - } + let force_builtin_schema_migration = args + .unsafe_force_builtin_schema_migration + .inspect(|_mechanism| assert!(args.unsafe_mode)); // Start Tokio runtime. @@ -1168,6 +1152,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> { tracing_handle, // Testing options. 
now, + force_builtin_schema_migration, }) .await .maybe_terminate("booting server")?; diff --git a/src/environmentd/src/lib.rs b/src/environmentd/src/lib.rs index 5bcc909b40ee4..8104ea0f021d0 100644 --- a/src/environmentd/src/lib.rs +++ b/src/environmentd/src/lib.rs @@ -209,6 +209,9 @@ pub struct Config { // === Testing options. === /// A now generation function for mocking time. pub now: NowFn, + /// If `Some`, force running builtin schema migration using the specified + /// migration mechanism ("evolution" or "replacement"). + pub force_builtin_schema_migration: Option, } /// Configuration for the Catalog. @@ -786,6 +789,7 @@ impl Listeners { helm_chart_version: config.helm_chart_version.clone(), license_key: config.license_key, external_login_password_mz_system: config.external_login_password_mz_system, + force_builtin_schema_migration: config.force_builtin_schema_migration, }) .instrument(info_span!("adapter::serve")) .await?; diff --git a/src/environmentd/src/test_util.rs b/src/environmentd/src/test_util.rs index d9a2fe26dd8e7..8dd6fe95db500 100644 --- a/src/environmentd/src/test_util.rs +++ b/src/environmentd/src/test_util.rs @@ -792,6 +792,7 @@ impl Listeners { helm_chart_version: None, license_key: ValidatedLicenseKey::for_tests(), external_login_password_mz_system: config.external_login_password_mz_system, + force_builtin_schema_migration: None, }) .await?; diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 7773fb4828e9c..191bcf6d5f426 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -702,6 +702,41 @@ impl PersistClient { Ok(machine.latest_schema()) } + /// Registers a schema for the given shard. + /// + /// Returns the new schema ID if the registration succeeds, and `None` + /// otherwise. Schema registration succeeds in two cases: + /// a) No schema was currently registered for the shard. + /// b) The given schema is already registered for the shard. 
+ /// + /// To evolve an existing schema instead, use + /// [PersistClient::compare_and_evolve_schema]. + // + // TODO: unify with `compare_and_evolve_schema` + pub async fn register_schema( + &self, + shard_id: ShardId, + key_schema: &K::Schema, + val_schema: &V::Schema, + diagnostics: Diagnostics, + ) -> Result, InvalidUsage> + where + K: Debug + Codec, + V: Debug + Codec, + T: Timestamp + Lattice + Codec64 + Sync, + D: Monoid + Codec64 + Send + Sync, + { + let machine = self + .make_machine::(shard_id, diagnostics) + .await?; + let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime)); + + let (schema_id, maintenance) = machine.register_schema(key_schema, val_schema).await; + maintenance.start_performing(&machine, &gc); + + Ok(schema_id) + } + /// Registers a new latest schema for the given shard. /// /// This new schema must be [backward_compatible] with all previous schemas diff --git a/src/sqllogictest/src/runner.rs b/src/sqllogictest/src/runner.rs index 29e77004da96a..46e8dfac68588 100644 --- a/src/sqllogictest/src/runner.rs +++ b/src/sqllogictest/src/runner.rs @@ -1234,6 +1234,7 @@ impl<'a> RunnerInner<'a> { helm_chart_version: None, license_key: ValidatedLicenseKey::for_tests(), external_login_password_mz_system: None, + force_builtin_schema_migration: None, }; // We need to run the server on its own Tokio runtime, which in turn // requires its own thread, so that we can wait for any tasks spawned diff --git a/test/0dt/mzcompose.py b/test/0dt/mzcompose.py index 79d8e7c0333ff..1fe69a31e3537 100644 --- a/test/0dt/mzcompose.py +++ b/test/0dt/mzcompose.py @@ -1801,10 +1801,9 @@ def fetch_reconciliation_metrics(c: Composition, process: str) -> tuple[int, int return reused, replaced -def workflow_builtin_item_migrations_replacement(c: Composition) -> None: +def workflow_builtin_schema_migrations_replacement(c: Composition) -> None: """ - Verify builtin item migrations by shard replacement, with schema evolution - disabled. 
+ Verify builtin schema migrations by shard replacement. """ def get_persist_shard_id(item_id: str, service: str) -> str: @@ -1825,10 +1824,7 @@ def get_persist_shard_id(item_id: str, service: str) -> str: c.down(destroy_volumes=True) c.up("mz_old") c.sql( - """ - ALTER SYSTEM SET enable_builtin_migration_schema_evolution = false; - CREATE MATERIALIZED VIEW mv AS SELECT name FROM mz_tables; - """, + "CREATE MATERIALIZED VIEW mv AS SELECT name FROM mz_tables", service="mz_old", port=6877, user="mz_system", @@ -1854,7 +1850,7 @@ def get_persist_shard_id(item_id: str, service: str) -> str: system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS, restart="on-failure", external_metadata_store=True, - force_migrations="all", + force_migrations="replacement", healthcheck=LEADER_STATUS_HEALTHCHECK, default_replication_factor=2, ), @@ -1901,9 +1897,9 @@ def get_persist_shard_id(item_id: str, service: str) -> str: ), f"{replaced} dataflows have been replaced, expected all to be reused" -def workflow_builtin_item_migrations_schema_evolution(c: Composition) -> None: +def workflow_builtin_schema_migrations_evolution(c: Composition) -> None: """ - Verify builtin item migrations by schema evolution. + Verify builtin schema migrations by schema evolution. 
""" def get_persist_shard_id(item_id: str, service: str) -> str: @@ -1924,10 +1920,7 @@ def get_persist_shard_id(item_id: str, service: str) -> str: c.down(destroy_volumes=True) c.up("mz_old") c.sql( - """ - ALTER SYSTEM SET enable_builtin_migration_schema_evolution = true; - CREATE MATERIALIZED VIEW mv AS SELECT name FROM mz_tables; - """, + "CREATE MATERIALIZED VIEW mv AS SELECT name FROM mz_tables", service="mz_old", port=6877, user="mz_system", @@ -1953,7 +1946,7 @@ def get_persist_shard_id(item_id: str, service: str) -> str: system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS, restart="on-failure", external_metadata_store=True, - force_migrations="all", + force_migrations="evolution", healthcheck=LEADER_STATUS_HEALTHCHECK, default_replication_factor=2, ),