-
Notifications
You must be signed in to change notification settings - Fork 551
Dynamic pipelines v0 #4074
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Dynamic pipelines v0 #4074
Conversation
4c02511 to
c6153aa
Compare
08b3069 to
250e5d3
Compare
250e5d3 to
e217f58
Compare
575843d to
226b65f
Compare
597b9ce to
a83c54c
Compare
e82964c to
093ee9a
Compare
093ee9a to
b0b6162
Compare
cffc807 to
ec7bae4
Compare
d5f90e8 to
e902eaf
Compare
| """ | ||
| return self._wrapped.result() | ||
|
|
||
| def load(self) -> Any: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should keep a cache dictionary for materialized artifacts, so if (for some weird reason) users do multiple load calls (instead of assigning and reusing) we can return the cached results.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like that idea. I'll also add a boolean flag to the load method to prevent this, so users can at least manually disable it for huge artifacts.
| else: | ||
| raise ValueError(f"Invalid step run output: {result}") | ||
|
|
||
| def __getitem__(self, key: Union[str, int]) -> ArtifactFuture: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should provide a custom implementation for setitem as a better UX/guidance to users than getting an error like this: object does not support item assignment.
| """ | ||
| if isinstance(key, str): | ||
| index = self._output_keys.index(key) | ||
| elif isinstance(key, int): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would recommend adopting one kind of wrapper behavior and stick to it. If we enable users to access futures both dict-style and list/tuple style I think the more we extend the class magic functions the more the behavior may be more confusing regarding what gets executed and what the users expects to be executed.
Plus in general accessing by key is a safer operation and would be a good pattern to enforce. The results may change length, order and accessing by position is a bit un-safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For instance, I can get dict-style (by specifying key) but the __iter__ function I may expect it to return the keys not the values. For better consistency I would say we have 2 options:
- No magic functions - We provide public methods with clean documentation. Uses can manipulate futures based on the available functions and their signatures. No confusion, users stick to following docstrings.
- We implement magic functions but we assume implementation wraps one kind of hidden data structure. Users can manipulate it as dict (preferable imo) or as a list/tuple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, I agree we should only support one way. My vote goes for tuple like behaviour though, as the main purpose of my implementations was to actually support the following: When calling any python function with multiple outputs, there are two cases:
def f():
return 1, "str"
# Return value is a tuple that can be accessed with `int` keys
tuple_result = f()
int_result = tuple_result[0]
# Automatic unpacking
int_result, str_result = f()I think the latter is most common, and is the use-case I think we should support to make it feel as pythonic as possible (and also mirrors how you would call sync steps, in which case the return value will not be a future but instead of tuple of artifacts).
Tuples are also immutable which in our case holds true as well as you can't add outputs to the future result of a step run.
We can then add some helper methods like get_output(key: str) like you suggested to get allow fetching specific outputs by key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, yes, tuple should do it as well. As long as we are consistent I think we are ok.
| index=index, | ||
| ) | ||
|
|
||
| def __iter__(self) -> Any: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would __contains__ make sense to also implement here? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yes I thought I already did, that definitely makes sense!
Actually if we do implement it as a tuple-like data structure the contains will be with values and might not make much sense, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yy it would make more sense in a dict-like scenario!
| Yields: | ||
| None. | ||
| """ | ||
| with env_utils.temporary_environment( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not in the context of this PR but we can do a better job here, thread-safety wise. In general os.environ should be treated as an immutable value, changing its values may also affect un-intentionally other execution paths (for instance code running in other threads).
I think masking the environment under a custom object/class and making that accessible with context vars resolves the issue (context vars are thread-local, wrapper object loads and freezes state and exposes all the operations we want with relative safety).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes you're right! This is not a problem in this case, but when running multiple steps in parallel we also set different environment variables which is problematic. At least in this case I can switch to using a context var instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can work something here, but different story of course. ContextVars should work perfectly, maybe in combo with a centralized BaseSettings object. Will create the story and we can discuss implementation.
| Whether to prevent pipeline execution. | ||
| """ | ||
| return handle_bool_env_var( | ||
| ENV_ZENML_PREVENT_PIPELINE_EXECUTION, default=False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is hard to track those environment variables references. Maybe a BaseSettings object to organize those in a single place would be a good idea. Also we wouldn't need this function, pydantic would validate the boolean value for us in one go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep agreed in general that would be a nice thing. We even have some classes that do this for a subset of env variables (ServerConfiguration and GlobalConfiguration), but not for all of them. This function also treats values like "yes" as True, not sure how pydantic would handle that natively, but we could implement it for sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Pydantic captures multiple values as well, I can cross compare. Will open a new story for this just wanted to get your opinion :)
5303878 to
b7ddcd3
Compare
9e4cb68 to
1dbb402
Compare
Describe changes
This PR implements dynamic pipelines for the local/kubernetes orchestrators.
Example
Features
runtime=inline) or if the orchestrator will spin up a separate step execution environment (runtime=isolated).step.submit(...). This will either execute the step or launch a new container in a new thread.depends_on:Limitations/Known issues
Pre-requisites
Please ensure you have done the following:
developand the open PR is targetingdevelop. If your branch wasn't based on develop read Contribution guide on rebasing branch to develop.Types of changes