Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 10 additions & 33 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,37 +395,18 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
/// Increment the version in the catalog upgrade shard to the code's current version.
async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
let mut write_handle: WriteHandle<(), (), Timestamp, StorageDiff> = self

let () = self
.persist_client
.open_writer(
.upgrade_version::<(), (), Timestamp, StorageDiff>(
upgrade_shard_id,
Arc::new(UnitSchema::default()),
Arc::new(UnitSchema::default()),
Diagnostics {
shard_name: UPGRADE_SHARD_NAME.to_string(),
handle_purpose: "increment durable catalog upgrade shard version".to_string(),
handle_purpose: "durable catalog state upgrade".to_string(),
},
)
.await
.expect("invalid usage");
const EMPTY_UPDATES: &[(((), ()), Timestamp, StorageDiff)] = &[];
let mut upper = write_handle.fetch_recent_upper().await.clone();
loop {
let next_upper = upper
.iter()
.map(|timestamp| timestamp.step_forward())
.collect();
match write_handle
.compare_and_append(EMPTY_UPDATES, upper, next_upper)
.await
.expect("invalid usage")
{
Ok(()) => break,
Err(upper_mismatch) => {
upper = upper_mismatch.current;
}
}
}
}

/// Fetch the current upper of the catalog state.
Expand Down Expand Up @@ -998,16 +979,12 @@ impl UnopenedPersistCatalogState {
// If this is `None`, no version was found in the upgrade shard. This is a brand-new
// environment, and we don't need to worry about fencing existing users.
if let Some(version_in_upgrade_shard) = version_in_upgrade_shard {
// IMPORTANT: We swap the order of arguments here! Normally it's
// `code_version, data_version`, and we check whether a given code
// version, which is usually _older_, is allowed to touch a shard
// that has been touched by a _future_ version.
//
// By inverting argument order, we check if our version is too far
// ahead of the version in the shard.
if mz_persist_client::cfg::check_data_version(&version_in_upgrade_shard, &version)
.is_err()
{
// Check that the current version of the code can handle data from the shard.
// (We used to reverse this check, to confirm that whatever code wrote the data
// in the shard would be able to read data written by the current version... but
// we now require the current code to be able to maintain compat with whatever
// data format versions pass this check.)
if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_upgrade_shard) {
return Err(DurableCatalogError::IncompatiblePersistVersion {
found_version: version_in_upgrade_shard,
catalog_version: version,
Expand Down
48 changes: 11 additions & 37 deletions src/catalog/src/durable/persist/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,18 @@ use mz_persist_client::PersistLocation;
use mz_persist_client::cache::PersistClientCache;
use uuid::Uuid;

use crate::durable::persist::{
CATALOG_SEED, UPGRADE_SEED, fetch_catalog_upgrade_shard_version, shard_id,
};
use crate::durable::persist::{UPGRADE_SEED, fetch_catalog_upgrade_shard_version, shard_id};
use crate::durable::{DurableCatalogError, TestCatalogStateBuilder, test_bootstrap_args};

/// Test that the catalog forces users to upgrade one version at a time.
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_upgrade_shard() {
let first_version = semver::Version::parse("0.10.0").expect("failed to parse version");
let second_version = semver::Version::parse("0.11.0").expect("failed to parse version");
let first_version = semver::Version::parse("0.147.0").expect("failed to parse version");
let second_version = semver::Version::parse("26.0.0").expect("failed to parse version");
let second_dev_version =
semver::Version::parse("0.11.0-dev.0").expect("failed to parse version");
let third_version = semver::Version::parse("0.12.0").expect("failed to parse version");
semver::Version::parse("26.0.0-dev.0").expect("failed to parse version");
let third_version = semver::Version::parse("27.1.0").expect("failed to parse version");
let organization_id = Uuid::new_v4();
let deploy_generation = 0;
let mut persist_cache = PersistClientCache::new_no_metrics();
Expand Down Expand Up @@ -156,12 +154,12 @@ async fn test_upgrade_shard() {
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_version_regression() {
let first_version = semver::Version::parse("0.10.0").expect("failed to parse version");
let second_version = semver::Version::parse("0.11.0").expect("failed to parse version");
let first_version = semver::Version::parse("0.147.0").expect("failed to parse version");
let second_version = semver::Version::parse("26.1.0").expect("failed to parse version");
let organization_id = Uuid::new_v4();
let deploy_generation = 0;
let mut persist_cache = PersistClientCache::new_no_metrics();
let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);

persist_cache.cfg.build_version = first_version.clone();
let persist_client = persist_cache
Expand All @@ -171,7 +169,7 @@ async fn test_version_regression() {

assert_eq!(
None,
fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await
fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await
);

let persist_openable_state = TestCatalogStateBuilder::new(persist_client.clone())
Expand All @@ -188,7 +186,7 @@ async fn test_version_regression() {

assert_eq!(
Some(first_version.clone()),
fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await
fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await
);

persist_cache.cfg.build_version = second_version.clone();
Expand All @@ -210,30 +208,6 @@ async fn test_version_regression() {

assert_eq!(
Some(second_version.clone()),
fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await
);

persist_cache.cfg.build_version = first_version.clone();
let persist_client = persist_cache
.open(PersistLocation::new_in_mem())
.await
.expect("in-mem location is valid");
let err = TestCatalogStateBuilder::new(persist_client.clone())
.with_organization_id(organization_id)
.with_deploy_generation(deploy_generation)
.with_version(first_version.clone())
.build()
.await
.expect_err("skipping versions should error");
assert!(
matches!(
&err,
DurableCatalogError::IncompatiblePersistVersion {
found_version,
catalog_version
}
if found_version == &second_version && catalog_version == &first_version
),
"Unexpected error: {err:?}"
fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await
);
}
19 changes: 14 additions & 5 deletions src/catalog/src/durable/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3406,10 +3406,12 @@ where
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use mz_ore::{assert_none, assert_ok};

use mz_ore::now::SYSTEM_TIME;
use mz_persist_client::PersistClient;
use mz_ore::{assert_none, assert_ok};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::PersistLocation;
use semver::Version;

use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
use crate::memory;
Expand Down Expand Up @@ -3942,9 +3944,16 @@ mod tests {
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_savepoint() {
let persist_client = PersistClient::new_for_tests().await;
let state_builder =
TestCatalogStateBuilder::new(persist_client).with_default_deploy_generation();
const VERSION: Version = Version::new(26, 0, 0);
let mut persist_cache = PersistClientCache::new_no_metrics();
persist_cache.cfg.build_version = VERSION;
let persist_client = persist_cache
.open(PersistLocation::new_in_mem())
.await
.unwrap();
let state_builder = TestCatalogStateBuilder::new(persist_client)
.with_default_deploy_generation()
.with_version(VERSION);

// Initialize catalog.
let _ = state_builder
Expand Down
25 changes: 15 additions & 10 deletions src/catalog/tests/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,14 @@ async fn test_open(state_builder: TestCatalogStateBuilder) {
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_persist_unopened_deploy_generation_fencing() {
let persist_client = PersistClient::new_for_tests().await;
let mut persist_cache = PersistClientCache::new_no_metrics();
persist_cache.cfg.build_version = semver::Version::new(0, 1, 0);
let persist_client = persist_cache
.open(PersistLocation::new_in_mem())
.await
.unwrap();
let state_builder = TestCatalogStateBuilder::new(persist_client);
test_unopened_deploy_generation_fencing(state_builder).await;
}

async fn test_unopened_deploy_generation_fencing(state_builder: TestCatalogStateBuilder) {
// Initialize catalog.
let deploy_generation = 0;
let version = semver::Version::new(0, 1, 0);
Expand Down Expand Up @@ -667,10 +669,7 @@ async fn test_unopened_deploy_generation_fencing(state_builder: TestCatalogState
.await
.unwrap_err();
assert!(
matches!(
err,
DurableCatalogError::Fence(FenceError::DeployGeneration { .. })
),
matches!(err, DurableCatalogError::IncompatiblePersistVersion { .. }),
"unexpected err: {err:?}"
);
}
Expand Down Expand Up @@ -906,8 +905,14 @@ async fn test_persist_version_fencing() {
}

testcase("0.10.0", "0.10.0", Ok(())).await;
testcase("0.10.0", "0.11.0", Ok(())).await;
testcase("0.10.0", "0.12.0", Err(())).await;
testcase("0.147.0", "0.148.1", Ok(())).await;
testcase("0.10.0", "0.148.1", Err(())).await;
testcase("0.147.0", "0.158.0", Ok(())).await;
testcase("0.147.0", "26.0.0", Ok(())).await;
testcase("0.160.0", "26.0.0", Ok(())).await;
testcase("26.0.0", "26.10.0", Ok(())).await;
testcase("26.1.0", "27.0.0", Ok(())).await;
testcase("0.147.0", "27.0.0", Err(())).await;
}

#[mz_ore::test(tokio::test)]
Expand Down
1 change: 0 additions & 1 deletion src/catalog/tests/snapshots/open__initial_snapshot.snap
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
---
source: src/catalog/tests/open.rs
assertion_line: 510
expression: test_snapshot
---
Snapshot {
Expand Down
Loading