Skip to content

Commit fa52373

Browse files
authored
feat: introduce dagster sqlmesh cache (#48)
* chore: update lock file * feat: sqlmesh cache for large projects This introduces a cache when translating sqlmesh context to assets * fix: attempt to decouple caching further * fix: remove oso specific caching utilities * fix: fix sample
1 parent 010cd90 commit fa52373

File tree

8 files changed

+291
-60
lines changed

8 files changed

+291
-60
lines changed

dagster_sqlmesh/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,6 @@
22

33
from .asset import *
44
from .config import *
5+
from .controller import *
56
from .resource import *
7+
from .translator import *

dagster_sqlmesh/asset.py

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,61 @@
1111
DagsterSQLMeshController,
1212
)
1313
from dagster_sqlmesh.translator import SQLMeshDagsterTranslator
14+
from dagster_sqlmesh.types import SQLMeshMultiAssetOptions
1415

1516
logger = logging.getLogger(__name__)
1617

18+
def sqlmesh_to_multi_asset_options(
19+
*,
20+
environment: str,
21+
config: SQLMeshContextConfig,
22+
context_factory: ContextFactory[ContextCls] = lambda **kwargs: Context(**kwargs),
23+
dagster_sqlmesh_translator: SQLMeshDagsterTranslator | None = None,
24+
) -> SQLMeshMultiAssetOptions:
25+
"""Converts sqlmesh project into a SQLMeshMultiAssetOptions object which is
26+
an intermediate representation of the SQLMesh project that can be used to
27+
create a dagster multi_asset definition."""
28+
controller = DagsterSQLMeshController.setup_with_config(
29+
config=config, context_factory=context_factory
30+
)
31+
if not dagster_sqlmesh_translator:
32+
dagster_sqlmesh_translator = SQLMeshDagsterTranslator()
33+
34+
conversion = controller.to_asset_outs(
35+
environment,
36+
translator=dagster_sqlmesh_translator,
37+
)
38+
return conversion
39+
40+
def sqlmesh_asset_from_multi_asset_options(
41+
*,
42+
sqlmesh_multi_asset_options: SQLMeshMultiAssetOptions,
43+
name: str | None = None,
44+
compute_kind: str = "sqlmesh",
45+
op_tags: t.Mapping[str, t.Any] | None = None,
46+
required_resource_keys: set[str] | None = None,
47+
retry_policy: RetryPolicy | None = None,
48+
enabled_subsetting: bool = False,
49+
) -> t.Callable[[t.Callable[..., t.Any]], AssetsDefinition]:
50+
"""Creates a dagster multi_asset definition from a SQLMeshMultiAssetOptions object."""
51+
kwargs: dict[str, t.Any] = {}
52+
if enabled_subsetting:
53+
kwargs["can_subset"] = True
54+
55+
#asset_deps = sqlmesh_multi_asset_options.to_asset_deps()
56+
#print("Asset deps boop:", asset_deps) # Debugging line
57+
58+
return multi_asset(
59+
outs=sqlmesh_multi_asset_options.to_asset_outs(),
60+
deps=sqlmesh_multi_asset_options.to_asset_deps(),
61+
internal_asset_deps=sqlmesh_multi_asset_options.to_internal_asset_deps(),
62+
name=name,
63+
compute_kind=compute_kind,
64+
op_tags=op_tags,
65+
required_resource_keys=required_resource_keys,
66+
retry_policy=retry_policy,
67+
**kwargs,
68+
)
1769

1870
# Define a SQLMesh Asset
1971
def sqlmesh_assets(
@@ -30,19 +82,19 @@ def sqlmesh_assets(
3082
# For now we don't set this by default
3183
enabled_subsetting: bool = False,
3284
) -> t.Callable[[t.Callable[..., t.Any]], AssetsDefinition]:
33-
controller = DagsterSQLMeshController.setup_with_config(config=config, context_factory=context_factory)
34-
if not dagster_sqlmesh_translator:
35-
dagster_sqlmesh_translator = SQLMeshDagsterTranslator()
36-
conversion = controller.to_asset_outs(environment, translator=dagster_sqlmesh_translator)
37-
38-
return multi_asset(
85+
conversion = sqlmesh_to_multi_asset_options(
86+
environment=environment,
87+
config=config,
88+
context_factory=context_factory,
89+
dagster_sqlmesh_translator=dagster_sqlmesh_translator,
90+
)
91+
92+
return sqlmesh_asset_from_multi_asset_options(
93+
sqlmesh_multi_asset_options=conversion,
3994
name=name,
40-
outs=conversion.outs,
41-
deps=conversion.deps,
42-
internal_asset_deps=conversion.internal_asset_deps,
43-
op_tags=op_tags,
4495
compute_kind=compute_kind,
45-
retry_policy=retry_policy,
46-
can_subset=enabled_subsetting,
96+
op_tags=op_tags,
4797
required_resource_keys=required_resource_keys,
98+
retry_policy=retry_policy,
99+
enabled_subsetting=enabled_subsetting,
48100
)

dagster_sqlmesh/config.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from dataclasses import dataclass
2+
from pathlib import Path
23
from typing import Any
34

45
from dagster import Config
56
from pydantic import Field
67
from sqlmesh.core.config import Config as MeshConfig
8+
from sqlmesh.core.config.loader import load_configs
79

810

911
@dataclass
@@ -27,7 +29,11 @@ class SQLMeshContextConfig(Config):
2729
config_override: dict[str, Any] | None = Field(default_factory=lambda: None)
2830

2931
@property
30-
def sqlmesh_config(self) -> MeshConfig | None:
32+
def sqlmesh_config(self) -> MeshConfig:
3133
if self.config_override:
3234
return MeshConfig.parse_obj(self.config_override)
33-
return None
35+
sqlmesh_path = Path(self.path)
36+
configs = load_configs(None, MeshConfig, [sqlmesh_path])
37+
if sqlmesh_path not in configs:
38+
raise ValueError(f"SQLMesh configuration not found at {sqlmesh_path}")
39+
return configs[sqlmesh_path]
Lines changed: 53 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
# pyright: reportPrivateImportUsage=false
22
import logging
3-
from inspect import signature
43

5-
from dagster import AssetDep, AssetKey, AssetOut
6-
from dagster._core.definitions.asset_dep import CoercibleToAssetDep
7-
8-
from dagster_sqlmesh.controller.base import ContextCls, SQLMeshController
4+
from dagster_sqlmesh.controller.base import (
5+
ContextCls,
6+
SQLMeshController,
7+
)
98
from dagster_sqlmesh.translator import SQLMeshDagsterTranslator
10-
from dagster_sqlmesh.types import SQLMeshModelDep, SQLMeshMultiAssetOptions
9+
from dagster_sqlmesh.types import (
10+
ConvertibleToAssetDep,
11+
ConvertibleToAssetOut,
12+
SQLMeshModelDep,
13+
SQLMeshMultiAssetOptions,
14+
)
1115
from dagster_sqlmesh.utils import get_asset_key_str
1216

1317
logger = logging.getLogger(__name__)
@@ -17,47 +21,65 @@ class DagsterSQLMeshController(SQLMeshController[ContextCls]):
1721
"""An extension of the sqlmesh controller specifically for dagster use"""
1822

1923
def to_asset_outs(
20-
self, environment: str, translator: SQLMeshDagsterTranslator,
24+
self,
25+
environment: str,
26+
translator: SQLMeshDagsterTranslator,
2127
) -> SQLMeshMultiAssetOptions:
28+
"""Loads all the asset outs of the current sqlmesh environment. If a
29+
cache is provided, it will be tried first to load the asset outs."""
30+
31+
internal_asset_deps_map: dict[str, set[str]] = {}
32+
deps_map: dict[str, ConvertibleToAssetDep] = {}
33+
asset_outs: dict[str, ConvertibleToAssetOut] = {}
34+
2235
with self.instance(environment, "to_asset_outs") as instance:
2336
context = instance.context
24-
output = SQLMeshMultiAssetOptions()
25-
depsMap: dict[str, CoercibleToAssetDep] = {}
2637

2738
for model, deps in instance.non_external_models_dag():
2839
asset_key = translator.get_asset_key(context=context, fqn=model.fqn)
40+
asset_key_str = asset_key.to_user_string()
2941
model_deps = [
3042
SQLMeshModelDep(fqn=dep, model=context.get_model(dep))
3143
for dep in deps
3244
]
33-
internal_asset_deps: set[AssetKey] = set()
45+
internal_asset_deps: set[str] = set()
3446
asset_tags = translator.get_tags(context, model)
3547

3648
for dep in model_deps:
3749
if dep.model:
38-
internal_asset_deps.add(
39-
translator.get_asset_key(context, dep.model.fqn)
40-
)
50+
dep_asset_key_str = translator.get_asset_key(
51+
context, dep.model.fqn
52+
).to_user_string()
53+
54+
internal_asset_deps.add(dep_asset_key_str)
4155
else:
4256
table = get_asset_key_str(dep.fqn)
43-
key = translator.get_asset_key(context, dep.fqn)
57+
key = translator.get_asset_key(
58+
context, dep.fqn
59+
).to_user_string()
4460
internal_asset_deps.add(key)
61+
4562
# create an external dep
46-
depsMap[table] = AssetDep(key)
63+
deps_map[table] = translator.create_asset_dep(key=key)
64+
4765
model_key = get_asset_key_str(model.fqn)
48-
# If current Dagster supports "kinds", add labels for Dagster UI
49-
if "kinds" in signature(AssetOut).parameters:
50-
output.outs[model_key] = AssetOut(
51-
key=asset_key, tags=asset_tags, is_required=False,
52-
group_name=translator.get_group_name(context, model),
53-
kinds={"sqlmesh", translator._get_context_dialect(context).lower()}
54-
)
55-
else:
56-
output.outs[model_key] = AssetOut(
57-
key=asset_key, tags=asset_tags, is_required=False,
58-
group_name=translator.get_group_name(context, model)
59-
)
60-
output.internal_asset_deps[model_key] = internal_asset_deps
61-
62-
output.deps = list(depsMap.values())
63-
return output
66+
asset_outs[model_key] = translator.create_asset_out(
67+
model_key=model_key,
68+
asset_key=asset_key_str,
69+
tags=asset_tags,
70+
is_required=False,
71+
group_name=translator.get_group_name(context, model),
72+
kinds={
73+
"sqlmesh",
74+
translator.get_context_dialect(context).lower(),
75+
},
76+
)
77+
internal_asset_deps_map[model_key] = internal_asset_deps
78+
79+
deps = list(deps_map.values())
80+
81+
return SQLMeshMultiAssetOptions(
82+
outs=asset_outs,
83+
deps=deps,
84+
internal_asset_deps=internal_asset_deps_map,
85+
)

dagster_sqlmesh/translator.py

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,48 @@
1+
import typing as t
12
from collections.abc import Sequence
3+
from inspect import signature
24

3-
from dagster import AssetKey
5+
from dagster import AssetDep, AssetKey, AssetOut
6+
from pydantic import BaseModel, Field
47
from sqlglot import exp
58
from sqlmesh.core.context import Context
69
from sqlmesh.core.model import Model
710

11+
from .types import ConvertibleToAssetDep, ConvertibleToAssetOut
12+
13+
14+
class IntermediateAssetOut(BaseModel):
15+
model_key: str
16+
asset_key: str
17+
tags: t.Mapping[str, str] | None = None
18+
is_required: bool = True
19+
group_name: str | None = None
20+
kinds: set[str] | None = None
21+
kwargs: dict[str, t.Any] = Field(default_factory=dict)
22+
23+
def to_asset_out(self) -> AssetOut:
24+
asset_key = AssetKey.from_user_string(self.asset_key)
25+
26+
if "kinds" not in signature(AssetOut).parameters:
27+
self.kinds = None
28+
29+
return AssetOut(
30+
key=asset_key,
31+
tags=self.tags,
32+
is_required=self.is_required,
33+
group_name=self.group_name,
34+
kinds=self.kinds,
35+
**self.kwargs,
36+
)
37+
38+
39+
class IntermediateAssetDep(BaseModel):
40+
key: str
41+
kwargs: dict[str, t.Any] = Field(default_factory=dict)
42+
43+
def to_asset_dep(self) -> AssetDep:
44+
return AssetDep(AssetKey.from_user_string(self.key))
45+
846

947
class SQLMeshDagsterTranslator:
1048
"""Translates sqlmesh objects for dagster"""
@@ -19,14 +57,40 @@ def get_asset_key_name(self, fqn: str) -> Sequence[str]:
1957
asset_key_name = [table.catalog, table.db, table.name]
2058

2159
return asset_key_name
22-
60+
2361
def get_group_name(self, context: Context, model: Model) -> str:
2462
path = self.get_asset_key_name(model.fqn)
2563
return path[-2]
2664

27-
def _get_context_dialect(self, context: Context) -> str:
65+
def get_context_dialect(self, context: Context) -> str:
2866
return context.engine_adapter.dialect
2967

68+
def create_asset_dep(self, *, key: str, **kwargs: t.Any) -> ConvertibleToAssetDep:
69+
"""Create an object that resolves to an AssetDep
70+
71+
Most users of this library will not need to use this method, it is
72+
primarily the way we enable cacheable assets from dagster-sqlmesh.
73+
"""
74+
return IntermediateAssetDep(key=key, kwargs=kwargs)
75+
76+
def create_asset_out(
77+
self, *, model_key: str, asset_key: str, **kwargs: t.Any
78+
) -> ConvertibleToAssetOut:
79+
"""Create an object that resolves to an AssetOut
80+
81+
Most users of this library will not need to use this method, it is
82+
primarily the way we enable cacheable assets from dagster-sqlmesh.
83+
"""
84+
return IntermediateAssetOut(
85+
model_key=model_key,
86+
asset_key=asset_key,
87+
kinds=kwargs.pop("kinds", None),
88+
tags=kwargs.pop("tags", None),
89+
group_name=kwargs.pop("group_name", None),
90+
is_required=kwargs.pop("is_required", False),
91+
kwargs=kwargs,
92+
)
93+
3094
def get_tags(self, context: Context, model: Model) -> dict[str, str]:
3195
"""Given the sqlmesh context and a model return the tags for that model"""
3296
return {k: "true" for k in model.tags}

dagster_sqlmesh/types.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import typing as t
22
from dataclasses import dataclass, field
33

4-
from dagster import AssetCheckResult, AssetKey, AssetMaterialization, AssetOut
5-
from dagster._core.definitions.asset_dep import CoercibleToAssetDep
4+
from dagster import AssetCheckResult, AssetDep, AssetKey, AssetMaterialization, AssetOut
65
from sqlmesh.core.model import Model
76

87
MultiAssetResponse = t.Iterable[AssetCheckResult | AssetMaterialization]
@@ -30,10 +29,44 @@ class SQLMeshModelDep:
3029

3130
def parse_fqn(self) -> SQLMeshParsedFQN:
3231
return SQLMeshParsedFQN.parse(self.fqn)
32+
33+
class ConvertibleToAssetOut(t.Protocol):
34+
def to_asset_out(self) -> AssetOut:
35+
"""Convert to an AssetOut object."""
36+
...
3337

38+
class ConvertibleToAssetDep(t.Protocol):
39+
def to_asset_dep(self) -> AssetDep:
40+
"""Convert to an AssetDep object."""
41+
...
42+
43+
class ConvertibleToAssetKey(t.Protocol):
44+
def to_asset_key(self) -> AssetKey:
45+
...
3446

3547
@dataclass(kw_only=True)
3648
class SQLMeshMultiAssetOptions:
37-
outs: dict[str, AssetOut] = field(default_factory=lambda: {})
38-
deps: t.Iterable[CoercibleToAssetDep] = field(default_factory=lambda: {})
39-
internal_asset_deps: dict[str, set[AssetKey]] = field(default_factory=lambda: {})
49+
"""Generic class for returning dagster multi asset options from SQLMesh, the
50+
types used are intentionally generic so to allow for potentially using an
51+
intermediate representation of the dagster asset objects. This is most
52+
useful in caching purposes and is done to allow for users of this library to
53+
manipulate the dagster asset creation process as they see fit."""
54+
55+
outs: t.Mapping[str, ConvertibleToAssetOut] = field(default_factory=lambda: {})
56+
deps: t.Iterable[ConvertibleToAssetDep] = field(default_factory=lambda: [])
57+
internal_asset_deps: t.Mapping[str, set[str]] = field(default_factory=lambda: {})
58+
59+
def to_asset_outs(self) -> t.Mapping[str, AssetOut]:
60+
"""Convert to an iterable of AssetOut objects."""
61+
return {key: out.to_asset_out() for key, out in self.outs.items()}
62+
63+
def to_asset_deps(self) -> t.Iterable[AssetDep]:
64+
"""Convert to an iterable of AssetDep objects."""
65+
return [dep.to_asset_dep() for dep in self.deps]
66+
67+
def to_internal_asset_deps(self) -> dict[str, set[AssetKey]]:
68+
"""Convert to a dictionary of internal asset dependencies."""
69+
return {
70+
key: {AssetKey.from_user_string(dep) for dep in deps}
71+
for key, deps in self.internal_asset_deps.items()
72+
}

0 commit comments

Comments
 (0)