Skip to content

Commit 9e4cb68

Browse files
committed
Allow runtime configuration
1 parent bf57f0d commit 9e4cb68

File tree

6 files changed

+67
-49
lines changed

6 files changed

+67
-49
lines changed

src/zenml/config/step_configurations.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from zenml.config.frozen_base_model import FrozenBaseModel
4545
from zenml.config.retry_config import StepRetryConfig
4646
from zenml.config.source import Source, SourceWithValidator
47+
from zenml.enums import StepRuntime
4748
from zenml.logger import get_logger
4849
from zenml.model.lazy_load import ModelVersionDataLazyLoader
4950
from zenml.model.model import Model
@@ -214,12 +215,11 @@ class StepConfigurationUpdate(FrozenBaseModel):
214215
default=None,
215216
description="The cache policy for the step.",
216217
)
217-
in_process: Optional[bool] = Field(
218+
runtime: Optional[StepRuntime] = Field(
218219
default=None,
219-
description="Whether to run the step in process. This is only "
220-
"applicable for dynamic pipelines. If not set, the step will by "
221-
"default run in-process unless it requires a different Docker image "
222-
"or has special resource requirements.",
220+
description="The step runtime. If not configured, the step will "
221+
"run inline unless a step operator or docker/resource settings "
222+
"are configured. This is only applicable for dynamic pipelines.",
223223
)
224224

225225
outputs: Mapping[str, PartialArtifactConfiguration] = {}

src/zenml/enums.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,3 +545,10 @@ class PipelineRunTriggeredByType(StrEnum):
545545

546546
STEP_RUN = "step_run"
547547
DEPLOYMENT = "deployment"
548+
549+
550+
class StepRuntime(StrEnum):
551+
"""All possible runtime modes for a step."""
552+
553+
INLINE = "inline"
554+
ISOLATED = "isolated"

src/zenml/execution/pipeline/dynamic/runner.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from zenml.client import Client
3636
from zenml.config.compiler import Compiler
3737
from zenml.config.step_configurations import Step
38-
from zenml.enums import ExecutionMode
38+
from zenml.enums import ExecutionMode, StepRuntime
3939
from zenml.execution.pipeline.dynamic.outputs import (
4040
ArtifactFuture,
4141
OutputArtifact,
@@ -438,43 +438,45 @@ def _should_retry_locally(
438438
if step.config.step_operator:
439439
return True
440440

441-
if should_run_in_process(step, pipeline_docker_settings):
441+
runtime = get_step_runtime(step, pipeline_docker_settings)
442+
if runtime == StepRuntime.INLINE or step.config.step_operator:
442443
return True
443444
else:
444-
# Running out of process with the orchestrator
445+
# Running in isolated mode with the orchestrator
445446
return (
446447
not Client().active_stack.orchestrator.config.handles_step_retries
447448
)
448449

449450

450-
def should_run_in_process(
451+
def get_step_runtime(
451452
step: "Step", pipeline_docker_settings: "DockerSettings"
452-
) -> bool:
453+
) -> StepRuntime:
453454
"""Determine if a step should be run in process.
454455
455456
Args:
456457
step: The step.
457458
pipeline_docker_settings: The Docker settings of the parent pipeline.
458459
459460
Returns:
460-
Whether the step should be run in process.
461+
The runtime for the step.
461462
"""
462463
if step.config.step_operator:
463-
return False
464+
return StepRuntime.ISOLATED
464465

465466
if not Client().active_stack.orchestrator.can_launch_dynamic_steps:
466-
return True
467+
return StepRuntime.INLINE
467468

468-
if step.config.in_process is False:
469-
return False
470-
elif step.config.in_process is None:
471-
if not step.config.resource_settings.empty:
472-
return False
469+
runtime = step.config.runtime
473470

474-
if step.config.docker_settings != pipeline_docker_settings:
475-
return False
471+
if runtime is None:
472+
if not step.config.resource_settings.empty:
473+
runtime = StepRuntime.ISOLATED
474+
elif step.config.docker_settings != pipeline_docker_settings:
475+
runtime = StepRuntime.ISOLATED
476+
else:
477+
runtime = StepRuntime.INLINE
476478

477-
return True
479+
return runtime
478480

479481

480482
def get_config_template(

src/zenml/orchestrators/step_launcher.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
ENV_ZENML_STEP_OPERATOR,
2727
handle_bool_env_var,
2828
)
29-
from zenml.enums import ExecutionMode, ExecutionStatus
29+
from zenml.enums import ExecutionMode, ExecutionStatus, StepRuntime
3030
from zenml.environment import get_run_environment_dict
3131
from zenml.exceptions import RunInterruptedException, RunStoppedException
3232
from zenml.logger import get_logger
@@ -472,19 +472,22 @@ def _run_step(
472472
)
473473
else:
474474
from zenml.execution.pipeline.dynamic.runner import (
475-
should_run_in_process,
475+
get_step_runtime,
476476
)
477477

478-
if should_run_in_process(
479-
self._step,
480-
self._snapshot.pipeline_configuration.docker_settings,
481-
):
482-
if self._step.config.in_process is False:
483-
# The step was configured to run out of process, but
484-
# the orchestrator doesn't support it.
478+
step_runtime = get_step_runtime(
479+
step=self._step,
480+
pipeline_docker_settings=self._snapshot.pipeline_configuration.docker_settings,
481+
)
482+
483+
if step_runtime == StepRuntime.INLINE:
484+
if self._step.config.runtime == StepRuntime.ISOLATED:
485+
# The step was configured to run in an isolated runtime,
486+
# but the orchestrator doesn't support it.
485487
logger.warning(
486-
"The %s does not support running dynamic out of "
487-
"process steps. Running step `%s` locally instead.",
488+
"The %s does not support running dynamic steps "
489+
"in isolated runtimes. Running step `%s` in inline "
490+
"runtime instead.",
488491
self._stack.orchestrator.__class__.__name__,
489492
self._invocation_id,
490493
)

src/zenml/steps/base_step.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
ENV_ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK,
4848
handle_bool_env_var,
4949
)
50+
from zenml.enums import StepRuntime
5051
from zenml.exceptions import StepInterfaceError
5152
from zenml.logger import get_logger
5253
from zenml.materializers.base_materializer import BaseMaterializer
@@ -131,7 +132,7 @@ def __init__(
131132
retry: Optional[StepRetryConfig] = None,
132133
substitutions: Optional[Dict[str, str]] = None,
133134
cache_policy: Optional[CachePolicyOrString] = None,
134-
in_process: Optional[bool] = None,
135+
runtime: Optional[StepRuntime] = None,
135136
) -> None:
136137
"""Initializes a step.
137138
@@ -165,8 +166,10 @@ def __init__(
165166
retry: Configuration for retrying the step in case of failure.
166167
substitutions: Extra placeholders to use in the name template.
167168
cache_policy: Cache policy for this step.
168-
in_process: Whether to run the step in process. This is only
169-
applicable for dynamic pipelines.
169+
runtime: The step runtime. If not configured, the step will
170+
run inline unless a step operator or docker/resource settings
171+
are configured. This is only applicable for dynamic
172+
pipelines.
170173
"""
171174
from zenml.config.step_configurations import PartialStepConfiguration
172175

@@ -233,7 +236,7 @@ def __init__(
233236
retry=retry,
234237
substitutions=substitutions,
235238
cache_policy=cache_policy,
236-
in_process=in_process,
239+
runtime=runtime,
237240
)
238241

239242
notebook_utils.try_to_save_notebook_cell_code(self.source_object)
@@ -717,7 +720,7 @@ def configure(
717720
retry: Optional[StepRetryConfig] = None,
718721
substitutions: Optional[Dict[str, str]] = None,
719722
cache_policy: Optional[CachePolicyOrString] = None,
720-
in_process: Optional[bool] = None,
723+
runtime: Optional[StepRuntime] = None,
721724
merge: bool = True,
722725
) -> T:
723726
"""Configures the step.
@@ -761,8 +764,8 @@ def configure(
761764
retry: Configuration for retrying the step in case of failure.
762765
substitutions: Extra placeholders to use in the name template.
763766
cache_policy: Cache policy for this step.
764-
in_process: Whether to run the step in process. This is only
765-
applicable for dynamic pipelines.
767+
runtime: The step runtime. This is only applicable for dynamic
768+
pipelines.
766769
merge: If `True`, will merge the given dictionary configurations
767770
like `parameters` and `settings` with existing
768771
configurations. If `False` the given configurations will
@@ -841,7 +844,7 @@ def _convert_to_tuple(value: Any) -> Tuple[Source, ...]:
841844
"retry": retry,
842845
"substitutions": substitutions,
843846
"cache_policy": cache_policy,
844-
"in_process": in_process,
847+
"runtime": runtime,
845848
}
846849
)
847850
config = StepConfigurationUpdate(**values)
@@ -870,7 +873,7 @@ def with_options(
870873
retry: Optional[StepRetryConfig] = None,
871874
substitutions: Optional[Dict[str, str]] = None,
872875
cache_policy: Optional[CachePolicyOrString] = None,
873-
in_process: Optional[bool] = None,
876+
runtime: Optional[StepRuntime] = None,
874877
merge: bool = True,
875878
) -> "BaseStep":
876879
"""Copies the step and applies the given configurations.
@@ -904,8 +907,8 @@ def with_options(
904907
retry: Configuration for retrying the step in case of failure.
905908
substitutions: Extra placeholders for the step name.
906909
cache_policy: Cache policy for this step.
907-
in_process: Whether to run the step in process. This is only
908-
applicable for dynamic pipelines.
910+
runtime: The step runtime. This is only applicable for dynamic
911+
pipelines.
909912
merge: If `True`, will merge the given dictionary configurations
910913
like `parameters` and `settings` with existing
911914
configurations. If `False` the given configurations will
@@ -935,7 +938,7 @@ def with_options(
935938
retry=retry,
936939
substitutions=substitutions,
937940
cache_policy=cache_policy,
938-
in_process=in_process,
941+
runtime=runtime,
939942
merge=merge,
940943
)
941944
return step_copy

src/zenml/steps/step_decorator.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
)
3030
from uuid import UUID
3131

32+
from zenml.enums import StepRuntime
3233
from zenml.logger import get_logger
3334

3435
if TYPE_CHECKING:
@@ -80,7 +81,7 @@ def step(
8081
retry: Optional["StepRetryConfig"] = None,
8182
substitutions: Optional[Dict[str, str]] = None,
8283
cache_policy: Optional["CachePolicyOrString"] = None,
83-
in_process: Optional[bool] = None,
84+
runtime: Optional[StepRuntime] = None,
8485
) -> Callable[["F"], "BaseStep"]: ...
8586

8687

@@ -105,7 +106,7 @@ def step(
105106
retry: Optional["StepRetryConfig"] = None,
106107
substitutions: Optional[Dict[str, str]] = None,
107108
cache_policy: Optional["CachePolicyOrString"] = None,
108-
in_process: Optional[bool] = None,
109+
runtime: Optional[StepRuntime] = None,
109110
) -> Union["BaseStep", Callable[["F"], "BaseStep"]]:
110111
"""Decorator to create a ZenML step.
111112
@@ -141,8 +142,10 @@ def step(
141142
retry: configuration of step retry in case of step failure.
142143
substitutions: Extra placeholders for the step name.
143144
cache_policy: Cache policy for this step.
144-
in_process: Whether to run the step in process. This is only
145-
applicable for dynamic pipelines.
145+
runtime: The step runtime. If not configured, the step will
146+
run inline unless a step operator or docker/resource settings
147+
are configured. This is only applicable for dynamic
148+
pipelines.
146149
147150
Returns:
148151
The step instance.
@@ -180,7 +183,7 @@ def inner_decorator(func: "F") -> "BaseStep":
180183
retry=retry,
181184
substitutions=substitutions,
182185
cache_policy=cache_policy,
183-
in_process=in_process,
186+
runtime=runtime,
184187
)
185188

186189
return step_instance

0 commit comments

Comments
 (0)