|
49 | 49 | from zenml.config.base_settings import BaseSettings |
50 | 50 | from zenml.constants import ( |
51 | 51 | METADATA_ORCHESTRATOR_RUN_ID, |
| 52 | + ORCHESTRATOR_DOCKER_IMAGE_KEY, |
52 | 53 | ) |
53 | 54 | from zenml.enums import ExecutionMode, ExecutionStatus, StackComponentType |
54 | 55 | from zenml.integrations.kubernetes.constants import ( |
|
80 | 81 | from zenml.stack import StackValidator |
81 | 82 |
|
82 | 83 | if TYPE_CHECKING: |
| 84 | + from zenml.config.step_run_info import StepRunInfo |
83 | 85 | from zenml.models import ( |
84 | 86 | PipelineRunResponse, |
85 | 87 | PipelineSnapshotBase, |
@@ -110,7 +112,10 @@ def should_build_pipeline_image( |
110 | 112 | settings = cast( |
111 | 113 | KubernetesOrchestratorSettings, self.get_settings(snapshot) |
112 | 114 | ) |
113 | | - return settings.always_build_pipeline_image |
| 115 | + if settings.always_build_pipeline_image: |
| 116 | + return True |
| 117 | + else: |
| 118 | + return super().should_build_pipeline_image(snapshot) |
114 | 119 |
|
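With the change above, the Kubernetes orchestrator only forces a build of the dedicated pipeline image when `always_build_pipeline_image` is enabled and otherwise defers to the base orchestrator's decision. A minimal usage sketch follows; it assumes the documented `KubernetesOrchestratorSettings` import path and the `orchestrator.kubernetes` settings key, and the pipeline name is made up:

```python
from zenml import pipeline
from zenml.integrations.kubernetes.flavors import KubernetesOrchestratorSettings

# Force building the pipeline image even when the base orchestrator would
# otherwise skip it (field name taken from the settings read above).
k8s_settings = KubernetesOrchestratorSettings(always_build_pipeline_image=True)


@pipeline(settings={"orchestrator.kubernetes": k8s_settings})
def my_pipeline() -> None:
    ...
```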
115 | 120 | def get_kube_client( |
116 | 121 | self, incluster: Optional[bool] = None |
@@ -446,8 +451,6 @@ def submit_pipeline( |
446 | 451 | KubernetesOrchestratorSettings, self.get_settings(snapshot) |
447 | 452 | ) |
448 | 453 |
|
449 | | - assert stack.container_registry |
450 | | - |
451 | 454 | # Get Docker image for the orchestrator pod |
452 | 455 | try: |
453 | 456 | image = self.get_image(snapshot=snapshot) |
@@ -656,6 +659,285 @@ def _wait_for_run_to_finish() -> None: |
656 | 659 | ) |
657 | 660 | return None |
658 | 661 |
|
| 662 | + def submit_dynamic_pipeline( |
| 663 | + self, |
| 664 | + snapshot: "PipelineSnapshotResponse", |
| 665 | + stack: "Stack", |
| 666 | + environment: Dict[str, str], |
| 667 | + placeholder_run: Optional["PipelineRunResponse"] = None, |
| 668 | + ) -> Optional[SubmissionResult]: |
| 669 | + """Submits a dynamic pipeline to the orchestrator.""" |
| 670 | + from zenml.pipelines.dynamic.entrypoint_configuration import ( |
| 671 | + DynamicPipelineEntrypointConfiguration, |
| 672 | + ) |
| 673 | + |
| 674 | + pipeline_name = snapshot.pipeline_configuration.name |
| 675 | + settings = cast( |
| 676 | + KubernetesOrchestratorSettings, self.get_settings(snapshot) |
| 677 | + ) |
| 678 | + image = self.get_image(snapshot=snapshot) |
| 679 | + |
| 680 | + command = ( |
| 681 | + DynamicPipelineEntrypointConfiguration.get_entrypoint_command() |
| 682 | + ) |
| 683 | + args = DynamicPipelineEntrypointConfiguration.get_entrypoint_arguments( |
| 684 | + snapshot_id=snapshot.id, |
| 685 | + run_id=placeholder_run.id if placeholder_run else None, |
| 686 | + ) |
| 687 | + |
| 688 | + # Authorize pod to run Kubernetes commands inside the cluster. |
| 689 | + service_account_name = self._get_service_account_name(settings) |
| 690 | + |
| 691 | + # Set default minimum resource requests for the orchestrator pod if the |
| 692 | + # user has not specified any. The orchestrator pod consumes some memory |
| 693 | + # itself and, without a request, it may be scheduled on a node with too |
| 694 | + # little free memory, degrading or even crashing that node due to memory |
| 695 | + # pressure. |
| 696 | + orchestrator_pod_settings = kube_utils.apply_default_resource_requests( |
| 697 | + memory="400Mi", |
| 698 | + cpu="100m", |
| 699 | + pod_settings=settings.orchestrator_pod_settings, |
| 700 | + ) |
| 701 | + |
| 702 | + if self.config.pass_zenml_token_as_secret: |
| 703 | + secret_name = self.get_token_secret_name(snapshot.id) |
| 704 | + token = environment.pop("ZENML_STORE_API_TOKEN") |
| 705 | + kube_utils.create_or_update_secret( |
| 706 | + core_api=self._k8s_core_api, |
| 707 | + namespace=self.config.kubernetes_namespace, |
| 708 | + secret_name=secret_name, |
| 709 | + data={KUBERNETES_SECRET_TOKEN_KEY_NAME: token}, |
| 710 | + ) |
| 711 | + orchestrator_pod_settings.env.append( |
| 712 | + { |
| 713 | + "name": "ZENML_STORE_API_TOKEN", |
| 714 | + "valueFrom": { |
| 715 | + "secretKeyRef": { |
| 716 | + "name": secret_name, |
| 717 | + "key": KUBERNETES_SECRET_TOKEN_KEY_NAME, |
| 718 | + } |
| 719 | + }, |
| 720 | + } |
| 721 | + ) |
| 722 | + |
| 723 | + orchestrator_pod_labels = { |
| 724 | + "pipeline": kube_utils.sanitize_label(pipeline_name), |
| 725 | + } |
| 726 | + |
| 727 | + if placeholder_run: |
| 728 | + orchestrator_pod_labels["run_id"] = kube_utils.sanitize_label( |
| 729 | + str(placeholder_run.id) |
| 730 | + ) |
| 731 | + orchestrator_pod_labels["run_name"] = kube_utils.sanitize_label( |
| 732 | + placeholder_run.name |
| 733 | + ) |
| 734 | + |
| 735 | + pod_manifest = build_pod_manifest( |
| 736 | + pod_name=None, |
| 737 | + image_name=image, |
| 738 | + command=command, |
| 739 | + args=args, |
| 740 | + privileged=False, |
| 741 | + pod_settings=orchestrator_pod_settings, |
| 742 | + service_account_name=service_account_name, |
| 743 | + env=environment, |
| 744 | + labels=orchestrator_pod_labels, |
| 745 | + mount_local_stores=self.config.is_local, |
| 746 | + termination_grace_period_seconds=settings.pod_stop_grace_period, |
| 747 | + ) |
| 748 | + |
| 749 | + pod_failure_policy = settings.pod_failure_policy or { |
| 750 | + # These rules are applied sequentially. This means any failure in |
| 751 | + # the main container will count towards the max retries. Any other |
| 752 | + # disruption will not count towards the max retries. |
| 753 | + "rules": [ |
| 754 | + # If the main container fails, we count it towards the max |
| 755 | + # retries. |
| 756 | + { |
| 757 | + "action": "Count", |
| 758 | + "onExitCodes": { |
| 759 | + "containerName": "main", |
| 760 | + "operator": "NotIn", |
| 761 | + "values": [0], |
| 762 | + }, |
| 763 | + }, |
| 764 | + # If the pod is interrupted at any other time, we don't count |
| 765 | + # it as a retry |
| 766 | + { |
| 767 | + "action": "Ignore", |
| 768 | + "onPodConditions": [ |
| 769 | + { |
| 770 | + "type": "DisruptionTarget", |
| 771 | + "status": "True", |
| 772 | + } |
| 773 | + ], |
| 774 | + }, |
| 775 | + ] |
| 776 | + } |
| 777 | + |
| 778 | + job_name = settings.job_name_prefix or "" |
| 779 | + random_prefix = "".join(random.choices("0123456789abcdef", k=8)) |
| 780 | + job_name += f"-{random_prefix}-{snapshot.pipeline_configuration.name}" |
| 781 | + # The job name will be used as a label on the pods, so we need to make |
| 782 | + # sure it doesn't exceed the label length limit |
| 783 | + job_name = kube_utils.sanitize_label(job_name) |
| 784 | + |
| 785 | + job_manifest = build_job_manifest( |
| 786 | + job_name=job_name, |
| 787 | + pod_template=pod_template_manifest_from_pod(pod_manifest), |
| 788 | + backoff_limit=settings.orchestrator_job_backoff_limit, |
| 789 | + ttl_seconds_after_finished=settings.ttl_seconds_after_finished, |
| 790 | + active_deadline_seconds=settings.active_deadline_seconds, |
| 791 | + pod_failure_policy=pod_failure_policy, |
| 792 | + labels=orchestrator_pod_labels, |
| 793 | + annotations={ |
| 794 | + ORCHESTRATOR_ANNOTATION_KEY: str(self.id), |
| 795 | + }, |
| 796 | + ) |
| 797 | + |
| 798 | + if snapshot.schedule: |
| 799 | + raise RuntimeError("Dynamic pipelines cannot be scheduled yet.") |
| 800 | + else: |
| 801 | + try: |
| 802 | + kube_utils.create_job( |
| 803 | + batch_api=self._k8s_batch_api, |
| 804 | + namespace=self.config.kubernetes_namespace, |
| 805 | + job_manifest=job_manifest, |
| 806 | + ) |
| 807 | + except Exception as e: |
| 808 | + if self.config.pass_zenml_token_as_secret: |
| 809 | + secret_name = self.get_token_secret_name(snapshot.id) |
| 810 | + try: |
| 811 | + kube_utils.delete_secret( |
| 812 | + core_api=self._k8s_core_api, |
| 813 | + namespace=self.config.kubernetes_namespace, |
| 814 | + secret_name=secret_name, |
| 815 | + ) |
| 816 | + except Exception as cleanup_error: |
| 817 | + logger.error( |
| 818 | + "Error cleaning up secret %s: %s", |
| 819 | + secret_name, |
| 820 | + cleanup_error, |
| 821 | + ) |
| 822 | + raise e |
| 823 | + |
| 824 | + if settings.synchronous: |
| 825 | + |
| 826 | + def _wait_for_run_to_finish() -> None: |
| 827 | + logger.info("Waiting for orchestrator job to finish...") |
| 828 | + kube_utils.wait_for_job_to_finish( |
| 829 | + batch_api=self._k8s_batch_api, |
| 830 | + core_api=self._k8s_core_api, |
| 831 | + namespace=self.config.kubernetes_namespace, |
| 832 | + job_name=job_name, |
| 833 | + backoff_interval=settings.job_monitoring_interval, |
| 834 | + fail_on_container_waiting_reasons=settings.fail_on_container_waiting_reasons, |
| 835 | + stream_logs=True, |
| 836 | + ) |
| 837 | + |
| 838 | + return SubmissionResult( |
| 839 | + wait_for_completion=_wait_for_run_to_finish, |
| 840 | + ) |
| 841 | + else: |
| 842 | + logger.info( |
| 843 | + f"Orchestrator job `{job_name}` started. " |
| 844 | + f"Run the following command to inspect the logs: " |
| 845 | + f"`kubectl -n {self.config.kubernetes_namespace} logs " |
| 846 | + f"job/{job_name}`" |
| 847 | + ) |
| 848 | + return None |
| 849 | + |
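For reference, the knobs consumed by `submit_dynamic_pipeline` (`synchronous`, `job_name_prefix`, `orchestrator_pod_settings`, `pod_failure_policy`, ...) are regular orchestrator settings. Below is a hedged configuration sketch: the concrete values are illustrative only, and the decorated pipeline is just a placeholder for however the dynamic pipeline is actually declared.

```python
from zenml import pipeline
from zenml.integrations.kubernetes.flavors import KubernetesOrchestratorSettings

# Illustrative values; the field names mirror the settings read by
# submit_dynamic_pipeline above.
k8s_settings = KubernetesOrchestratorSettings(
    synchronous=True,  # block until the orchestrator job finishes
    job_name_prefix="dynamic",  # prefix for the generated job name
    orchestrator_pod_settings={
        # Overrides the 400Mi / 100m defaults applied above.
        "resources": {"requests": {"memory": "800Mi", "cpu": "250m"}},
    },
)


@pipeline(settings={"orchestrator.kubernetes": k8s_settings})
def my_dynamic_pipeline() -> None:
    ...
```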
| 850 | + def run_dynamic_out_of_process_step( |
| 851 | + self, step_run_info: "StepRunInfo", environment: Dict[str, str] |
| 852 | + ) -> None: |
| 853 | + from zenml.step_operators.step_operator_entrypoint_configuration import ( |
| 854 | + StepOperatorEntrypointConfiguration, |
| 855 | + ) |
| 856 | + |
| 857 | + settings = cast( |
| 858 | + KubernetesOrchestratorSettings, self.get_settings(step_run_info) |
| 859 | + ) |
| 860 | + image_name = step_run_info.get_image(key=ORCHESTRATOR_DOCKER_IMAGE_KEY) |
| 861 | + command = StepOperatorEntrypointConfiguration.get_entrypoint_command() |
| 862 | + args = StepOperatorEntrypointConfiguration.get_entrypoint_arguments( |
| 863 | + step_name=step_run_info.pipeline_step_name, |
| 864 | + snapshot_id=step_run_info.snapshot_id, |
| 865 | + step_run_id=str(step_run_info.step_run_id), |
| 866 | + ) |
| 867 | + |
| 868 | + step_labels = { |
| 869 | + "run_id": kube_utils.sanitize_label(str(step_run_info.run_id)), |
| 870 | + "run_name": kube_utils.sanitize_label(str(step_run_info.run_name)), |
| 871 | + "pipeline": kube_utils.sanitize_label(step_run_info.pipeline.name), |
| 872 | + "step_name": kube_utils.sanitize_label( |
| 873 | + step_run_info.pipeline_step_name |
| 874 | + ), |
| 875 | + } |
| 876 | + step_annotations = { |
| 877 | + STEP_NAME_ANNOTATION_KEY: step_run_info.pipeline_step_name, |
| 878 | + } |
| 879 | + |
| 880 | + # Set a default minimum memory request for the step pod if the user |
| 881 | + # has not specified any. The step pod consumes some memory itself |
| 882 | + # and, without a request, it may be scheduled on a node with too |
| 883 | + # little free memory, degrading or even crashing that node due to |
| 884 | + # memory pressure. |
| 885 | + pod_settings = kube_utils.apply_default_resource_requests( |
| 886 | + memory="400Mi", |
| 887 | + pod_settings=settings.pod_settings, |
| 888 | + ) |
| 889 | + |
| 890 | + pod_manifest = build_pod_manifest( |
| 891 | + pod_name=None, |
| 892 | + image_name=image_name, |
| 893 | + command=command, |
| 894 | + args=args, |
| 895 | + env=environment, |
| 896 | + privileged=settings.privileged, |
| 897 | + pod_settings=pod_settings, |
| 898 | + service_account_name=settings.service_account_name, |
| 899 | + labels=step_labels, |
| 900 | + ) |
| 901 | + |
| 902 | + job_name = settings.job_name_prefix or "" |
| 903 | + random_prefix = "".join(random.choices("0123456789abcdef", k=8)) |
| 904 | + job_name += f"-{random_prefix}-{step_run_info.pipeline_step_name}-{step_run_info.pipeline.name}" |
| 905 | + # The job name will be used as a label on the pods, so we need to make |
| 906 | + # sure it doesn't exceed the label length limit |
| 907 | + job_name = kube_utils.sanitize_label(job_name) |
| 908 | + |
| 909 | + job_manifest = build_job_manifest( |
| 910 | + job_name=job_name, |
| 911 | + pod_template=pod_template_manifest_from_pod(pod_manifest), |
| 912 | + # The orchestrator already handles retries, so we don't need to |
| 913 | + # retry the step operator job. |
| 914 | + backoff_limit=0, |
| 915 | + ttl_seconds_after_finished=settings.ttl_seconds_after_finished, |
| 916 | + active_deadline_seconds=settings.active_deadline_seconds, |
| 917 | + labels=step_labels, |
| 918 | + annotations=step_annotations, |
| 919 | + ) |
| 920 | + |
| 921 | + kube_utils.create_job( |
| 922 | + batch_api=self._k8s_batch_api, |
| 923 | + namespace=self.config.kubernetes_namespace, |
| 924 | + job_manifest=job_manifest, |
| 925 | + ) |
| 926 | + |
| 927 | + logger.info( |
| 928 | + "Waiting for job `%s` to finish...", |
| 929 | + job_name, |
| 930 | + ) |
| 931 | + kube_utils.wait_for_job_to_finish( |
| 932 | + batch_api=self._k8s_batch_api, |
| 933 | + core_api=self._k8s_core_api, |
| 934 | + namespace=self.config.kubernetes_namespace, |
| 935 | + job_name=job_name, |
| 936 | + fail_on_container_waiting_reasons=settings.fail_on_container_waiting_reasons, |
| 937 | + stream_logs=True, |
| 938 | + ) |
| 939 | + logger.info("Job completed.") |
| 940 | + |
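Both methods above compose a job name from a prefix, a random hex string, and pipeline/step names, then pass it through `kube_utils.sanitize_label`, because Kubernetes label values are limited to 63 characters and a restricted character set. The sketch below is only an illustrative approximation of that kind of sanitization, not the actual `sanitize_label` implementation, which may differ in detail:

```python
import re


def sanitize_label_sketch(value: str) -> str:
    """Illustrative approximation of Kubernetes label-value sanitization.

    Label values may only contain alphanumerics, '-', '_' and '.', must
    start and end with an alphanumeric character, and are limited to 63
    characters.
    """
    value = value.lower()
    # Replace any character outside the allowed set with a dash.
    value = re.sub(r"[^a-z0-9._-]", "-", value)
    # Truncate to 63 characters and trim non-alphanumeric edges.
    return value[:63].strip("-._")
```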
659 | 941 | def _get_service_account_name( |
660 | 942 | self, settings: KubernetesOrchestratorSettings |
661 | 943 | ) -> str: |
|