2929
3030logger = logging .getLogger (__name__ )
3131
32+
3233def _START_OF_UNIX_TIME ():
33- dt = datetime .strptime (
34- "1970-01-01T00:00:00Z" , "%Y-%m-%dT%H:%M:%SZ"
35- )
34+ dt = datetime .strptime ("1970-01-01T00:00:00Z" , "%Y-%m-%dT%H:%M:%SZ" )
3635 return dt .astimezone (UTC )
3736
3837
@@ -72,7 +71,9 @@ def as_dagster_metadata(
7271 last_promoted = dg .MetadataValue .timestamp (
7372 max (previous .last_promoted , self .last_promoted )
7473 )
75- last_backfill = dg .MetadataValue .timestamp (max (previous .last_backfill , self .last_backfill ))
74+ last_backfill = dg .MetadataValue .timestamp (
75+ max (previous .last_backfill , self .last_backfill )
76+ )
7677 created_at = dg .MetadataValue .timestamp (previous .created_at )
7778 else :
7879 # If there is no previous materialization status all dates can use
@@ -98,21 +99,25 @@ def from_dagster_metadata(
9899 # convert metadata values
99100 converted : dict [str , dg .MetadataValue ] = {}
100101 for key , value in metadata .items ():
101- assert isinstance (value , dg .MetadataValue ), f"Expected MetadataValue for { key } , got { type (value )} "
102+ assert isinstance (
103+ value , dg .MetadataValue
104+ ), f"Expected MetadataValue for { key } , got { type (value )} "
102105 converted [key ] = value
103106
104- return cls .model_validate (dict (
105- model_fqn = converted ["model_fqn" ].value ,
106- snapshot_id = converted ["snapshot_id" ].value ,
107- created_at = converted ["created_at" ].value ,
108- last_updated_or_restated = converted ["last_updated_or_restated" ].value ,
109- last_promoted = converted ["last_promoted" ].value ,
110- last_backfill = converted ["last_backfill" ].value ,
111- ))
112-
107+ return cls .model_validate (
108+ dict (
109+ model_fqn = converted ["model_fqn" ].value ,
110+ snapshot_id = converted ["snapshot_id" ].value ,
111+ created_at = converted ["created_at" ].value ,
112+ last_updated_or_restated = converted ["last_updated_or_restated" ].value ,
113+ last_promoted = converted ["last_promoted" ].value ,
114+ last_backfill = converted ["last_backfill" ].value ,
115+ )
116+ )
117+
113118 def as_glot_table (self ) -> exp .Table :
114119 return sqlglot .to_table (self .model_fqn )
115-
120+
116121 def is_match (self , input : str , ignore_catalog : bool = False ) -> bool :
117122 """Tests if the passed in string matches this model's table
118123
@@ -133,7 +138,7 @@ def is_match(self, input: str, ignore_catalog: bool = False) -> bool:
133138 return False
134139 if input_as_table .db != table .db :
135140 return False
136-
141+
137142 if not ignore_catalog :
138143 if input_as_table .catalog != table .catalog :
139144 return False
@@ -251,12 +256,17 @@ def notify_queue_next(self) -> tuple[str, ModelMaterializationStatus] | None:
251256
252257 if model_name_for_notification in self ._non_model_names :
253258 self ._current_index += 1
254- self .logger .debug (f"skipping non-model snapshot { model_name_for_notification } " )
259+ self .logger .debug (
260+ f"skipping non-model snapshot { model_name_for_notification } "
261+ )
255262 continue
256263
257264 if model_name_for_notification in self ._model_metadata :
258265 self ._current_index += 1
259- return (model_name_for_notification , self ._model_metadata [model_name_for_notification ])
266+ return (
267+ model_name_for_notification ,
268+ self ._model_metadata [model_name_for_notification ],
269+ )
260270 return None
261271
262272
@@ -320,7 +330,23 @@ def __init__(
320330 dag : DAG [t .Any ],
321331 prefix : str ,
322332 is_testing : bool = False ,
333+ materializations_enabled : bool = True ,
323334 ) -> None :
335+ """Dagster event handler for SQLMesh models.
336+
337+ The handler is responsible for reporting events from sqlmesh to dagster.
338+
339+ Args:
340+ context: The Dagster asset execution context.
341+ models_map: A mapping of model names to their SQLMesh model instances.
342+ dag: The directed acyclic graph representing the SQLMesh models.
343+ prefix: A prefix to use for all asset keys generated by this handler.
344+ is_testing: Whether the handler is being used in a testing context.
345+ materializations_enabled: Whether the handler is to generate
346+ materializations, this should be disabled if you with to run a
347+ sqlmesh plan or run in an environment different from the normal
348+ target environment.
349+ """
324350 self ._models_map = models_map
325351 self ._prefix = prefix
326352 self ._context = context
@@ -331,6 +357,7 @@ def __init__(
331357 self ._stage = "plan"
332358 self ._errors : list [Exception ] = []
333359 self ._is_testing = is_testing
360+ self ._materializations_enabled = materializations_enabled
334361
335362 def process_events (self , event : console .ConsoleEvent ) -> None :
336363 self .report_event (event )
@@ -363,7 +390,14 @@ def notify_success(
363390 )
364391 else :
365392 asset_key = self ._context .asset_key_for_output (output_key )
366- yield self .create_materialize_result (self ._context , asset_key , materialization_status )
393+ if self ._materializations_enabled :
394+ yield self .create_materialize_result (
395+ self ._context , asset_key , materialization_status
396+ )
397+ else :
398+ self ._logger .debug (
399+ f"Materializations disabled. Would have materialized for { asset_key .to_user_string ()} "
400+ )
367401 notify = self ._tracker .notify_queue_next ()
368402 else :
369403 self ._logger .debug ("No more materializations to process" )
@@ -384,12 +418,14 @@ def create_materialize_result(
384418 )
385419 last_materialization_status = None
386420 else :
387- assert last_materialization . asset_materialization is not None , (
388- "Expected asset materialization to be present."
389- )
421+ assert (
422+ last_materialization . asset_materialization is not None
423+ ), "Expected asset materialization to be present."
390424 try :
391- last_materialization_status = ModelMaterializationStatus .from_dagster_metadata (
392- dict (last_materialization .asset_materialization .metadata )
425+ last_materialization_status = (
426+ ModelMaterializationStatus .from_dagster_metadata (
427+ dict (last_materialization .asset_materialization .metadata )
428+ )
393429 )
394430 except Exception as e :
395431 self ._logger .warning (
@@ -399,7 +435,9 @@ def create_materialize_result(
399435
400436 return dg .MaterializeResult (
401437 asset_key = asset_key ,
402- metadata = current_materialization_status .as_dagster_metadata (last_materialization_status ),
438+ metadata = current_materialization_status .as_dagster_metadata (
439+ last_materialization_status
440+ ),
403441 )
404442
405443 def report_event (self , event : console .ConsoleEvent ) -> None :
@@ -554,6 +592,7 @@ def run(
554592 skip_run : bool = False ,
555593 plan_options : PlanOptions | None = None ,
556594 run_options : RunOptions | None = None ,
595+ materializations_enabled : bool = True ,
557596 ) -> t .Iterable [dg .MaterializeResult ]:
558597 """Execute SQLMesh based on the configuration given"""
559598 plan_options = plan_options or {}
@@ -592,6 +631,7 @@ def run(
592631 dag = dag ,
593632 prefix = "sqlmesh: " ,
594633 is_testing = self .is_testing ,
634+ materializations_enabled = materializations_enabled ,
595635 )
596636
597637 def raise_for_sqlmesh_errors (
@@ -645,13 +685,15 @@ def create_event_handler(
645685 models_map : dict [str , Model ],
646686 prefix : str ,
647687 is_testing : bool ,
688+ materializations_enabled : bool ,
648689 ) -> DagsterSQLMeshEventHandler :
649690 return DagsterSQLMeshEventHandler (
650691 context = context ,
651692 dag = dag ,
652693 models_map = models_map ,
653694 prefix = prefix ,
654695 is_testing = is_testing ,
696+ materializations_enabled = materializations_enabled ,
655697 )
656698
657699 def _get_selected_models_from_context (
0 commit comments