Skip to content

Commit be21191

Browse files
committed
make something work!
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 1a61690 commit be21191

File tree

41 files changed

+2144
-87
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2144
-87
lines changed

src/adapter/src/catalog.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,8 @@ impl Catalog {
396396
| CatalogItemType::Func
397397
| CatalogItemType::Secret
398398
| CatalogItemType::Connection
399-
| CatalogItemType::ContinualTask => {
399+
| CatalogItemType::ContinualTask
400+
| CatalogItemType::ReplacementMaterializedView => {
400401
dependencies.extend(global_ids);
401402
}
402403
CatalogItemType::View => {
@@ -1551,6 +1552,7 @@ pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType
15511552
CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
15521553
CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
15531554
CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1555+
CommentObjectId::ReplacementMaterializedView(_) => ObjectType::ReplacementMaterializedView,
15541556
}
15551557
}
15561558

@@ -1582,6 +1584,9 @@ pub(crate) fn system_object_type_to_audit_object_type(
15821584
mz_sql::catalog::ObjectType::Func => ObjectType::Func,
15831585
mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
15841586
mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1587+
mz_sql::catalog::ObjectType::ReplacementMaterializedView => {
1588+
ObjectType::ReplacementMaterializedView
1589+
}
15851590
},
15861591
SystemObjectType::System => ObjectType::System,
15871592
}

src/adapter/src/catalog/apply.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1925,7 +1925,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
19251925
| CatalogItemType::Type
19261926
| CatalogItemType::Func
19271927
| CatalogItemType::Secret
1928-
| CatalogItemType::Connection => push_update(
1928+
| CatalogItemType::Connection
1929+
| CatalogItemType::ReplacementMaterializedView => push_update(
19291930
StateUpdate {
19301931
kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
19311932
ts,
@@ -2046,7 +2047,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
20462047
CatalogItemType::Table => tables.push(update),
20472048
CatalogItemType::View
20482049
| CatalogItemType::MaterializedView
2049-
| CatalogItemType::Index => derived_items.push(update),
2050+
| CatalogItemType::Index
2051+
| CatalogItemType::ReplacementMaterializedView => derived_items.push(update),
20502052
CatalogItemType::Sink => sinks.push(update),
20512053
CatalogItemType::ContinualTask => continual_tasks.push(update),
20522054
}
@@ -2116,7 +2118,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
21162118
CatalogItemType::Table => tables.push(update),
21172119
CatalogItemType::View
21182120
| CatalogItemType::MaterializedView
2119-
| CatalogItemType::Index => derived_items.push(update),
2121+
| CatalogItemType::Index
2122+
| CatalogItemType::ReplacementMaterializedView => derived_items.push(update),
21202123
CatalogItemType::Sink => sinks.push(update),
21212124
CatalogItemType::ContinualTask => continual_tasks.push(update),
21222125
}

src/adapter/src/catalog/builtin_table_updates.rs

Lines changed: 124 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,18 @@ use mz_catalog::builtin::{
2626
MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
2727
MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS,
2828
MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES,
29-
MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS, MZ_SECRETS,
30-
MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
31-
MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
32-
MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
29+
MZ_REPLACEMENT_MATERIALIZED_VIEWS, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES,
30+
MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
31+
MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
32+
MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
33+
MZ_WEBHOOKS_SOURCES,
3334
};
3435
use mz_catalog::config::AwsPrincipalContext;
3536
use mz_catalog::durable::SourceReferences;
3637
use mz_catalog::memory::error::{Error, ErrorKind};
3738
use mz_catalog::memory::objects::{
3839
CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func, Index,
39-
MaterializedView, Sink, Table, TableDataSource, Type, View,
40+
MaterializedView, ReplacementMaterializedView, Sink, Table, TableDataSource, Type, View,
4041
};
4142
use mz_controller::clusters::{
4243
ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
@@ -817,6 +818,10 @@ impl CatalogState {
817818
CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
818819
id, oid, schema_id, name, owner_id, privileges, ct, diff,
819820
),
821+
CatalogItem::ReplacementMaterializedView(mview) => self
822+
.pack_replacement_materialized_view_update(
823+
id, oid, schema_id, name, owner_id, privileges, mview, diff,
824+
),
820825
};
821826

822827
if !entry.item().is_temporary() {
@@ -1523,6 +1528,118 @@ impl CatalogState {
15231528
updates
15241529
}
15251530

1531+
fn pack_replacement_materialized_view_update(
1532+
&self,
1533+
id: CatalogItemId,
1534+
oid: u32,
1535+
schema_id: &SchemaSpecifier,
1536+
name: &str,
1537+
owner_id: &RoleId,
1538+
privileges: Datum,
1539+
mview: &ReplacementMaterializedView,
1540+
diff: Diff,
1541+
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1542+
let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1543+
.unwrap_or_else(|e| {
1544+
panic!(
1545+
"create_sql cannot be invalid: `{}` --- error: `{}`",
1546+
mview.create_sql, e
1547+
)
1548+
})
1549+
.into_element()
1550+
.ast;
1551+
let query_string = match &create_stmt {
1552+
Statement::CreateReplacementMaterializedView(stmt) => {
1553+
let mut query_string = stmt.query.to_ast_string_stable();
1554+
// PostgreSQL appends a semicolon in `pg_matviews.definition`, we
1555+
// do the same for compatibility's sake.
1556+
query_string.push(';');
1557+
query_string
1558+
}
1559+
_ => unreachable!(),
1560+
};
1561+
1562+
let mut updates = Vec::new();
1563+
1564+
updates.push(BuiltinTableUpdate::row(
1565+
&*MZ_REPLACEMENT_MATERIALIZED_VIEWS,
1566+
Row::pack_slice(&[
1567+
Datum::String(&id.to_string()),
1568+
Datum::UInt32(oid),
1569+
Datum::String(&mview.replaces.to_string()),
1570+
Datum::String(&schema_id.to_string()),
1571+
Datum::String(name),
1572+
Datum::String(&mview.cluster_id.to_string()),
1573+
Datum::String(&query_string),
1574+
Datum::String(&owner_id.to_string()),
1575+
privileges,
1576+
Datum::String(&mview.create_sql),
1577+
Datum::String(&create_stmt.to_ast_string_redacted()),
1578+
]),
1579+
diff,
1580+
));
1581+
1582+
if let Some(refresh_schedule) = &mview.refresh_schedule {
1583+
// This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1584+
// empty `RefreshSchedule`.
1585+
assert!(!refresh_schedule.is_empty());
1586+
for RefreshEvery {
1587+
interval,
1588+
aligned_to,
1589+
} in refresh_schedule.everies.iter()
1590+
{
1591+
let aligned_to_dt = mz_ore::now::to_datetime(
1592+
<&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1593+
);
1594+
updates.push(BuiltinTableUpdate::row(
1595+
&*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1596+
Row::pack_slice(&[
1597+
Datum::String(&id.to_string()),
1598+
Datum::String("every"),
1599+
Datum::Interval(
1600+
Interval::from_duration(interval).expect(
1601+
"planning ensured that this is convertible back to Interval",
1602+
),
1603+
),
1604+
Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1605+
Datum::Null,
1606+
]),
1607+
diff,
1608+
));
1609+
}
1610+
for at in refresh_schedule.ats.iter() {
1611+
let at_dt = mz_ore::now::to_datetime(
1612+
<&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1613+
);
1614+
updates.push(BuiltinTableUpdate::row(
1615+
&*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1616+
Row::pack_slice(&[
1617+
Datum::String(&id.to_string()),
1618+
Datum::String("at"),
1619+
Datum::Null,
1620+
Datum::Null,
1621+
Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1622+
]),
1623+
diff,
1624+
));
1625+
}
1626+
} else {
1627+
updates.push(BuiltinTableUpdate::row(
1628+
&*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1629+
Row::pack_slice(&[
1630+
Datum::String(&id.to_string()),
1631+
Datum::String("on-commit"),
1632+
Datum::Null,
1633+
Datum::Null,
1634+
Datum::Null,
1635+
]),
1636+
diff,
1637+
));
1638+
}
1639+
1640+
updates
1641+
}
1642+
15261643
fn pack_continual_task_update(
15271644
&self,
15281645
id: CatalogItemId,
@@ -2266,7 +2383,8 @@ impl CatalogState {
22662383
| CommentObjectId::Connection(global_id)
22672384
| CommentObjectId::Secret(global_id)
22682385
| CommentObjectId::Type(global_id)
2269-
| CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2386+
| CommentObjectId::ContinualTask(global_id)
2387+
| CommentObjectId::ReplacementMaterializedView(global_id) => global_id.to_string(),
22702388
CommentObjectId::Role(role_id) => role_id.to_string(),
22712389
CommentObjectId::Database(database_id) => database_id.to_string(),
22722390
CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),

src/adapter/src/catalog/consistency.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,8 @@ impl CatalogState {
274274
| CommentObjectId::Connection(item_id)
275275
| CommentObjectId::Type(item_id)
276276
| CommentObjectId::Secret(item_id)
277-
| CommentObjectId::ContinualTask(item_id) => {
277+
| CommentObjectId::ContinualTask(item_id)
278+
| CommentObjectId::ReplacementMaterializedView(item_id) => {
278279
let entry = self.entry_by_id.get(&item_id);
279280
match entry {
280281
None => comment_inconsistencies

src/adapter/src/catalog/open.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,8 @@ fn add_new_remove_old_builtin_items_migration(
916916
| CatalogItemType::Func
917917
| CatalogItemType::Secret
918918
| CatalogItemType::Connection
919-
| CatalogItemType::ContinualTask => continue,
919+
| CatalogItemType::ContinualTask
920+
| CatalogItemType::ReplacementMaterializedView => continue,
920921
};
921922
deleted_comments.insert(comment_id);
922923
}

src/adapter/src/catalog/open/builtin_item_migration.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ pub(crate) async fn migrate_builtin_items(
9494
Source(source) => Some(source.global_id()),
9595
MaterializedView(mv) => Some(mv.global_id()),
9696
ContinualTask(ct) => Some(ct.global_id()),
97+
// TODO(alter-mv): Do we need to migrate replacement materialized views?
98+
ReplacementMaterializedView(mv) => Some(mv.global_id()),
9799
Log(_) | Sink(_) | View(_) | Index(_) | Type(_) | Func(_) | Secret(_)
98100
| Connection(_) => None,
99101
}

0 commit comments

Comments
 (0)