Skip to content

Commit 4e80a93

Browse files
authored
Feat!: Decouple forward-only from change categorization (#5110)
1 parent a867d0b commit 4e80a93

File tree

15 files changed

+344
-275
lines changed

15 files changed

+344
-275
lines changed

docs/concepts/plans.md

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,24 @@ This is a common choice in scenarios such as an addition of a new column, an act
4343

4444
If any downstream models contain a `select *` from the model, SQLMesh attempts to infer breaking status on a best-effort basis. We recommend explicitly specifying a query's columns to avoid unnecessary recomputation.
4545

46-
### Forward-only change
47-
A modified (either directly or indirectly) model that is categorized as forward-only will continue to use the existing physical table once the change is deployed to production (the `prod` environment). This means that no backfill will take place.
48-
49-
While iterating on forward-only changes in the development environment, the model's output will be stored in either a temporary table or a shallow clone of the production table if supported by the engine.
50-
51-
In either case the data produced this way in the development environment can only be used for preview and will **not** be reused once the change is deployed to production. See [Forward-only Plans](#forward-only-plans) for more details.
52-
53-
This category is assigned by SQLMesh automatically either when a user opts into using a [forward-only plan](#forward-only-plans) or when a model is explicitly configured to be forward-only.
54-
5546
### Summary
5647

5748
| Change Category | Change Type | Behaviour |
5849
|--------------------------------------|--------------------------------------------------------------------------------------------|----------------------------------------------------|
5950
| [Breaking](#breaking-change) | [Direct](glossary.md#direct-modification) or [Indirect](glossary.md#indirect-modification) | [Backfill](glossary.md#backfill) |
6051
| [Non-breaking](#non-breaking-change) | [Direct](glossary.md#direct-modification) | [Backfill](glossary.md#backfill) |
6152
| [Non-breaking](#non-breaking-change) | [Indirect](glossary.md#indirect-modification) | [No Backfill](glossary.md#backfill) |
62-
| [Forward-only](#forward-only-change) | [Direct](glossary.md#direct-modification) or [Indirect](glossary.md#indirect-modification) | [No Backfill](glossary.md#backfill), schema change |
53+
54+
## Forward-only change
55+
In addition to being categorized as breaking or non-breaking, a change can also be classified as forward-only.
56+
57+
A model change classified as forward-only will continue to use the existing physical table once the change is deployed to production (the `prod` environment). This means that no backfill will take place.
58+
59+
While iterating on forward-only changes in the development environment, the model's output will be stored in either a temporary table or, if the engine supports it, a shallow clone of the production table.
60+
61+
In either case, the data produced this way in the development environment can only be used for preview and will **not** be reused once the change is deployed to production. See [Forward-only Plans](#forward-only-plans) for more details.
62+
63+
This classification is assigned by SQLMesh automatically either when a user opts into using a [forward-only plan](#forward-only-plans) or when a model is explicitly configured to be forward-only.
6364

6465
## Plan application
6566
Once a plan has been created and reviewed, it is then applied to the target [environment](environments.md) in order for its changes to take effect.

sqlmesh/core/plan/builder.py

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,6 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> Plan
239239
snapshot: The target snapshot.
240240
choice: The user decision on how to version the target snapshot and its children.
241241
"""
242-
if self._forward_only:
243-
raise PlanError("Choice setting is not supported by a forward-only plan.")
244242
if not self._is_new_snapshot(snapshot):
245243
raise PlanError(
246244
f"A choice can't be changed for the existing version of {snapshot.name}."
@@ -250,8 +248,6 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> Plan
250248
and snapshot.snapshot_id not in self._context_diff.added
251249
):
252250
raise PlanError(f"Only directly modified models can be categorized ({snapshot.name}).")
253-
if snapshot.is_model and snapshot.model.forward_only:
254-
raise PlanError(f"Forward-only model {snapshot.name} cannot be categorized manually.")
255251

256252
self._choices[snapshot.snapshot_id] = choice
257253
self._latest_plan = None
@@ -369,8 +365,10 @@ def _build_restatements(
369365
restate_models = {
370366
s.name
371367
for s in self._context_diff.new_snapshots.values()
372-
if s.is_materialized
373-
and (self._forward_only or s.model.forward_only)
368+
if s.is_model
369+
and not s.is_symbolic
370+
and (s.is_forward_only or s.model.forward_only)
371+
and not s.is_no_preview
374372
and (
375373
# Metadata changes should not be previewed.
376374
self._context_diff.directly_modified(s.name)
@@ -395,6 +393,9 @@ def _build_restatements(
395393
for s_id in dag:
396394
snapshot = self._context_diff.snapshots[s_id]
397395

396+
if is_preview and snapshot.is_no_preview:
397+
continue
398+
398399
# Since we are traversing the graph in topological order and the largest interval range is pushed down
399400
# the graph we just have to check our immediate parents in the graph and not the whole upstream graph.
400401
restating_parents = [
@@ -526,6 +527,9 @@ def _adjust_new_snapshot_intervals(self) -> None:
526527

527528
def _check_destructive_changes(self, directly_modified: t.Set[SnapshotId]) -> None:
528529
for s_id in sorted(directly_modified):
530+
if s_id.name not in self._context_diff.modified_snapshots:
531+
continue
532+
529533
snapshot = self._context_diff.snapshots[s_id]
530534
# should we raise/warn if this snapshot has/inherits a destructive change?
531535
should_raise_or_warn = (
@@ -583,38 +587,38 @@ def _categorize_snapshots(
583587
if not snapshot or not self._is_new_snapshot(snapshot):
584588
continue
585589

590+
forward_only = self._is_forward_only_change(s_id) or self._forward_only
591+
586592
if s_id in self._choices:
587-
snapshot.categorize_as(self._choices[s_id])
593+
snapshot.categorize_as(self._choices[s_id], forward_only)
588594
continue
589595

590596
if s_id in self._context_diff.added:
591-
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
592-
elif self._is_forward_only_change(s_id) or self._forward_only:
593-
# In case of the forward only plan any modifications result in reuse of the
594-
# previous version for non-seed models.
595-
# New snapshots of seed models are considered non-breaking ones.
596-
category = (
597-
SnapshotChangeCategory.NON_BREAKING
598-
if snapshot.is_seed
599-
else SnapshotChangeCategory.FORWARD_ONLY
600-
)
601-
# If the model kind changes mark as breaking
602-
if snapshot.is_model and snapshot.name in self._context_diff.modified_snapshots:
603-
_, old = self._context_diff.modified_snapshots[snapshot.name]
604-
if _is_breaking_kind_change(old, snapshot):
605-
category = SnapshotChangeCategory.BREAKING
606-
607-
snapshot.categorize_as(category)
597+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
608598
elif s_id.name in self._context_diff.modified_snapshots:
609-
self._categorize_snapshot(snapshot, dag, indirectly_modified)
599+
self._categorize_snapshot(snapshot, forward_only, dag, indirectly_modified)
610600

611601
def _categorize_snapshot(
612-
self, snapshot: Snapshot, dag: DAG[SnapshotId], indirectly_modified: SnapshotMapping
602+
self,
603+
snapshot: Snapshot,
604+
forward_only: bool,
605+
dag: DAG[SnapshotId],
606+
indirectly_modified: SnapshotMapping,
613607
) -> None:
614608
s_id = snapshot.snapshot_id
615609

616610
if self._context_diff.directly_modified(s_id.name):
611+
new, old = self._context_diff.modified_snapshots[s_id.name]
612+
is_breaking_kind_change = _is_breaking_kind_change(old, new)
613+
if is_breaking_kind_change or snapshot.is_seed:
614+
# Breaking kind changes and seed changes can't be forward-only.
615+
forward_only = False
616+
617617
if self._auto_categorization_enabled:
618+
if is_breaking_kind_change:
619+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
620+
return
621+
618622
s_id_with_missing_columns: t.Optional[SnapshotId] = None
619623
this_sid_with_downstream = indirectly_modified.get(s_id, set()) | {s_id}
620624
for downstream_s_id in this_sid_with_downstream:
@@ -626,18 +630,18 @@ def _categorize_snapshot(
626630
s_id_with_missing_columns = downstream_s_id
627631
break
628632

629-
new, old = self._context_diff.modified_snapshots[s_id.name]
630633
if s_id_with_missing_columns is None:
631634
change_category = categorize_change(new, old, config=self._categorizer_config)
632635
if change_category is not None:
633-
snapshot.categorize_as(change_category)
636+
snapshot.categorize_as(change_category, forward_only)
634637
else:
635638
mode = self._categorizer_config.dict().get(
636639
new.model.source_type, AutoCategorizationMode.OFF
637640
)
638641
if mode == AutoCategorizationMode.FULL:
639-
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
642+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
640643
elif self._context_diff.indirectly_modified(snapshot.name):
644+
all_upstream_forward_only = set()
641645
all_upstream_categories = set()
642646
direct_parent_categories = set()
643647

@@ -646,27 +650,30 @@ def _categorize_snapshot(
646650

647651
if parent and self._is_new_snapshot(parent):
648652
all_upstream_categories.add(parent.change_category)
653+
all_upstream_forward_only.add(parent.is_forward_only)
649654
if p_id in snapshot.parents:
650655
direct_parent_categories.add(parent.change_category)
651656

652-
if snapshot.is_model and snapshot.model.forward_only:
653-
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
654-
elif direct_parent_categories.intersection(
657+
if all_upstream_forward_only == {True} or (
658+
snapshot.is_model and snapshot.model.forward_only
659+
):
660+
forward_only = True
661+
662+
if direct_parent_categories.intersection(
655663
{SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING}
656664
):
657-
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING)
665+
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only)
658666
elif not direct_parent_categories:
659-
snapshot.categorize_as(self._get_orphaned_indirect_change_category(snapshot))
660-
elif SnapshotChangeCategory.FORWARD_ONLY in all_upstream_categories:
661-
# FORWARD_ONLY must take precedence over INDIRECT_NON_BREAKING
662-
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
667+
snapshot.categorize_as(
668+
self._get_orphaned_indirect_change_category(snapshot), forward_only
669+
)
663670
elif all_upstream_categories == {SnapshotChangeCategory.METADATA}:
664-
snapshot.categorize_as(SnapshotChangeCategory.METADATA)
671+
snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only)
665672
else:
666-
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING)
673+
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING, forward_only)
667674
else:
668675
# Metadata updated.
669-
snapshot.categorize_as(SnapshotChangeCategory.METADATA)
676+
snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only)
670677

671678
def _get_orphaned_indirect_change_category(
672679
self, indirect_snapshot: Snapshot
@@ -769,10 +776,7 @@ def _is_forward_only_change(self, s_id: SnapshotId) -> bool:
769776
if snapshot.is_model and _is_breaking_kind_change(old, snapshot):
770777
return False
771778
return (
772-
snapshot.is_model
773-
and snapshot.model.forward_only
774-
and not snapshot.change_category
775-
and bool(snapshot.previous_versions)
779+
snapshot.is_model and snapshot.model.forward_only and bool(snapshot.previous_versions)
776780
)
777781

778782
def _is_new_snapshot(self, snapshot: Snapshot) -> bool:
@@ -811,7 +815,7 @@ def _ensure_no_forward_only_revert(self) -> None:
811815
and not candidate.model.forward_only
812816
and promoted.is_forward_only
813817
and not promoted.is_paused
814-
and not candidate.reuses_previous_version
818+
and not candidate.is_no_rebuild
815819
and promoted.version == candidate.version
816820
):
817821
raise PlanError(

sqlmesh/core/snapshot/definition.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ class SnapshotChangeCategory(IntEnum):
7676

7777
BREAKING = 1
7878
NON_BREAKING = 2
79+
# FORWARD_ONLY category is deprecated and is kept for backwards compatibility.
7980
FORWARD_ONLY = 3
8081
INDIRECT_BREAKING = 4
8182
INDIRECT_NON_BREAKING = 5
@@ -336,6 +337,7 @@ class SnapshotInfoMixin(ModelKindMixin):
336337
base_table_name_override: t.Optional[str]
337338
dev_table_suffix: str
338339
table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
340+
forward_only: bool
339341

340342
@cached_property
341343
def identifier(self) -> str:
@@ -383,7 +385,7 @@ def fully_qualified_table(self) -> t.Optional[exp.Table]:
383385

384386
@property
385387
def is_forward_only(self) -> bool:
386-
return self.change_category == SnapshotChangeCategory.FORWARD_ONLY
388+
return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY
387389

388390
@property
389391
def is_metadata(self) -> bool:
@@ -394,9 +396,18 @@ def is_indirect_non_breaking(self) -> bool:
394396
return self.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING
395397

396398
@property
397-
def reuses_previous_version(self) -> bool:
398-
return self.change_category in (
399-
SnapshotChangeCategory.FORWARD_ONLY,
399+
def is_no_rebuild(self) -> bool:
400+
"""Returns true if this snapshot doesn't require a rebuild in production."""
401+
return self.forward_only or self.change_category in (
402+
SnapshotChangeCategory.FORWARD_ONLY, # Backwards compatibility
403+
SnapshotChangeCategory.METADATA,
404+
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
405+
)
406+
407+
@property
408+
def is_no_preview(self) -> bool:
409+
"""Returns true if this snapshot doesn't require a preview in development."""
410+
return self.forward_only and self.change_category in (
400411
SnapshotChangeCategory.METADATA,
401412
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
402413
)
@@ -487,6 +498,7 @@ class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True):
487498
custom_materialization: t.Optional[str] = None
488499
dev_table_suffix: str
489500
model_gateway: t.Optional[str] = None
501+
forward_only: bool = False
490502

491503
def __lt__(self, other: SnapshotTableInfo) -> bool:
492504
return self.name < other.name
@@ -614,6 +626,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
614626
table_naming_convention_: TableNamingConvention = Field(
615627
default=TableNamingConvention.default, alias="table_naming_convention"
616628
)
629+
forward_only: bool = False
617630

618631
@field_validator("ttl")
619632
@classmethod
@@ -1006,22 +1019,26 @@ def check_ready_intervals(
10061019
)
10071020
return intervals
10081021

1009-
def categorize_as(self, category: SnapshotChangeCategory) -> None:
1022+
def categorize_as(self, category: SnapshotChangeCategory, forward_only: bool = False) -> None:
10101023
"""Assigns the given category to this snapshot.
10111024
10121025
Args:
10131026
category: The change category to assign to this snapshot.
1027+
forward_only: Whether or not this snapshot is applied going forward in production.
10141028
"""
1029+
assert category != SnapshotChangeCategory.FORWARD_ONLY, (
1030+
"FORWARD_ONLY change category is deprecated"
1031+
)
1032+
10151033
self.dev_version_ = self.fingerprint.to_version()
1016-
reuse_previous_version = category in (
1017-
SnapshotChangeCategory.FORWARD_ONLY,
1034+
is_no_rebuild = forward_only or category in (
10181035
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
10191036
SnapshotChangeCategory.METADATA,
10201037
)
10211038
if self.is_model and self.model.physical_version:
10221039
# If the model has a pinned version then use that.
10231040
self.version = self.model.physical_version
1024-
elif reuse_previous_version and self.previous_version:
1041+
elif is_no_rebuild and self.previous_version:
10251042
previous_version = self.previous_version
10261043
self.version = previous_version.data_version.version
10271044
self.physical_schema_ = previous_version.physical_schema
@@ -1040,6 +1057,7 @@ def categorize_as(self, category: SnapshotChangeCategory) -> None:
10401057
self.version = self.fingerprint.to_version()
10411058

10421059
self.change_category = category
1060+
self.forward_only = forward_only
10431061

10441062
@property
10451063
def categorized(self) -> bool:
@@ -1220,6 +1238,7 @@ def table_info(self) -> SnapshotTableInfo:
12201238
dev_table_suffix=self.dev_table_suffix,
12211239
model_gateway=self.model_gateway,
12221240
table_naming_convention=self.table_naming_convention, # type: ignore
1241+
forward_only=self.forward_only,
12231242
)
12241243

12251244
@property

sqlmesh/core/snapshot/evaluator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ def create(
338338
continue
339339
deployability_flags = [True]
340340
if (
341-
snapshot.reuses_previous_version
341+
snapshot.is_no_rebuild
342342
or snapshot.is_managed
343343
or (snapshot.is_model and snapshot.model.forward_only)
344344
or (deployability_index and not deployability_index.is_deployable(snapshot))

sqlmesh/core/state_sync/db/snapshot.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -727,14 +727,15 @@ class SharedVersionSnapshot(PydanticModel):
727727
disable_restatement: bool
728728
effective_from: t.Optional[TimeLike]
729729
raw_snapshot: t.Dict[str, t.Any]
730+
forward_only: bool
730731

731732
@property
732733
def snapshot_id(self) -> SnapshotId:
733734
return SnapshotId(name=self.name, identifier=self.identifier)
734735

735736
@property
736737
def is_forward_only(self) -> bool:
737-
return self.change_category == SnapshotChangeCategory.FORWARD_ONLY
738+
return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY
738739

739740
@property
740741
def normalized_effective_from_ts(self) -> t.Optional[int]:
@@ -797,4 +798,5 @@ def from_snapshot_record(
797798
disable_restatement=raw_node.get("kind", {}).get("disable_restatement", False),
798799
effective_from=raw_snapshot.get("effective_from"),
799800
raw_snapshot=raw_snapshot,
801+
forward_only=raw_snapshot.get("forward_only", False),
800802
)

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ def _make_function(
438438
metadata_hash="test_metadata_hash",
439439
),
440440
version="test_version",
441-
change_category=SnapshotChangeCategory.FORWARD_ONLY,
441+
change_category=SnapshotChangeCategory.NON_BREAKING,
442442
dev_table_suffix="dev",
443443
),
444444
)

0 commit comments

Comments
 (0)