Skip to content

Commit 751819c

Browse files
committed
Implement the new versioning scheme
1 parent 32ab7f1 commit 751819c

File tree

15 files changed

+336
-391
lines changed

15 files changed

+336
-391
lines changed

src/catalog/src/durable.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ impl TestCatalogStateBuilder {
450450
Self {
451451
persist_client,
452452
organization_id: Uuid::new_v4(),
453-
version: semver::Version::new(0, 0, 0),
453+
version: mz_build_info::DUMMY_BUILD_INFO.semver_version(),
454454
deploy_generation: None,
455455
metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
456456
}

src/catalog/src/durable/persist.rs

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -395,36 +395,27 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
395395
/// Increment the version in the catalog upgrade shard to the code's current version.
396396
async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
397397
let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
398-
let mut write_handle: WriteHandle<(), (), Timestamp, StorageDiff> = self
398+
399+
let () = self
399400
.persist_client
400-
.open_writer(
401+
.upgrade_version::<(), (), Timestamp, StorageDiff>(
401402
upgrade_shard_id,
402-
Arc::new(UnitSchema::default()),
403-
Arc::new(UnitSchema::default()),
404403
Diagnostics {
405404
shard_name: UPGRADE_SHARD_NAME.to_string(),
406-
handle_purpose: "increment durable catalog upgrade shard version".to_string(),
405+
handle_purpose: "durable catalog state upgrade".to_string(),
407406
},
408407
)
409408
.await
410409
.expect("invalid usage");
411-
const EMPTY_UPDATES: &[(((), ()), Timestamp, StorageDiff)] = &[];
412-
let mut upper = write_handle.fetch_recent_upper().await.clone();
413-
loop {
414-
let next_upper = upper
415-
.iter()
416-
.map(|timestamp| timestamp.step_forward())
417-
.collect();
418-
match write_handle
419-
.compare_and_append(EMPTY_UPDATES, upper, next_upper)
420-
.await
421-
.expect("invalid usage")
422-
{
423-
Ok(()) => break,
424-
Err(upper_mismatch) => {
425-
upper = upper_mismatch.current;
426-
}
427-
}
410+
411+
if cfg!(debug_assertions) {
412+
let fetched_version =
413+
fetch_catalog_upgrade_shard_version(&self.persist_client, upgrade_shard_id).await;
414+
assert_eq!(
415+
Some(&self.catalog_content_version),
416+
fetched_version.as_ref(),
417+
"code version should match the upgraded data version"
418+
);
428419
}
429420
}
430421

@@ -998,16 +989,12 @@ impl UnopenedPersistCatalogState {
998989
// If this is `None`, no version was found in the upgrade shard. This is a brand-new
999990
// environment, and we don't need to worry about fencing existing users.
1000991
if let Some(version_in_upgrade_shard) = version_in_upgrade_shard {
1001-
// IMPORTANT: We swap the order of arguments here! Normally it's
1002-
// `code_version, data_version`, and we check whether a given code
1003-
// version, which is usually _older_, is allowed to touch a shard
1004-
// that has been touched by a _future_ version.
1005-
//
1006-
// By inverting argument order, we check if our version is too far
1007-
// ahead of the version in the shard.
1008-
if mz_persist_client::cfg::check_data_version(&version_in_upgrade_shard, &version)
1009-
.is_err()
1010-
{
992+
// Check that the current version of the code can handle data from the shard.
993+
// (We used to reverse this check, to confirm that whatever code wrote the data
994+
// in the shard would be able to read data written by the current version... but
995+
// we now require the current code to be able to maintain compat with whatever
996+
// data format versions pass this check.)
997+
if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_upgrade_shard) {
1011998
return Err(DurableCatalogError::IncompatiblePersistVersion {
1012999
found_version: version_in_upgrade_shard,
10131000
catalog_version: version,

src/catalog/src/durable/persist/tests.rs

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,18 @@ use mz_persist_client::PersistLocation;
1212
use mz_persist_client::cache::PersistClientCache;
1313
use uuid::Uuid;
1414

15-
use crate::durable::persist::{
16-
CATALOG_SEED, UPGRADE_SEED, fetch_catalog_upgrade_shard_version, shard_id,
17-
};
15+
use crate::durable::persist::{UPGRADE_SEED, fetch_catalog_upgrade_shard_version, shard_id};
1816
use crate::durable::{DurableCatalogError, TestCatalogStateBuilder, test_bootstrap_args};
1917

2018
/// Test that the catalog forces users to upgrade one version at a time.
2119
#[mz_ore::test(tokio::test)]
2220
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2321
async fn test_upgrade_shard() {
24-
let first_version = semver::Version::parse("0.10.0").expect("failed to parse version");
25-
let second_version = semver::Version::parse("0.11.0").expect("failed to parse version");
22+
let first_version = semver::Version::parse("0.147.0").expect("failed to parse version");
23+
let second_version = semver::Version::parse("26.0.0").expect("failed to parse version");
2624
let second_dev_version =
27-
semver::Version::parse("0.11.0-dev.0").expect("failed to parse version");
28-
let third_version = semver::Version::parse("0.12.0").expect("failed to parse version");
25+
semver::Version::parse("26.0.0-dev.0").expect("failed to parse version");
26+
let third_version = semver::Version::parse("27.1.0").expect("failed to parse version");
2927
let organization_id = Uuid::new_v4();
3028
let deploy_generation = 0;
3129
let mut persist_cache = PersistClientCache::new_no_metrics();
@@ -156,12 +154,12 @@ async fn test_upgrade_shard() {
156154
#[mz_ore::test(tokio::test)]
157155
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
158156
async fn test_version_regression() {
159-
let first_version = semver::Version::parse("0.10.0").expect("failed to parse version");
160-
let second_version = semver::Version::parse("0.11.0").expect("failed to parse version");
157+
let first_version = semver::Version::parse("0.147.0").expect("failed to parse version");
158+
let second_version = semver::Version::parse("26.1.0").expect("failed to parse version");
161159
let organization_id = Uuid::new_v4();
162160
let deploy_generation = 0;
163161
let mut persist_cache = PersistClientCache::new_no_metrics();
164-
let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
162+
let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
165163

166164
persist_cache.cfg.build_version = first_version.clone();
167165
let persist_client = persist_cache
@@ -171,7 +169,7 @@ async fn test_version_regression() {
171169

172170
assert_eq!(
173171
None,
174-
fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await
172+
fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await
175173
);
176174

177175
let persist_openable_state = TestCatalogStateBuilder::new(persist_client.clone())
@@ -188,7 +186,7 @@ async fn test_version_regression() {
188186

189187
assert_eq!(
190188
Some(first_version.clone()),
191-
fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await
189+
fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await
192190
);
193191

194192
persist_cache.cfg.build_version = second_version.clone();
@@ -210,30 +208,32 @@ async fn test_version_regression() {
210208

211209
assert_eq!(
212210
Some(second_version.clone()),
213-
fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await
211+
fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await
214212
);
215213

216-
persist_cache.cfg.build_version = first_version.clone();
217-
let persist_client = persist_cache
218-
.open(PersistLocation::new_in_mem())
219-
.await
220-
.expect("in-mem location is valid");
221-
let err = TestCatalogStateBuilder::new(persist_client.clone())
222-
.with_organization_id(organization_id)
223-
.with_deploy_generation(deploy_generation)
224-
.with_version(first_version.clone())
225-
.build()
226-
.await
227-
.expect_err("skipping versions should error");
228-
assert!(
229-
matches!(
230-
&err,
231-
DurableCatalogError::IncompatiblePersistVersion {
232-
found_version,
233-
catalog_version
234-
}
235-
if found_version == &second_version && catalog_version == &first_version
236-
),
237-
"Unexpected error: {err:?}"
238-
);
214+
// NB: running the following is expected to halt the process, but we can't catch halts in tests.
215+
//
216+
// persist_cache.cfg.build_version = first_version.clone();
217+
// let persist_client = persist_cache
218+
// .open(PersistLocation::new_in_mem())
219+
// .await
220+
// .expect("in-mem location is valid");
221+
// let err = TestCatalogStateBuilder::new(persist_client.clone())
222+
// .with_organization_id(organization_id)
223+
// .with_deploy_generation(deploy_generation)
224+
// .with_version(first_version.clone())
225+
// .build()
226+
// .await
227+
// .expect_err("skipping versions should error");
228+
// assert!(
229+
// matches!(
230+
// &err,
231+
// DurableCatalogError::IncompatiblePersistVersion {
232+
// found_version,
233+
// catalog_version
234+
// }
235+
// if found_version == &second_version && catalog_version == &first_version
236+
// ),
237+
// "Unexpected error: {err:?}"
238+
// );
239239
}

src/catalog/src/durable/transaction.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3407,12 +3407,13 @@ where
34073407
mod tests {
34083408
use super::*;
34093409
use mz_ore::{assert_none, assert_ok};
3410-
3411-
use mz_ore::now::SYSTEM_TIME;
3412-
use mz_persist_client::PersistClient;
3410+
use semver::Version;
34133411

34143412
use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
34153413
use crate::memory;
3414+
use mz_ore::now::SYSTEM_TIME;
3415+
use mz_persist_client::cache::PersistClientCache;
3416+
use mz_persist_types::PersistLocation;
34163417

34173418
#[mz_ore::test]
34183419
fn test_table_transaction_simple() {
@@ -3942,9 +3943,16 @@ mod tests {
39423943
#[mz_ore::test(tokio::test)]
39433944
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
39443945
async fn test_savepoint() {
3945-
let persist_client = PersistClient::new_for_tests().await;
3946-
let state_builder =
3947-
TestCatalogStateBuilder::new(persist_client).with_default_deploy_generation();
3946+
const VERSION: Version = Version::new(26, 0, 0);
3947+
let mut persist_cache = PersistClientCache::new_no_metrics();
3948+
persist_cache.cfg.build_version = VERSION;
3949+
let persist_client = persist_cache
3950+
.open(PersistLocation::new_in_mem())
3951+
.await
3952+
.unwrap();
3953+
let state_builder = TestCatalogStateBuilder::new(persist_client)
3954+
.with_default_deploy_generation()
3955+
.with_version(VERSION);
39483956

39493957
// Initialize catalog.
39503958
let _ = state_builder

src/catalog/tests/open.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -558,12 +558,14 @@ async fn test_open(state_builder: TestCatalogStateBuilder) {
558558
#[mz_ore::test(tokio::test)]
559559
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
560560
async fn test_persist_unopened_deploy_generation_fencing() {
561-
let persist_client = PersistClient::new_for_tests().await;
561+
let mut persist_cache = PersistClientCache::new_no_metrics();
562+
persist_cache.cfg.build_version = semver::Version::new(0, 1, 0);
563+
let persist_client = persist_cache
564+
.open(PersistLocation::new_in_mem())
565+
.await
566+
.unwrap();
562567
let state_builder = TestCatalogStateBuilder::new(persist_client);
563-
test_unopened_deploy_generation_fencing(state_builder).await;
564-
}
565568

566-
async fn test_unopened_deploy_generation_fencing(state_builder: TestCatalogStateBuilder) {
567569
// Initialize catalog.
568570
let deploy_generation = 0;
569571
let version = semver::Version::new(0, 1, 0);
@@ -654,10 +656,7 @@ async fn test_unopened_deploy_generation_fencing(state_builder: TestCatalogState
654656
.await
655657
.unwrap_err();
656658
assert!(
657-
matches!(
658-
err,
659-
DurableCatalogError::Fence(FenceError::DeployGeneration { .. })
660-
),
659+
matches!(err, DurableCatalogError::IncompatiblePersistVersion { .. }),
661660
"unexpected err: {err:?}"
662661
);
663662
}
@@ -893,8 +892,14 @@ async fn test_persist_version_fencing() {
893892
}
894893

895894
testcase("0.10.0", "0.10.0", Ok(())).await;
896-
testcase("0.10.0", "0.11.0", Ok(())).await;
897-
testcase("0.10.0", "0.12.0", Err(())).await;
895+
testcase("0.147.0", "0.148.1", Ok(())).await;
896+
testcase("0.10.0", "0.148.1", Err(())).await;
897+
testcase("0.147.0", "0.158.0", Ok(())).await;
898+
testcase("0.147.0", "26.0.0", Ok(())).await;
899+
testcase("0.160.0", "26.0.0", Ok(())).await;
900+
testcase("26.0.0", "26.10.0", Ok(())).await;
901+
testcase("26.1.0", "27.0.0", Ok(())).await;
902+
testcase("0.147.0", "27.0.0", Err(())).await;
898903
}
899904

900905
#[mz_ore::test(tokio::test)]

src/catalog/tests/snapshots/debug__opened_trace.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2386,7 +2386,7 @@ Trace {
23862386
name: "catalog_content_version",
23872387
},
23882388
SettingValue {
2389-
value: "0.0.0",
2389+
value: "0.0.0+dummy",
23902390
},
23912391
),
23922392
2,

src/catalog/tests/snapshots/open__initial_snapshot.snap

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
---
22
source: src/catalog/tests/open.rs
3-
assertion_line: 510
43
expression: test_snapshot
54
---
65
Snapshot {
@@ -1461,7 +1460,7 @@ Snapshot {
14611460
SettingKey {
14621461
name: "catalog_content_version",
14631462
}: SettingValue {
1464-
value: "0.0.0",
1463+
value: "0.0.0+dummy",
14651464
},
14661465
},
14671466
source_references: {},

0 commit comments

Comments
 (0)