
Conversation

@VaniHaripriya
Contributor

Description of your changes:

Resolves #12352.

This PR adds a download_to_workspace option to dsl.importer that downloads artifacts into the pipeline workspace, so downstream tasks can consume them without re-downloading.

Checklist:

@google-oss-prow

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@google-oss-prow

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign droctothorpe for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch 4 times, most recently from be5f8f5 to 9866919 on October 13, 2025 21:31
@VaniHaripriya marked this pull request as ready for review October 14, 2025 05:15
@google-oss-prow bot requested a review from mprahl October 14, 2025 05:15
@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch from 9866919 to 60212b1 on October 14, 2025 15:05
return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err)
}
defer bucket.Close()
if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil {
Collaborator

I don't think the workspace paths are quite correct. This is more in line with the design of having the same path but just under /kfp-workspace/.artifacts:

diff --git a/backend/src/v2/component/importer_launcher.go b/backend/src/v2/component/importer_launcher.go
index 7c11f9fbd..78e8a6d03 100644
--- a/backend/src/v2/component/importer_launcher.go
+++ b/backend/src/v2/component/importer_launcher.go
@@ -4,8 +4,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"os"
-	"path/filepath"
 	"strings"
 
 	"github.com/kubeflow/pipelines/backend/src/common/util"
@@ -310,20 +308,22 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
 				bucketConfig.SessionInfo = &sess
 			}
 		}
+
+		localPath, err := LocalWorkspacePathForURI(artifactUri)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get local path for uri %q: %w", artifactUri, err)
+		}
+
 		blobKey, err := bucketConfig.KeyFromURI(artifactUri)
 		if err != nil {
 			return nil, fmt.Errorf("failed to derive blob key from uri %q while downloading artifact into workspace: %w", artifactUri, err)
 		}
-		workspaceRoot := filepath.Join(WorkspaceMountPath, ".artifacts")
-		if err := os.MkdirAll(workspaceRoot, 0755); err != nil {
-			return nil, fmt.Errorf("failed to create workspace directory %q: %w", workspaceRoot, err)
-		}
 		bucket, err := objectstore.OpenBucket(ctx, l.k8sClient, l.launcherV2Options.Namespace, bucketConfig)
 		if err != nil {
 			return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err)
 		}
 		defer bucket.Close()
-		if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil {
+		if err := objectstore.DownloadBlob(ctx, bucket, localPath, blobKey); err != nil {
 			return nil, fmt.Errorf("failed to download artifact to workspace: %w", err)
 		}
 	}
diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go
index 95caf375b..ac670ca2d 100644
--- a/backend/src/v2/component/launcher_v2.go
+++ b/backend/src/v2/component/launcher_v2.go
@@ -865,16 +865,13 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
 		// If the artifact is marked as already in the workspace, map the workspace path.
 		if inputArtifact.GetMetadata() != nil {
 			if v, ok := inputArtifact.GetMetadata().GetFields()["_kfp_workspace"]; ok && v.GetBoolValue() {
-				bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(inputArtifact.Uri)
-				if err == nil {
-					blobKey, err := bucketConfig.KeyFromURI(inputArtifact.Uri)
-					if err == nil {
-						localPath := filepath.Join(WorkspaceMountPath, ".artifacts", blobKey)
-						key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
-						placeholders[key] = localPath
-						continue
-					}
+				localPath, err := LocalWorkspacePathForURI(inputArtifact.Uri)
+				if err != nil {
+					return nil, fmt.Errorf("failed to get local workspace path for input artifact %q: %w", name, err)
 				}
+				key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
+				placeholders[key] = localPath
+				continue
 			}
 		}
 
diff --git a/sdk/python/kfp/dsl/types/artifact_types.py b/sdk/python/kfp/dsl/types/artifact_types.py
index ae737b483..39ae42adb 100644
--- a/sdk/python/kfp/dsl/types/artifact_types.py
+++ b/sdk/python/kfp/dsl/types/artifact_types.py
@@ -94,39 +94,26 @@ class Artifact:
         self._set_path(path)
 
     def _get_path(self) -> Optional[str]:
-        # If the artifact is already present in the pipeline workspace, map to the workspace path.
-        # This is indicated by backend setting metadata['_kfp_workspace'] = True.
-        if self.metadata.get('_kfp_workspace') is True:
-            uri = self.uri or ''
-            for prefix in (RemotePrefix.GCS.value, RemotePrefix.MINIO.value,
-                           RemotePrefix.S3.value):
-                if uri.startswith(prefix):
-                    # Derive the object key relative to the bucket:
-                    # "<bucket>/<key>" -> blob_key == "<key>"
-                    without_scheme = uri[len(prefix):]
-                    parts = without_scheme.split('/', 1)
-                    blob_key = parts[1] if len(parts) == 2 else ''
-                    if blob_key:
-                        return os.path.join(WORKSPACE_MOUNT_PATH, '.artifacts',
-                                            blob_key)
-
-                    return os.path.join(WORKSPACE_MOUNT_PATH, '.artifacts')
+        local_path = self.uri
 
         if self.uri.startswith(RemotePrefix.GCS.value):
-            return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value
-                                                         ):]
-        if self.uri.startswith(RemotePrefix.MINIO.value):
-            return _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO
-                                                            .value):]
-        if self.uri.startswith(RemotePrefix.S3.value):
-            return _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value
-                                                        ):]
-        if self.uri.startswith(RemotePrefix.OCI.value):
+            local_path = _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value):]
+        elif self.uri.startswith(RemotePrefix.MINIO.value):
+            local_path = _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO.value):]
+        elif self.uri.startswith(RemotePrefix.S3.value):
+            local_path = _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value):]
+        elif self.uri.startswith(RemotePrefix.OCI.value):
             escaped_uri = self.uri[len(RemotePrefix.OCI.value):].replace(
                 '/', '_')
-            return _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
+            local_path = _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
+
+        # If the artifact is already present in the pipeline workspace, map to the workspace path.
+        # This is indicated by backend setting metadata['_kfp_workspace'] = True.
+        if self.metadata.get('_kfp_workspace') is True:
+            local_path = os.path.join(WORKSPACE_MOUNT_PATH, ".artifacts", local_path.lstrip("/"))
+
         # uri == path for local execution
-        return self.uri
+        return local_path
 
     def _set_path(self, path: str) -> None:
         self.uri = convert_local_path_to_remote_path(path)
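As an illustration of the suggested path precedence (not the actual SDK code), the mapping can be sketched in plain Python: first translate a remote URI to its local mount path, then, if the backend flagged the artifact with _kfp_workspace, re-root that path under the workspace's .artifacts directory. The mount prefixes and workspace mount path below are assumed values mirroring the constants referenced in the diff:

```python
import os

# Assumed values standing in for the KFP constants referenced above.
WORKSPACE_MOUNT_PATH = '/kfp-workspace'
_MOUNT_PREFIXES = {
    'gs://': '/gcs/',
    'minio://': '/minio/',
    's3://': '/s3/',
}


def local_path_for(uri: str, in_workspace: bool) -> str:
    """Sketch of the suggested _get_path precedence."""
    local_path = uri
    for scheme, mount in _MOUNT_PREFIXES.items():
        if uri.startswith(scheme):
            # e.g. minio://mlpipeline/a.txt -> /minio/mlpipeline/a.txt
            local_path = mount + uri[len(scheme):]
            break
    if in_workspace:
        # Same layout, re-rooted under /kfp-workspace/.artifacts.
        local_path = os.path.join(WORKSPACE_MOUNT_PATH, '.artifacts',
                                  local_path.lstrip('/'))
    return local_path
```

For a local URI with no matching scheme, the function falls through and returns the URI unchanged, matching the "uri == path for local execution" behavior.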

}
// Add workspace volume only if the workflow defines a workspace PVC
hasWorkspacePVC := false
for _, pvc := range c.wf.Spec.VolumeClaimTemplates {
Collaborator

This always mounts the PVC for all importers whenever the pipeline uses a workspace. Let's make the mount conditional to avoid unnecessary PVC mounting, which can cause pod scheduling issues. Something like this:

diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go
index 73e3efc8b..c57193222 100644
--- a/backend/src/v2/compiler/argocompiler/dag.go
+++ b/backend/src/v2/compiler/argocompiler/dag.go
@@ -326,7 +326,8 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
 				// it's impossible to add a when condition based on driver outputs.
 				return nil, fmt.Errorf("triggerPolicy.condition on importer task is not supported")
 			}
-			importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID)
+
+			importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID, e.Importer.GetDownloadToWorkspace())
 			if err != nil {
 				return nil, err
 			}
diff --git a/backend/src/v2/compiler/argocompiler/importer.go b/backend/src/v2/compiler/argocompiler/importer.go
index 5c509bf4e..096481ee6 100644
--- a/backend/src/v2/compiler/argocompiler/importer.go
+++ b/backend/src/v2/compiler/argocompiler/importer.go
@@ -32,7 +32,7 @@ func (c *workflowCompiler) Importer(name string, componentSpec *pipelinespec.Com
 	return c.saveComponentImpl(name, importer)
 }
 
-func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string) (*wfapi.DAGTask, error) {
+func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string, downloadToWorkspace bool) (*wfapi.DAGTask, error) {
 	componentPlaceholder, err := c.useComponentSpec(task.GetComponentRef().GetName())
 	if err != nil {
 		return nil, err
@@ -43,7 +43,7 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
 	}
 	return &wfapi.DAGTask{
 		Name:     name,
-		Template: c.addImporterTemplate(),
+		Template: c.addImporterTemplate(downloadToWorkspace),
 		Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{{
 			Name:  paramTask,
 			Value: wfapi.AnyStringPtr(taskJSON),
@@ -60,11 +60,16 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
 	}, nil
 }
 
-func (c *workflowCompiler) addImporterTemplate() string {
+func (c *workflowCompiler) addImporterTemplate(downloadToWorkspace bool) string {
 	name := "system-importer"
+	if downloadToWorkspace {
+		name += "-workspace"
+	}
+
 	if _, alreadyExists := c.templates[name]; alreadyExists {
 		return name
 	}
+
 	args := []string{
 		"--executor_type", "importer",
 		"--task_spec", inputValue(paramTask),
@@ -91,18 +96,10 @@ func (c *workflowCompiler) addImporterTemplate() string {
 	if value, ok := os.LookupEnv(PublishLogsEnvVar); ok {
 		args = append(args, "--publish_logs", value)
 	}
-	// Add workspace volume only if the workflow defines a workspace PVC
-	hasWorkspacePVC := false
-	for _, pvc := range c.wf.Spec.VolumeClaimTemplates {
-		if pvc.Name == workspaceVolumeName {
-			hasWorkspacePVC = true
-			break
-		}
-	}
 
 	var volumeMounts []k8score.VolumeMount
 	var volumes []k8score.Volume
-	if hasWorkspacePVC {
+	if downloadToWorkspace {
 		volumeMounts = append(volumeMounts, k8score.VolumeMount{
 			Name:      workspaceVolumeName,
 			MountPath: component.WorkspaceMountPath,
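The conditional-mount design above can be illustrated with a small sketch (Python purely for illustration; the template shape, volume name, and mount path are assumptions, not the compiler's actual API). The template cache is keyed by name, so each importer variant is compiled at most once, and the workspace PVC is mounted only for the workspace variant:

```python
# Cache of compiled templates, keyed by template name.
templates = {}


def add_importer_template(download_to_workspace: bool) -> str:
    name = 'system-importer'
    if download_to_workspace:
        name += '-workspace'
    if name in templates:
        # Already compiled; reuse by name.
        return name
    template = {'name': name, 'volume_mounts': []}
    if download_to_workspace:
        # Mount the workspace PVC only for the workspace variant
        # (names here are hypothetical).
        template['volume_mounts'].append(
            {'name': 'kfp-workspace', 'mountPath': '/kfp-workspace'})
    templates[name] = template
    return name
```

Tasks that don't download to the workspace keep the plain template and never schedule against the PVC.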

Comment on lines 299 to 312
bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(artifactUri)
if err != nil {
return nil, fmt.Errorf("failed to parse bucket config while downloading artifact into workspace with uri %q: %w", artifactUri, err)
}
// Resolve and attach session info from kfp-launcher config for the artifact provider
if cfg, cfgErr := config.FromConfigMap(ctx, l.k8sClient, l.launcherV2Options.Namespace); cfgErr != nil {
glog.Warningf("failed to load launcher config for workspace download: %v", cfgErr)
} else if cfg != nil {
if sess, sessErr := cfg.GetStoreSessionInfo(artifactUri); sessErr != nil {
glog.Warningf("failed to resolve store session info for %q: %v", artifactUri, sessErr)
} else {
bucketConfig.SessionInfo = &sess
}
}
Collaborator

Let's avoid duplicating this code and move the code from the Execute method to a helper function.
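The requested refactor could look roughly like the following sketch (plain Python with hypothetical names, standing in for the Go code): the parse / config-load / session-attach sequence moves into one helper that both call sites share, preserving the best-effort behavior where a missing launcher config just leaves the session unset.

```python
def resolve_bucket_config(uri, parse, load_config):
    """Shared helper: build a bucket config for uri and attach session info.

    parse and load_config are injected stand-ins for the real
    ParseBucketConfigForArtifactURI and config.FromConfigMap calls.
    """
    config = parse(uri)            # bucket config derived from the URI
    launcher_cfg = load_config()   # kfp-launcher config; may be None
    if launcher_cfg is not None:
        session = launcher_cfg.get(uri)  # provider session info, best effort
        if session is not None:
            config['session'] = session
    return config
```

Both the importer path and the Execute path would then call this single helper instead of duplicating the sequence inline.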

return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err)
}
defer bucket.Close()
if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil {
Collaborator

Could you also log a message indicating the artifact is being downloaded?

)
def pipeline_with_importer_workspace() -> str:
ds = importer(
artifact_uri='minio://mlpipeline/sample/sample.txt',
Collaborator

Please add a test case that downloads a directory as well.

@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch 4 times, most recently from c92e63f to 9eaea2c on October 24, 2025 21:23
@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch 3 times, most recently from e367ac3 to e034cc7 on October 28, 2025 04:23
@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch from 9c1df93 to 9f12059 on October 28, 2025 18:31

Development

Successfully merging this pull request may close these issues.

[feature] Add download_to_workspace option to dsl.importer