[backend] Status update for loops, nested pipelines, dsl.IF/Else/Elif never updates #11979

@HumairAK

Description

Environment

  • How did you deploy Kubeflow Pipelines (KFP)? Standalone

  • KFP version: 2.5.0

  • KFP SDK version: 2.13.0

Steps to reproduce

This is related to the issue that was resolved here. That fix covered the exit handler case, but statuses still do not update for loops, dsl.If/Elif/Else, and nested pipelines. Also related: [1].

The underlying issue is that we only update the dag execution state one level above the container execution's dag. As a result, any control flow that produces nested dags leaves statuses that never get updated. To resolve this, you will likely need to keep backtracking up the dag graph until you hit a dag that still has pending tasks, at which point there is no point in traversing further up.
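To make the intended behavior concrete, here is a minimal sketch of that backtracking. All names (DagExecution, propagate_completion, the state strings) are hypothetical, not actual KFP driver/launcher types: when a container execution finishes, walk up the chain of parent dag executions, completing each one whose children are all done, and stop at the first dag that still has pending tasks.

```python
# Hypothetical sketch of the proposed backtracking; the class and
# function names are illustrative, not actual KFP backend APIs.
from dataclasses import dataclass
from typing import Optional


@dataclass
class DagExecution:
    name: str
    total_tasks: int
    completed_tasks: int = 0
    state: str = "RUNNING"
    parent: Optional["DagExecution"] = None


def propagate_completion(dag: DagExecution) -> None:
    """Walk upward from `dag`, completing each dag whose children are done."""
    current: Optional[DagExecution] = dag
    while current is not None:
        if current.completed_tasks < current.total_tasks:
            # A sibling task is still pending; no point traversing further up.
            break
        current.state = "COMPLETE"
        if current.parent is not None:
            current.parent.completed_tasks += 1
        current = current.parent


# Example: root -> ParallelFor dag -> three iteration dags.
root = DagExecution("root", total_tasks=1)
loop = DagExecution("for-loop", total_tasks=3, parent=root)
for i in range(3):
    iteration = DagExecution(f"iteration-{i}", total_tasks=1, parent=loop)
    iteration.completed_tasks = 1  # the container task in this iteration finished
    propagate_completion(iteration)
```

Only when the last iteration completes does the walk reach the loop dag and then the root; stopping after a single parent, as the launcher does today, is exactly why the outer statuses stay RUNNING.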

There are additional considerations with these other control flows:

  • total_dag_tasks for the loop case is inaccurate: it is always set to 2 because we use the count of dag tasks parsed from the compiled pipeline spec, but the actual fan-out is only resolved at run time. iteration_count is possibly a better alternative, so try using that.
  • In the dsl.If case, total_dag_tasks is also inaccurate for the condition-branches dag, since only one of the branches will execute.
  • Because we only go one parent up the chain, the root dag is also never updated; it should be updated once the entire pipeline is complete (i.e. the final launcher should walk all the way up to the root).
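The first two bullets amount to resolving the expected task count at run time instead of trusting the compile-time total_dag_tasks. A hedged sketch of that idea (the function and the dag-type labels are illustrative, not KFP identifiers):

```python
# Illustrative only: resolve how many child tasks a dag should wait for,
# instead of reusing the compile-time total_dag_tasks value.
from typing import Optional


def expected_task_count(dag_type: str,
                        compile_time_tasks: int,
                        iteration_count: Optional[int] = None) -> int:
    if dag_type == "loop":
        # A ParallelFor fans out at run time; the compile-time task count
        # says nothing about the number of iterations, which is only known
        # once the loop items are resolved.
        if iteration_count is None:
            raise ValueError("iteration_count must be resolved at run time")
        return iteration_count
    if dag_type == "condition-branches":
        # In an If/Elif/Else group at most one branch executes; assuming an
        # Else is present, exactly one branch runs regardless of how many
        # branches were compiled.
        return 1
    # Plain dags: the compile-time count is accurate.
    return compile_time_tasks
```

For the three-item ParallelFor in loops.py below, `expected_task_count("loop", 2, iteration_count=3)` yields 3, while `expected_task_count("condition-branches", 2)` yields 1 for the If/Else group in conditions.py.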

There may be other such cases, so I encourage you to try a large variety of nested dags. Here are some pipelines to get you started; all of them must successfully report a status for every node. A dag's status should be updated as soon as all of its child tasks have completed.

Test both failure and success states, and include various unit/integration tests covering the different variations.

loops.py
import kfp
from kfp import dsl
  
  
@dsl.component()  
def fail(model_id: str):  
    import sys  
    print(model_id)  
    sys.exit(1)  
  
@dsl.component()  
def hello_world():  
    print("hello_world")
  
@dsl.pipeline(name="Pipeline", description="Pipeline")  
def export_model():  
    # For the iteration_index execution, total_dag_tasks is always 2
    # because this value is generated from the number of tasks in the
    # component dag (generated at sdk compile time); however, parallelFor
    # can fan out to a dynamic number of iterations and thus likely needs
    # to match iteration_count (generated at runtime).
    with dsl.ParallelFor(items=['1', '2', '3']) as model_id:
        hello_task = hello_world().set_caching_options(enable_caching=False)  
        fail_task = fail(model_id=model_id).set_caching_options(enable_caching=False)  
        fail_task.after(hello_task)  
  
if __name__ == "__main__":  
    kfp.compiler.Compiler().compile(export_model, "simple_pipeline.yaml")
conditions.py
from kfp import dsl, compiler

@dsl.component()
def fail():
    import sys
    sys.exit(1)

@dsl.component()
def hello_world():
    print("hello_world")

@dsl.component()
def post_msg():
    print("this is a message")

@dsl.component()
def output_msg() -> str:
    return "that"

@dsl.pipeline
def pipeline():
    output = output_msg().set_caching_options(enable_caching=False)
    # This will fail to report in the outer dag
    # Note that this dag will have multiple total_dag_tasks
    # But only one of them will be executed.
    with dsl.If('this' == output.output):
        hello_world().set_caching_options(enable_caching=False)
    with dsl.Else():
        fail().set_caching_options(enable_caching=False)

    # More nested dags
    with dsl.If('that' == output.output):
        with dsl.If('this' == output.output):
            hello_world().set_caching_options(enable_caching=False)
        with dsl.Elif('this2' == output.output):
            hello_world().set_caching_options(enable_caching=False)
        with dsl.Else():
            fail().set_caching_options(enable_caching=False)


compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=__file__.replace('.py', '-v2.yaml'))
nested_pipelines.py
import kfp
from kfp import dsl

@dsl.component()
def fail():
    import sys
    sys.exit(1)

@dsl.component()
def hello_world():
    print("hello_world")

# Status for inner inner pipeline will be updated to fail
@dsl.pipeline(name="inner_inner_pipeline", description="")
def inner_inner_pipeline():
    fail()

# Status for inner pipeline stays RUNNING
@dsl.pipeline(name="inner_pipeline", description="")
def inner_pipeline():
    inner_inner_pipeline()

# Status for root stays RUNNING
@dsl.pipeline(name="outer_pipeline", description="")
def outer_pipeline():
    inner_pipeline()

if __name__ == "__main__":
    kfp.compiler.Compiler().compile(outer_pipeline, "simple_pipeline.yaml")
Expected result

All control flow and nested dag variations should report their final statuses as soon as they complete.

Notes:

Integration tests should verify that status updates are accurately reported for the aforementioned control flows.

Impacted by this bug? Give it a 👍.
