Skip to content

Commit 28a8446

Browse files
committed
Implement the new versioning scheme
1 parent 794984f commit 28a8446

File tree

13 files changed

+341
-388
lines changed

13 files changed

+341
-388
lines changed

src/catalog/src/durable/persist.rs

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -395,36 +395,30 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
395395
/// Increment the version in the catalog upgrade shard to the code's current version.
396396
async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
397397
let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
398-
let mut write_handle: WriteHandle<(), (), Timestamp, StorageDiff> = self
398+
399+
let () = self
399400
.persist_client
400-
.open_writer(
401+
.upgrade_version::<(), (), Timestamp, StorageDiff>(
401402
upgrade_shard_id,
402-
Arc::new(UnitSchema::default()),
403-
Arc::new(UnitSchema::default()),
404403
Diagnostics {
405404
shard_name: UPGRADE_SHARD_NAME.to_string(),
406-
handle_purpose: "increment durable catalog upgrade shard version".to_string(),
405+
handle_purpose: "durable catalog state upgrade".to_string(),
407406
},
408407
)
409408
.await
410409
.expect("invalid usage");
411-
const EMPTY_UPDATES: &[(((), ()), Timestamp, StorageDiff)] = &[];
412-
let mut upper = write_handle.fetch_recent_upper().await.clone();
413-
loop {
414-
let next_upper = upper
415-
.iter()
416-
.map(|timestamp| timestamp.step_forward())
417-
.collect();
418-
match write_handle
419-
.compare_and_append(EMPTY_UPDATES, upper, next_upper)
420-
.await
421-
.expect("invalid usage")
422-
{
423-
Ok(()) => break,
424-
Err(upper_mismatch) => {
425-
upper = upper_mismatch.current;
426-
}
427-
}
410+
411+
if cfg!(debug_assertions) {
412+
let fetched_version =
413+
fetch_catalog_upgrade_shard_version(&self.persist_client, upgrade_shard_id)
414+
.await
415+
.expect("fetched just-set version");
416+
assert!(
417+
self.catalog_content_version
418+
.cmp_precedence(&fetched_version)
419+
.is_eq(),
420+
"code version should match the upgraded data version"
421+
);
428422
}
429423
}
430424

@@ -998,16 +992,12 @@ impl UnopenedPersistCatalogState {
998992
// If this is `None`, no version was found in the upgrade shard. This is a brand-new
999993
// environment, and we don't need to worry about fencing existing users.
1000994
if let Some(version_in_upgrade_shard) = version_in_upgrade_shard {
1001-
// IMPORTANT: We swap the order of arguments here! Normally it's
1002-
// `code_version, data_version`, and we check whether a given code
1003-
// version, which is usually _older_, is allowed to touch a shard
1004-
// that has been touched by a _future_ version.
1005-
//
1006-
// By inverting argument order, we check if our version is too far
1007-
// ahead of the version in the shard.
1008-
if mz_persist_client::cfg::check_data_version(&version_in_upgrade_shard, &version)
1009-
.is_err()
1010-
{
995+
// Check that the current version of the code can handle data from the shard.
996+
// (We used to reverse this check, to confirm that whatever code wrote the data
997+
// in the shard would be able to read data written by the current version, but
998+
// we now require the current code to maintain compatibility with whatever
999+
// data format versions pass this check.)
1000+
if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_upgrade_shard) {
10111001
return Err(DurableCatalogError::IncompatiblePersistVersion {
10121002
found_version: version_in_upgrade_shard,
10131003
catalog_version: version,

src/catalog/src/durable/persist/tests.rs

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,18 @@ use mz_persist_client::PersistLocation;
1212
use mz_persist_client::cache::PersistClientCache;
1313
use uuid::Uuid;
1414

15-
use crate::durable::persist::{
16-
CATALOG_SEED, UPGRADE_SEED, fetch_catalog_upgrade_shard_version, shard_id,
17-
};
15+
use crate::durable::persist::{UPGRADE_SEED, fetch_catalog_upgrade_shard_version, shard_id};
1816
use crate::durable::{DurableCatalogError, TestCatalogStateBuilder, test_bootstrap_args};
1917

2018
/// Test that the catalog forces users to upgrade one version at a time.
2119
#[mz_ore::test(tokio::test)]
2220
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2321
async fn test_upgrade_shard() {
24-
let first_version = semver::Version::parse("0.10.0").expect("failed to parse version");
25-
let second_version = semver::Version::parse("0.11.0").expect("failed to parse version");
22+
let first_version = semver::Version::parse("0.147.0").expect("failed to parse version");
23+
let second_version = semver::Version::parse("26.0.0").expect("failed to parse version");
2624
let second_dev_version =
27-
semver::Version::parse("0.11.0-dev.0").expect("failed to parse version");
28-
let third_version = semver::Version::parse("0.12.0").expect("failed to parse version");
25+
semver::Version::parse("26.0.0-dev.0").expect("failed to parse version");
26+
let third_version = semver::Version::parse("27.1.0").expect("failed to parse version");
2927
let organization_id = Uuid::new_v4();
3028
let deploy_generation = 0;
3129
let mut persist_cache = PersistClientCache::new_no_metrics();
@@ -156,12 +154,12 @@ async fn test_upgrade_shard() {
156154
#[mz_ore::test(tokio::test)]
157155
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
158156
async fn test_version_regression() {
159-
let first_version = semver::Version::parse("0.10.0").expect("failed to parse version");
160-
let second_version = semver::Version::parse("0.11.0").expect("failed to parse version");
157+
let first_version = semver::Version::parse("0.147.0").expect("failed to parse version");
158+
let second_version = semver::Version::parse("26.1.0").expect("failed to parse version");
161159
let organization_id = Uuid::new_v4();
162160
let deploy_generation = 0;
163161
let mut persist_cache = PersistClientCache::new_no_metrics();
164-
let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
162+
let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
165163

166164
persist_cache.cfg.build_version = first_version.clone();
167165
let persist_client = persist_cache
@@ -171,7 +169,7 @@ async fn test_version_regression() {
171169

172170
assert_eq!(
173171
None,
174-
fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await
172+
fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await
175173
);
176174

177175
let persist_openable_state = TestCatalogStateBuilder::new(persist_client.clone())
@@ -188,7 +186,7 @@ async fn test_version_regression() {
188186

189187
assert_eq!(
190188
Some(first_version.clone()),
191-
fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await
189+
fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await
192190
);
193191

194192
persist_cache.cfg.build_version = second_version.clone();
@@ -210,30 +208,32 @@ async fn test_version_regression() {
210208

211209
assert_eq!(
212210
Some(second_version.clone()),
213-
fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await
211+
fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await
214212
);
215213

216-
persist_cache.cfg.build_version = first_version.clone();
217-
let persist_client = persist_cache
218-
.open(PersistLocation::new_in_mem())
219-
.await
220-
.expect("in-mem location is valid");
221-
let err = TestCatalogStateBuilder::new(persist_client.clone())
222-
.with_organization_id(organization_id)
223-
.with_deploy_generation(deploy_generation)
224-
.with_version(first_version.clone())
225-
.build()
226-
.await
227-
.expect_err("skipping versions should error");
228-
assert!(
229-
matches!(
230-
&err,
231-
DurableCatalogError::IncompatiblePersistVersion {
232-
found_version,
233-
catalog_version
234-
}
235-
if found_version == &second_version && catalog_version == &first_version
236-
),
237-
"Unexpected error: {err:?}"
238-
);
214+
// NB: running the following is expected to halt the process, but we can't catch halts in tests.
215+
//
216+
// persist_cache.cfg.build_version = first_version.clone();
217+
// let persist_client = persist_cache
218+
// .open(PersistLocation::new_in_mem())
219+
// .await
220+
// .expect("in-mem location is valid");
221+
// let err = TestCatalogStateBuilder::new(persist_client.clone())
222+
// .with_organization_id(organization_id)
223+
// .with_deploy_generation(deploy_generation)
224+
// .with_version(first_version.clone())
225+
// .build()
226+
// .await
227+
// .expect_err("skipping versions should error");
228+
// assert!(
229+
// matches!(
230+
// &err,
231+
// DurableCatalogError::IncompatiblePersistVersion {
232+
// found_version,
233+
// catalog_version
234+
// }
235+
// if found_version == &second_version && catalog_version == &first_version
236+
// ),
237+
// "Unexpected error: {err:?}"
238+
// );
239239
}

src/catalog/src/durable/transaction.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3407,12 +3407,13 @@ where
34073407
mod tests {
34083408
use super::*;
34093409
use mz_ore::{assert_none, assert_ok};
3410-
3411-
use mz_ore::now::SYSTEM_TIME;
3412-
use mz_persist_client::PersistClient;
3410+
use semver::Version;
34133411

34143412
use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
34153413
use crate::memory;
3414+
use mz_ore::now::SYSTEM_TIME;
3415+
use mz_persist_client::cache::PersistClientCache;
3416+
use mz_persist_types::PersistLocation;
34163417

34173418
#[mz_ore::test]
34183419
fn test_table_transaction_simple() {
@@ -3942,9 +3943,16 @@ mod tests {
39423943
#[mz_ore::test(tokio::test)]
39433944
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
39443945
async fn test_savepoint() {
3945-
let persist_client = PersistClient::new_for_tests().await;
3946-
let state_builder =
3947-
TestCatalogStateBuilder::new(persist_client).with_default_deploy_generation();
3946+
const VERSION: Version = Version::new(26, 0, 0);
3947+
let mut persist_cache = PersistClientCache::new_no_metrics();
3948+
persist_cache.cfg.build_version = VERSION;
3949+
let persist_client = persist_cache
3950+
.open(PersistLocation::new_in_mem())
3951+
.await
3952+
.unwrap();
3953+
let state_builder = TestCatalogStateBuilder::new(persist_client)
3954+
.with_default_deploy_generation()
3955+
.with_version(VERSION);
39483956

39493957
// Initialize catalog.
39503958
let _ = state_builder

src/catalog/tests/open.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -571,12 +571,14 @@ async fn test_open(state_builder: TestCatalogStateBuilder) {
571571
#[mz_ore::test(tokio::test)]
572572
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
573573
async fn test_persist_unopened_deploy_generation_fencing() {
574-
let persist_client = PersistClient::new_for_tests().await;
574+
let mut persist_cache = PersistClientCache::new_no_metrics();
575+
persist_cache.cfg.build_version = semver::Version::new(0, 1, 0);
576+
let persist_client = persist_cache
577+
.open(PersistLocation::new_in_mem())
578+
.await
579+
.unwrap();
575580
let state_builder = TestCatalogStateBuilder::new(persist_client);
576-
test_unopened_deploy_generation_fencing(state_builder).await;
577-
}
578581

579-
async fn test_unopened_deploy_generation_fencing(state_builder: TestCatalogStateBuilder) {
580582
// Initialize catalog.
581583
let deploy_generation = 0;
582584
let version = semver::Version::new(0, 1, 0);
@@ -667,10 +669,7 @@ async fn test_unopened_deploy_generation_fencing(state_builder: TestCatalogState
667669
.await
668670
.unwrap_err();
669671
assert!(
670-
matches!(
671-
err,
672-
DurableCatalogError::Fence(FenceError::DeployGeneration { .. })
673-
),
672+
matches!(err, DurableCatalogError::IncompatiblePersistVersion { .. }),
674673
"unexpected err: {err:?}"
675674
);
676675
}
@@ -906,8 +905,14 @@ async fn test_persist_version_fencing() {
906905
}
907906

908907
testcase("0.10.0", "0.10.0", Ok(())).await;
909-
testcase("0.10.0", "0.11.0", Ok(())).await;
910-
testcase("0.10.0", "0.12.0", Err(())).await;
908+
testcase("0.147.0", "0.148.1", Ok(())).await;
909+
testcase("0.10.0", "0.148.1", Err(())).await;
910+
testcase("0.147.0", "0.158.0", Ok(())).await;
911+
testcase("0.147.0", "26.0.0", Ok(())).await;
912+
testcase("0.160.0", "26.0.0", Ok(())).await;
913+
testcase("26.0.0", "26.10.0", Ok(())).await;
914+
testcase("26.1.0", "27.0.0", Ok(())).await;
915+
testcase("0.147.0", "27.0.0", Err(())).await;
911916
}
912917

913918
#[mz_ore::test(tokio::test)]

src/catalog/tests/snapshots/open__initial_snapshot.snap

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
---
22
source: src/catalog/tests/open.rs
3-
assertion_line: 510
43
expression: test_snapshot
54
---
65
Snapshot {

0 commit comments

Comments
 (0)