docs: adding example to run spark job using kubeflow pipelines v2 #12137
base: master
Conversation
Signed-off-by: Vikas Saxena <[email protected]>
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files.
Approvers can indicate their approval by writing `/approve` in a comment.
## vikassaxena02/vikas-kfpv2-python310-kubectl-nokfp-image:0.4 image
This base image is used to run the spark code and generate logs for the same.
The dockerfile to generate the same is in `dockerDir` directory
Suggested change:
- The dockerfile to generate the same is in `dockerDir` directory
+ The containerfile to generate the same is in `containerfile` directory
Let's move this one level up and get rid of `dockerDir`; we also only support OCI containers.
@@ -0,0 +1,15 @@
# Use official Python 3.12 slim image
FROM python:3.10-slim
May we use 3.12?
    rm -rf /var/lib/apt/lists/*

# Install kubectl (v1.28.0)
RUN curl -LO "https://dl.k8s.io/release/v1.28.0/bin/linux/amd64/kubectl" && \
Let us use 1.33.
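Taken together, the two review suggestions (Python 3.12 base image, kubectl 1.33) might look like the following sketch; the exact v1.33 patch release pinned here is an assumption, not part of the PR:

```dockerfile
# Use official Python 3.12 slim image (per review)
FROM python:3.12-slim

# Install curl, then clean apt caches to keep the image small
RUN apt-get update && apt-get install -y --no-install-recommends curl && \
    rm -rf /var/lib/apt/lists/*

# Install kubectl (v1.33.0, per review; pin whichever patch release you need)
RUN curl -LO "https://dl.k8s.io/release/v1.33.0/bin/linux/amd64/kubectl" && \
    install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl && \
    rm kubectl
```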
Is this not available as `default-editor` per namespace?
Or is `pipeline-runner` only needed if you do not install the Kubeflow platform?
"""

@dsl.component(
    base_image="vikassaxena02/vikas-kfpv2-python310-kubectl-nokfp-image:0.4"
Let's use Python 3.12.
    return spec["metadata"]["name"]

@dsl.component(
    base_image="vikassaxena02/vikas-kfpv2-python310-kubectl-nokfp-image:0.4"
Let's use Python 3.12.
# Introduction
This repo has bare minimum code to run a SparkPipeline on Kubeflow through Kubeflow Pipelines and the spark pipelines run in `default` namespace
Please note that this repo uses an old version of KFP SDK to use `ResourceOp` which is deprecated in the KFP SDK v2 onwards.
The code in the repo has been tested on a local cluster running on `kind` that runs both `Kubeflow Piplines` and `Kubeflow SparkOperator`
Suggested change:
- The code in the repo has been tested on a local cluster running on `kind` that runs both `Kubeflow Piplines` and `Kubeflow SparkOperator`
+ The code in this repository has been tested on a local cluster running on `kind` that runs both `Kubeflow Pipelines` and `Kubeflow SparkOperator`

## vikassaxena02/vikas-kfpv2-python310-kubectl-nokfp-image:0.4 image
This base image is used to run the spark code and generate logs for the same.
The dockerfile to generate the same is in `dockerDir` directory
Suggested change:
- The dockerfile to generate the same is in `dockerDir` directory
+ The containerfile to generate the same is in the same directory.
    with open(spark_driver_logs.path, "w") as f:
        f.write(logs)
Suggested change:
- with open(spark_driver_logs.path, "w") as f:
-     f.write(logs)
+ with open(spark_driver_logs.path, "w") as file:
+     file.write(logs)
    # Wait for SparkApplication to complete
    print("Waiting for SparkApplication to complete...")
    for attempt in range(60):
        try:
            get_status_cmd = [
                "kubectl", "get", "sparkapplication", spark_app_name,
                "-n", "default", "-o", "json"
            ]
            output = subprocess.check_output(get_status_cmd, text=True)
            status_json = json.loads(output)
            app_state = status_json.get("status", {}).get("applicationState", {}).get("state", "")
            print(f"Attempt {attempt+1}: SparkApplication state: {app_state}")
            if app_state in ["COMPLETED", "FAILED"]:
                break
        except Exception as e:
            print("Error checking SparkApplication status:", str(e))
        time.sleep(10)
    else:
        raise RuntimeError("Timed out waiting for SparkApplication to complete.")
Please use the `kfp` Python package instead of `subprocess` if possible.
This way you can also use the latest v2 SDK.
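One way to act on this review comment is to drop the `kubectl` subprocess calls and poll the SparkApplication custom resource through the kubernetes Python client. The sketch below is an assumption, not the PR's code: the polling logic is factored into a function that takes any status-fetching callable, and the commented usage shows how it might be wired to `CustomObjectsApi` (the `sparkoperator.k8s.io/v1beta2` group/version is the one commonly used by the Kubeflow Spark operator).

```python
import time


def wait_for_spark_app(fetch_status, attempts=60, interval=10):
    """Poll until the SparkApplication reaches a terminal state.

    fetch_status: callable returning the SparkApplication object as a dict,
    e.g. from CustomObjectsApi.get_namespaced_custom_object.
    Returns the terminal state, or raises RuntimeError on timeout.
    """
    for attempt in range(attempts):
        try:
            spec = fetch_status()
            state = spec.get("status", {}).get("applicationState", {}).get("state", "")
            print(f"Attempt {attempt + 1}: SparkApplication state: {state}")
            if state in ("COMPLETED", "FAILED"):
                return state
        except Exception as e:  # log transient API errors and keep polling
            print("Error checking SparkApplication status:", e)
        time.sleep(interval)
    raise RuntimeError("Timed out waiting for SparkApplication to complete.")


# Hypothetical in-cluster usage (assumes the kubernetes client is installed
# and the pod's service account can read SparkApplications):
# from kubernetes import client, config
# config.load_incluster_config()
# api = client.CustomObjectsApi()
# wait_for_spark_app(lambda: api.get_namespaced_custom_object(
#     group="sparkoperator.k8s.io", version="v1beta2",
#     namespace="default", plural="sparkapplications", name=spark_app_name))
```

Keeping the fetch callable injectable also makes the loop unit-testable without a cluster.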
Co-authored-by: Julius von Kohout <[email protected]> Signed-off-by: Vikas Saxena <[email protected]>
This PR adds an example to run Spark jobs in Kubeflow Pipelines.
https://github.com/vikas-saxena02/KubeflowPipelines/tree/sparkKFPExample/samples/contrib/sparkjob-kubeflowpipeline
Checklist: