Skip to content

Commit 44fa31b

Browse files
committed
adapter: key migration shard by deploy generation too
This commit adds the deploy generation to the `Key` used in the migration shard. This is necessary because some of our tests simulate 0dt upgrades by bumping the deploy generation without actually bumping the version, and those tests get confused when the deploy generation isn't in the key. Note that this requires changing the serialization format of the migration shard key. We choose JSON, to make the format easier to modify in the future. We still need to be able to parse the older format so we can clean up old entries.
1 parent 775384e commit 44fa31b

File tree

6 files changed

+43
-9
lines changed

6 files changed

+43
-9
lines changed

src/adapter/src/catalog.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,7 @@ impl Catalog {
684684
unsafe_mode: true,
685685
all_features: false,
686686
build_info,
687+
deploy_generation: 0,
687688
environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
688689
read_only,
689690
now,

src/adapter/src/catalog/open.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ impl Catalog {
181181
license_key: config.license_key,
182182
};
183183

184+
let deploy_generation = storage.get_deployment_generation().await?;
185+
184186
let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
185187
assert!(!updates.is_empty(), "initial catalog snapshot is missing");
186188
let mut txn = storage.transaction().await?;
@@ -481,6 +483,7 @@ impl Catalog {
481483
// Migrate builtin items.
482484
let schema_migration_result = builtin_schema_migration::run(
483485
config.build_info,
486+
deploy_generation,
484487
&mut txn,
485488
config.builtin_item_migration_config,
486489
)

src/adapter/src/catalog/open/builtin_schema_migration.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ impl Default for MigrationResult {
190190
/// required migrations, respectively.
191191
pub(super) async fn run(
192192
build_info: &BuildInfo,
193+
deploy_generation: u64,
193194
txn: &mut Transaction<'_>,
194195
config: BuiltinItemMigrationConfig,
195196
) -> Result<MigrationResult, Error> {
@@ -223,6 +224,7 @@ pub(super) async fn run(
223224
let migration = Migration::new(
224225
durable_version.clone(),
225226
build_version.clone(),
227+
deploy_generation,
226228
txn,
227229
builtins,
228230
config,
@@ -243,6 +245,7 @@ pub(super) async fn run(
243245
struct Migration<'a, 'b> {
244246
source_version: Version,
245247
target_version: Version,
248+
deploy_generation: u64,
246249
txn: &'a mut Transaction<'b>,
247250
builtins: BTreeMap<SystemObjectDescription, &'static Builtin<NameReference>>,
248251
object_ids: BTreeMap<SystemObjectDescription, SystemObjectUniqueIdentifier>,
@@ -253,6 +256,7 @@ impl<'a, 'b> Migration<'a, 'b> {
253256
fn new(
254257
source_version: Version,
255258
target_version: Version,
259+
deploy_generation: u64,
256260
txn: &'a mut Transaction<'b>,
257261
builtins: BTreeMap<SystemObjectDescription, &'static Builtin<NameReference>>,
258262
config: BuiltinItemMigrationConfig,
@@ -265,6 +269,7 @@ impl<'a, 'b> Migration<'a, 'b> {
265269
Self {
266270
source_version,
267271
target_version,
272+
deploy_generation,
268273
txn,
269274
builtins,
270275
object_ids,
@@ -600,7 +605,7 @@ impl<'a, 'b> Migration<'a, 'b> {
600605
}
601606

602607
/// Try to get or insert replacement shards for the given IDs into the migration shard, at
603-
/// `target_version`.
608+
/// `target_version` and `deploy_generation`.
604609
///
605610
/// This method looks for existing entries in the migration shards and returns those if they
606611
/// are present. Otherwise it generates new shard IDs and tries to insert them.
@@ -617,13 +622,13 @@ impl<'a, 'b> Migration<'a, 'b> {
617622
let upper = persist_write.fetch_recent_upper().await;
618623
let write_ts = *upper.as_option().expect("migration shard not sealed");
619624

620-
// Another process might already have done a shard replacement at our version, in which
621-
// case we can directly reuse the replacement shards.
622-
//
623-
// TODO: We should check for deploy generation as well as build version. Storing the deploy
624-
// generation as well requires changing the key schema of the migration shard.
625+
// Another process might already have done a shard replacement at our version and
626+
// generation, in which case we can directly reuse the replacement shards.
625627
if let Some(read_ts) = write_ts.step_back() {
626-
let pred = |key: &migration_shard::Key| key.build_version == self.target_version;
628+
let pred = |key: &migration_shard::Key| {
629+
key.build_version == self.target_version
630+
&& key.deploy_generation == Some(self.deploy_generation)
631+
};
627632
if let Some(entries) = read_migration_shard(persist_read, read_ts, pred).await {
628633
let replaced_shards: BTreeMap<_, _> = entries
629634
.into_iter()
@@ -659,6 +664,7 @@ impl<'a, 'b> Migration<'a, 'b> {
659664
let key = migration_shard::Key {
660665
global_id,
661666
build_version: self.target_version.clone(),
667+
deploy_generation: Some(self.deploy_generation),
662668
};
663669
updates.push(((key, shard_id), write_ts, 1));
664670
}
@@ -939,23 +945,41 @@ mod migration_shard {
939945
use mz_persist_types::columnar::Schema;
940946
use mz_persist_types::stats::NoneStats;
941947
use semver::Version;
948+
use serde::{Deserialize, Serialize};
942949

943-
#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
950+
#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
944951
pub(super) struct Key {
945952
pub(super) global_id: u64,
946953
pub(super) build_version: Version,
954+
// Versions < 26.0 didn't include the deploy generation. As long as we still might
955+
// encounter migration shard entries that don't have it, we need to keep this an `Option`
956+
// and keep supporting both key formats.
957+
pub(super) deploy_generation: Option<u64>,
947958
}
948959

949960
impl fmt::Display for Key {
950961
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
951-
write!(f, "{}-{}", self.global_id, self.build_version)
962+
if self.deploy_generation.is_some() {
963+
// current format
964+
let s = serde_json::to_string(self).expect("JSON serializable");
965+
f.write_str(&s)
966+
} else {
967+
// pre-26.0 format
968+
write!(f, "{}-{}", self.global_id, self.build_version)
969+
}
952970
}
953971
}
954972

955973
impl FromStr for Key {
956974
type Err = String;
957975

958976
fn from_str(s: &str) -> Result<Self, String> {
977+
// current format
978+
if let Ok(key) = serde_json::from_str(s) {
979+
return Ok(key);
980+
};
981+
982+
// pre-26.0 format
959983
let parts: Vec<_> = s.splitn(2, '-').collect();
960984
let &[global_id, build_version] = parts.as_slice() else {
961985
return Err(format!("invalid Key '{s}'"));
@@ -967,6 +991,7 @@ mod migration_shard {
967991
Ok(Key {
968992
global_id,
969993
build_version,
994+
deploy_generation: None,
970995
})
971996
}
972997
}
@@ -976,6 +1001,7 @@ mod migration_shard {
9761001
Self {
9771002
global_id: Default::default(),
9781003
build_version: Version::new(0, 0, 0),
1004+
deploy_generation: Some(0),
9791005
}
9801006
}
9811007
}

src/adapter/src/coord.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4140,6 +4140,7 @@ pub fn serve(
41404140
unsafe_mode,
41414141
all_features,
41424142
build_info,
4143+
deploy_generation: controller_config.deploy_generation,
41434144
environment_id: environment_id.clone(),
41444145
read_only: read_only_controllers,
41454146
now: now.clone(),

src/catalog-debug/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,7 @@ async fn upgrade_check(
605605
unsafe_mode: true,
606606
all_features: false,
607607
build_info: &BUILD_INFO,
608+
deploy_generation: args.deploy_generation.unwrap_or(0),
608609
environment_id: args.environment_id.clone(),
609610
read_only,
610611
now,

src/catalog/src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ pub struct StateConfig {
5050
pub all_features: bool,
5151
/// Information about this build of Materialize.
5252
pub build_info: &'static BuildInfo,
53+
/// The deploy generation with which the process was started.
54+
pub deploy_generation: u64,
5355
/// A persistent ID associated with the environment.
5456
pub environment_id: EnvironmentId,
5557
/// Whether to start Materialize in read-only mode.

0 commit comments

Comments
 (0)