Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
ae853e6
issue-11979 - WIP - tests passing (reproducing bugs)
hbelmiro Jul 30, 2025
a1c07b4
issue-11979 - WIP - conditional tests
hbelmiro Jul 31, 2025
7ed092a
issue-11979 - WIP - nested tests
hbelmiro Jul 31, 2025
032489c
issue-11979 - WIP - tests now fail
hbelmiro Jul 31, 2025
5821420
issue-11979-only-tests - WIP
hbelmiro Aug 5, 2025
892d433
issue-11979-only-tests - WIP - high level plan
hbelmiro Aug 5, 2025
3126a5d
issue-11979 - Add root cause analysis for conditional DAG task counting
hbelmiro Aug 5, 2025
d77c4e5
issue-11979 - WIP - Phase 1: ✅ COMPLETED - Universal detection working
hbelmiro Aug 5, 2025
39d694d
issue-11979 - WIP - Updated unit tests
hbelmiro Aug 5, 2025
9a6b84a
issue-11979 - WIP - conditional tests passing
hbelmiro Aug 5, 2025
1c8a44f
issue-11979 - WIP - Phase 3 Plan
hbelmiro Aug 5, 2025
5a3ae48
issue-11979 - WIP - complete with known limitations
hbelmiro Aug 6, 2025
48aead2
issue-11979 - WIP - fixed tests
hbelmiro Aug 6, 2025
36625d8
issue-11979 - WIP - tests moved to the right place
hbelmiro Aug 6, 2025
0618c4c
issue-11979 - WIP - Add detailed debugging and instrumentation to DAG…
hbelmiro Aug 6, 2025
a5265b2
issue-11979 - WIP - fixed tests
hbelmiro Aug 6, 2025
e78a785
issue-11979 - WIP - Skipped nested DAG tests due to architectural lim…
hbelmiro Aug 6, 2025
22658b9
issue-11979 - WIP - Fixed tests
hbelmiro Aug 6, 2025
2563969
issue-11979 - WIP - Fixed tests
hbelmiro Aug 7, 2025
88db057
issue-11979 - WIP - Fixed tests
hbelmiro Aug 7, 2025
81ddbfa
issue-11979 - WIP - Refactored nested and conditional DAG tests to ex…
hbelmiro Aug 7, 2025
2b2ac7a
issue-11979 - WIP - Added detailed debugging logs and updated pipelin…
hbelmiro Aug 7, 2025
fd65480
issue-11979 - WIP - Refactored DAG tests to replace `testV2` with `te…
hbelmiro Aug 8, 2025
9396282
issue-11979 - WIP - fix tests - set params fields
hbelmiro Aug 8, 2025
05a843d
issue-11979 - WIP - updated context
hbelmiro Aug 8, 2025
3e3e9dd
issue-11979 - WIP - Added safe access checks for task execution prope…
hbelmiro Aug 8, 2025
9c85469
issue-11979 - WIP - Added detailed debugging logs to upgrade tests an…
hbelmiro Aug 8, 2025
7ce8ccd
issue-11979 - WIP - CI passing
hbelmiro Aug 8, 2025
8a8ffe8
issue-11979 - WIP - Documented confirmed Dynamic ParallelFor limitati…
hbelmiro Aug 8, 2025
fe1455a
issue-11979 - WIP - added complex conditional test
hbelmiro Aug 11, 2025
7d0d3eb
issue-11979 - Fixed complex conditional
hbelmiro Aug 11, 2025
26ccb61
issue-11979 - WIP - reverted change in .gitignore
hbelmiro Aug 11, 2025
5f2a60e
issue-11979 - WIP - updated CONTEXT.md
hbelmiro Aug 12, 2025
af16e85
issue-11979 - WIP - nested pipelines
hbelmiro Aug 12, 2025
79fa013
issue-11979 - Set test timeout to 15m in e2e workflow
hbelmiro Aug 12, 2025
f859eb2
issue-11979 - Added integration test for ParallelFor task failure pro…
hbelmiro Aug 12, 2025
2b21e2a
issue-11979 - Disabled tests for ParallelFor failure scenarios and up…
hbelmiro Aug 12, 2025
da31348
issue-11979 - Refactored ParallelFor DAG status validation logic: add…
hbelmiro Aug 12, 2025
676bf69
issue-11979 - WIP - clear code
hbelmiro Aug 12, 2025
607ddf6
issue-11979 - WIP - refactor
hbelmiro Aug 12, 2025
5986396
issue-11979 - WIP - cleanups
hbelmiro Aug 13, 2025
fe35989
issue-11979 - WIP - fixed tests
hbelmiro Aug 13, 2025
22911e6
issue-11979 - WIP - fixed tests
hbelmiro Aug 13, 2025
13d4126
issue-11979 - WIP - cleanups
hbelmiro Aug 13, 2025
0c00608
issue-11979 - WIP - cleanups
hbelmiro Aug 13, 2025
9547915
issue-11979 - WIP - tests skipped instead of commented out
hbelmiro Aug 13, 2025
c7db043
issue-11979 - WIP - cleanups
hbelmiro Aug 13, 2025
23586fd
issue-11979 - WIP - refactored tests
hbelmiro Aug 13, 2025
1b46cdf
issue-11979 - WIP - refactored DAG test helpers for improved validation
hbelmiro Aug 13, 2025
48d0e7e
issue-11979 - WIP - refactored conditional tests
hbelmiro Aug 13, 2025
01db55c
issue-11979 - WIP - added status.md
hbelmiro Aug 14, 2025
7dead6f
issue-11979 - WIP - added note to assess test complexity in status.md
hbelmiro Aug 14, 2025
24f1958
issue-11979 - WIP - Simplified tests
hbelmiro Aug 15, 2025
b05184e
issue-11979 - Reverted changes to manifests
hbelmiro Aug 15, 2025
9162781
issue-11979 - Fixed tests
hbelmiro Aug 15, 2025
532d196
issue-11979 - Fixed TestDeeplyNestedPipelineFailurePropagation
hbelmiro Aug 15, 2025
026c4ca
issue-11979 - Updated status.md
hbelmiro Aug 15, 2025
a47b2fe
issue-11979 - Increased test timeout to 20m in e2e-test workflow
hbelmiro Aug 15, 2025
0cece6b
issue-11979 - Increased test timeout to 25m in e2e-test workflow
hbelmiro Aug 15, 2025
39f311a
issue-11979 - Updated status.md
hbelmiro Aug 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ jobs:
id: tests
if: ${{ steps.forward-api-port.outcome == 'success' }}
working-directory: ./backend/test/v2/integration
run: go test -v ./... -namespace kubeflow -args -runIntegrationTests=true
run: go test -v -timeout 25m ./... -namespace kubeflow -args -runIntegrationTests=true
env:
PULL_NUMBER: ${{ github.event.pull_request.number }}
PIPELINE_STORE: ${{ matrix.pipeline_store }}
Expand Down Expand Up @@ -297,7 +297,7 @@ jobs:
id: tests
if: ${{ steps.forward-mlmd-port.outcome == 'success' }}
working-directory: ./backend/test/v2/integration
run: go test -v ./... -namespace kubeflow -args -runIntegrationTests=true -useProxy=true
run: go test -v -timeout 25m ./... -namespace kubeflow -args -runIntegrationTests=true -useProxy=true
env:
PULL_NUMBER: ${{ github.event.pull_request.number }}
continue-on-error: true
Expand Down Expand Up @@ -359,7 +359,7 @@ jobs:
id: tests
if: ${{ steps.forward-mlmd-port.outcome == 'success' }}
working-directory: ./backend/test/v2/integration
run: go test -v ./... -namespace kubeflow -args -runIntegrationTests=true -cacheEnabled=false
run: go test -v -timeout 25m ./... -namespace kubeflow -args -runIntegrationTests=true -cacheEnabled=false
env:
PULL_NUMBER: ${{ github.event.pull_request.number }}
continue-on-error: true
Expand Down
1,743 changes: 1,743 additions & 0 deletions CONTEXT.md

Large diffs are not rendered by default.

19 changes: 18 additions & 1 deletion backend/src/v2/driver/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,10 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E
ecfg.OutputArtifacts = opts.Component.GetDag().GetOutputs().GetArtifacts()
glog.V(4).Info("outputArtifacts: ", ecfg.OutputArtifacts)

// Initial totalDagTasks calculation based on compile-time component tasks
totalDagTasks := len(opts.Component.GetDag().GetTasks())
ecfg.TotalDagTasks = &totalDagTasks
glog.V(4).Info("totalDagTasks: ", *ecfg.TotalDagTasks)
glog.V(4).Info("initial totalDagTasks: ", *ecfg.TotalDagTasks)

if opts.Task.GetArtifactIterator() != nil {
return execution, fmt.Errorf("ArtifactIterator is not implemented")
Expand Down Expand Up @@ -162,6 +163,22 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E
count := len(items)
ecfg.IterationCount = &count
execution.IterationCount = &count

// FIX: For ParallelFor, total_dag_tasks should equal iteration_count
totalDagTasks = count
ecfg.TotalDagTasks = &totalDagTasks
glog.Infof("ParallelFor: Updated totalDagTasks=%d to match iteration_count", totalDagTasks)
} else if opts.IterationIndex >= 0 {
// FIX: For individual ParallelFor iteration DAGs, inherit iteration_count from parent
// Get parent DAG to find the iteration_count
parentExecution, err := mlmd.GetExecution(ctx, dag.Execution.GetID())
if err == nil && parentExecution.GetExecution().GetCustomProperties()["iteration_count"] != nil {
parentIterationCount := int(parentExecution.GetExecution().GetCustomProperties()["iteration_count"].GetIntValue())
totalDagTasks = parentIterationCount
ecfg.IterationCount = &parentIterationCount
ecfg.TotalDagTasks = &totalDagTasks
glog.Infof("ParallelFor iteration %d: Set totalDagTasks=%d from parent iteration_count", opts.IterationIndex, totalDagTasks)
}
}

glog.V(4).Info("pipeline: ", pipeline)
Expand Down
98 changes: 84 additions & 14 deletions backend/src/v2/driver/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/kubeflow/pipelines/backend/src/v2/component"
"github.com/kubeflow/pipelines/backend/src/v2/expression"
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -580,7 +581,7 @@ func resolveUpstreamParameters(cfg resolveUpstreamOutputsConfig) (*structpb.Valu
for {
glog.V(4).Info("currentTask: ", currentTask.TaskName())
// If the current task is a DAG:
if *currentTask.GetExecution().Type == "system.DAGExecution" {
if currentTask.GetExecution() != nil && currentTask.GetExecution().Type != nil && *currentTask.GetExecution().Type == "system.DAGExecution" {
// Since currentTask is a DAG, we need to deserialize its
// output parameter map so that we can look up its
// corresponding producer sub-task, reassign currentTask,
Expand Down Expand Up @@ -610,7 +611,14 @@ func resolveUpstreamParameters(cfg resolveUpstreamOutputsConfig) (*structpb.Valu
// output we need has multiple iterations so we have to gather all
// them and fan them in by collecting them into a list i.e.
// kfp.dsl.Collected support.
parentDAG, err := cfg.mlmd.GetExecution(cfg.ctx, currentTask.GetExecution().GetCustomProperties()["parent_dag_id"].GetIntValue())
// Safe access to parent_dag_id
var parentDAGID int64
if currentTask.GetExecution().GetCustomProperties() != nil && currentTask.GetExecution().GetCustomProperties()["parent_dag_id"] != nil {
parentDAGID = currentTask.GetExecution().GetCustomProperties()["parent_dag_id"].GetIntValue()
} else {
return nil, cfg.err(fmt.Errorf("parent_dag_id not found in task %s", currentTask.TaskName()))
}
parentDAG, err := cfg.mlmd.GetExecution(cfg.ctx, parentDAGID)
if err != nil {
return nil, cfg.err(err)
}
Expand Down Expand Up @@ -705,9 +713,16 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamOutputsConfig) (*pipelinespec.A
for {
glog.V(4).Info("currentTask: ", currentTask.TaskName())
// If the current task is a DAG:
if *currentTask.GetExecution().Type == "system.DAGExecution" {
if currentTask.GetExecution() != nil && currentTask.GetExecution().Type != nil && *currentTask.GetExecution().Type == "system.DAGExecution" {
// Get the sub-task.
parentDAG, err := cfg.mlmd.GetExecution(cfg.ctx, currentTask.GetExecution().GetCustomProperties()["parent_dag_id"].GetIntValue())
// Safe access to parent_dag_id
var parentDAGID int64
if currentTask.GetExecution().GetCustomProperties() != nil && currentTask.GetExecution().GetCustomProperties()["parent_dag_id"] != nil {
parentDAGID = currentTask.GetExecution().GetCustomProperties()["parent_dag_id"].GetIntValue()
} else {
return nil, cfg.err(fmt.Errorf("parent_dag_id not found in task %s", currentTask.TaskName()))
}
parentDAG, err := cfg.mlmd.GetExecution(cfg.ctx, parentDAGID)
if err != nil {
return nil, cfg.err(err)
}
Expand Down Expand Up @@ -834,7 +849,9 @@ func CollectInputs(
outputKey string,
isArtifact bool,
) (outputParameterList *structpb.Value, outputArtifactList *pipelinespec.ArtifactList, err error) {
glog.V(4).Infof("currentTask is a ParallelFor DAG. Attempting to gather all nested producer_subtasks")
glog.Infof("DEBUG CollectInputs: ENTRY - parallelForDAGTaskName='%s', outputKey='%s', isArtifact=%v, tasks count=%d",
parallelForDAGTaskName, outputKey, isArtifact, len(tasks))
glog.Infof("currentTask is a ParallelFor DAG. Attempting to gather all nested producer_subtasks")
// Set some helpers for the start and looping for BFS
var currentTask *metadata.Execution
var workingSubTaskName string
Expand All @@ -845,20 +862,58 @@ func CollectInputs(
parallelForParameterList := make([]*structpb.Value, 0)
parallelForArtifactList := make([]*pipelinespec.RuntimeArtifact, 0)
tasksToResolve := make([]string, 0)
// Track visited tasks to prevent infinite loops
visitedTasks := make(map[string]bool)
// Add safety limit to prevent infinite loops
maxIterations := 1000
iterationCount := 0
// Set up the queue for BFS by setting the parallelFor DAG task as the
// initial node. The loop will add the iteration dag task names for us into
// the slice/queue.
tasksToResolve = append(tasksToResolve, parallelForDAGTaskName)
previousTaskName := tasks[tasksToResolve[0]].TaskName()

// Safe access to initial task for previousTaskName
var previousTaskName string
glog.V(4).Infof("DEBUG CollectInputs: Looking up initial task '%s' in tasks map", tasksToResolve[0])
if initialTask := tasks[tasksToResolve[0]]; initialTask != nil {
previousTaskName = initialTask.TaskName()
glog.V(4).Infof("DEBUG CollectInputs: Found initial task, TaskName='%s'", previousTaskName)
} else {
glog.V(4).Infof("DEBUG CollectInputs: Initial task '%s' not found in tasks map", tasksToResolve[0])
}

for len(tasksToResolve) > 0 {
// Safety check to prevent infinite loops
iterationCount++
if iterationCount > maxIterations {
glog.Errorf("DEBUG CollectInputs: INFINITE LOOP DETECTED! Stopping after %d iterations. Queue length=%d", maxIterations, len(tasksToResolve))
return nil, nil, fmt.Errorf("infinite loop detected in CollectInputs after %d iterations", maxIterations)
}

// The starterQueue contains the first set of child DAGs from the
// parallelFor, i.e. the iteration dags.
glog.V(4).Infof("tasksToResolve: %v", tasksToResolve)
glog.Infof("DEBUG CollectInputs: Iteration %d/%d - tasksToResolve queue length=%d, queue=%v", iterationCount, maxIterations, len(tasksToResolve), tasksToResolve)
currentTaskName := tasksToResolve[0]
tasksToResolve = tasksToResolve[1:]

// Check if we've already visited this task to prevent infinite loops
if visitedTasks[currentTaskName] {
glog.Infof("DEBUG CollectInputs: Task '%s' already visited, skipping to prevent infinite loop", currentTaskName)
continue
}
visitedTasks[currentTaskName] = true
glog.Infof("DEBUG CollectInputs: Processing task '%s', visited tasks count=%d", currentTaskName, len(visitedTasks))

glog.V(4).Infof("DEBUG CollectInputs: Looking up task '%s' in tasks map (total tasks: %d)", currentTaskName, len(tasks))
currentTask = tasks[currentTaskName]

// Safe access to currentTask - check if it exists in the tasks map
if currentTask == nil {
glog.Warningf("DEBUG CollectInputs: currentTask with name '%s' not found in tasks map, skipping", currentTaskName)
continue
}

glog.V(4).Infof("DEBUG CollectInputs: Successfully found task '%s', proceeding with processing", currentTaskName)

// We check if these values need to be updated going through the
// resolution of dags/tasks Most commonly the subTaskName will change
Expand All @@ -883,19 +938,33 @@ func CollectInputs(

glog.V(4).Infof("currentTask ID: %v", currentTask.GetID())
glog.V(4).Infof("currentTask Name: %v", currentTask.TaskName())
glog.V(4).Infof("currentTask Type: %v", currentTask.GetExecution().GetType())

// Safe access to execution type
var taskType string
if currentTask.GetExecution() != nil && currentTask.GetExecution().Type != nil {
taskType = *currentTask.GetExecution().Type
glog.V(4).Infof("currentTask Type: %v", taskType)
} else {
glog.V(4).Infof("currentTask Type: nil")
}

glog.V(4).Infof("workingSubTaskName %v", workingSubTaskName)
glog.V(4).Infof("workingOutputKey: %v", workingOutputKey)

iterations := currentTask.GetExecution().GetCustomProperties()["iteration_count"]
iterationIndex := currentTask.GetExecution().GetCustomProperties()["iteration_index"]
// Safe access to custom properties
var iterations *pb.Value
var iterationIndex *pb.Value
if currentTask.GetExecution() != nil && currentTask.GetExecution().GetCustomProperties() != nil {
iterations = currentTask.GetExecution().GetCustomProperties()["iteration_count"]
iterationIndex = currentTask.GetExecution().GetCustomProperties()["iteration_index"]
}

// Base cases for handling the task that actually maps to the task that
// created the artifact/parameter we are searching for.

// Base case 1: currentTask is a ContainerExecution that we can load
// the values off of.
if *currentTask.GetExecution().Type == "system.ContainerExecution" {
if taskType == "system.ContainerExecution" {
glog.V(4).Infof("currentTask, %v, is a ContainerExecution", currentTaskName)
paramValue, artifact, err := collectContainerOutput(cfg, currentTask, workingOutputKey, isArtifact)
if err != nil {
Expand All @@ -920,7 +989,7 @@ func CollectInputs(
tempSubTaskName = metadata.GetParallelForTaskName(tempSubTaskName, iterationIndex.GetIntValue())
glog.V(4).Infof("subTaskIterationName: %v", tempSubTaskName)
}
glog.V(4).Infof("tempSubTaskName: %v", tempSubTaskName)
glog.Infof("DEBUG CollectInputs: Adding tempSubTaskName '%s' to queue", tempSubTaskName)
tasksToResolve = append(tasksToResolve, tempSubTaskName)
continue
}
Expand All @@ -930,10 +999,11 @@ func CollectInputs(
// currentTask is in fact a ParallelFor Head DAG, thus we need to add
// its iteration DAGs to the queue.

glog.Infof("DEBUG CollectInputs: Adding %d iteration tasks for ParallelFor DAG", iterations.GetIntValue())
for i := range iterations.GetIntValue() {
loopName := metadata.GetTaskNameWithDagID(currentTask.TaskName(), currentTask.GetID())
loopIterationName := metadata.GetParallelForTaskName(loopName, i)
glog.V(4).Infof("loopIterationName: %v", loopIterationName)
glog.Infof("DEBUG CollectInputs: Adding loopIterationName '%s' to queue", loopIterationName)
tasksToResolve = append(tasksToResolve, loopIterationName)
}
}
Expand Down Expand Up @@ -1071,7 +1141,7 @@ func GetProducerTask(parentTask *metadata.Execution, tasks map[string]*metadata.
func InferIndexedTaskName(producerTaskName string, dag *metadata.Execution) string {
// Check if the DAG in question is a parallelFor iteration DAG. If it is, we need to
// update the producerTaskName so the downstream task resolves the appropriate index.
if dag.GetExecution().GetCustomProperties()["iteration_index"] != nil {
if dag.GetExecution().GetCustomProperties() != nil && dag.GetExecution().GetCustomProperties()["iteration_index"] != nil {
task_iteration_index := dag.GetExecution().GetCustomProperties()["iteration_index"].GetIntValue()
producerTaskName = metadata.GetParallelForTaskName(producerTaskName, task_iteration_index)
glog.V(4).Infof("TaskIteration - ProducerTaskName: %v", producerTaskName)
Expand Down
Loading
Loading