Skip to content

Commit bca0d2d

Browse files
biluriudaysajmera-pensando
authored andcommitted
remediation workflow - misc fixes (#915)
* remediation workflow - misc fixes * split validate node conditions method (cherry picked from commit d1dc65784b9e6672c196798a481f359ad3e1f2c8)
1 parent 974ce8d commit bca0d2d

File tree

2 files changed

+167
-80
lines changed

2 files changed

+167
-80
lines changed

internal/controllers/mock_remediation_handler.go

Lines changed: 43 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/controllers/remediation_handler.go

Lines changed: 124 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ package controllers
3434

3535
import (
3636
"context"
37+
"errors"
3738
"fmt"
3839
"strings"
3940
"time"
@@ -131,93 +132,47 @@ func (n *remediationMgr) HandleRemediation(ctx context.Context, devConfig *amdv1
131132
mappings[m.NodeCondition] = m
132133
}
133134

135+
var errs error
134136
for _, node := range nodes.Items {
135-
NodeLoop:
136-
for _, cond := range node.Status.Conditions {
137-
138-
wfList, err := n.helper.getWorkflowList(ctx, devConfig.Namespace)
139-
if err != nil {
140-
logger.Error(err, fmt.Sprintf("Failed to list workflows. Workflow list: %v", wfList))
141-
break NodeLoop
142-
}
143-
144-
// If a workflow is already running on that node, then skip the node but resume/delete workflow if needed
145-
for _, wf := range wfList.Items {
146-
if strings.HasPrefix(wf.Name, fmt.Sprintf("%s-", node.Name)) {
147-
if wf.Status.Phase == workflowv1alpha1.WorkflowSucceeded {
148-
if err := n.helper.deleteWorkflow(ctx, &wf); err != nil {
149-
logger.Error(err, fmt.Sprintf("Failed to delete workflow %s", wf.Name))
150-
}
151-
logger.Info(fmt.Sprintf("Deleted workflow: %s", wf.Name))
152-
} else if wf.Status.Phase == workflowv1alpha1.WorkflowRunning {
153-
stages := wf.Status.Nodes
154-
for _, wfStage := range stages {
155-
if wfStage.Type == "Suspend" && wfStage.Phase == "Running" {
156-
logger.Info(fmt.Sprintf("Suspended workflow %s found for node %s. Attempting resume.", wf.Name, node.Name))
157-
if err := n.helper.resumeSuspendedWorkflow(ctx, wf.Name, wf.Namespace); err != nil {
158-
logger.Error(err, fmt.Sprintf("Failed to resume workflow %s", wf.Name))
159-
}
160-
break NodeLoop
161-
}
162-
}
163-
logger.Info(fmt.Sprintf("Workflow: %s already running on the node: %s, skipping creation of workflow", wf.Name, node.Name))
164-
break NodeLoop
165-
}
166-
}
167-
}
168-
169-
if cond.Status != v1.ConditionTrue {
170-
continue
171-
}
172-
mapping, exists := mappings[string(cond.Type)]
173-
if !exists {
174-
continue
175-
}
176-
177-
logger.Info(fmt.Sprintf("Matching condition found on node %s for condition %s", node.Name, mapping.NodeCondition))
178-
179-
taint := v1.Taint{
180-
Key: RemediationTaintKey,
181-
Value: mapping.NodeCondition,
182-
Effect: v1.TaintEffectNoSchedule,
183-
}
184-
185-
// If taint already exists, skip the node
186-
if hasTaint := n.helper.checkIfTaintExists(&node, taint); hasTaint {
187-
logger.Info(fmt.Sprintf("Taint %s already present on node %s, skipping creation of workflow", taint.Key, node.Name))
188-
break NodeLoop
189-
}
190-
191-
// If driver install/upgrade is in progress, skip the node
192-
if driverUpgradeInProgress := n.helper.isDriverUpgradeInProgress(devConfig, &node); driverUpgradeInProgress {
193-
logger.Info(fmt.Sprintf("Driver Install/Upgrade is in progress, skipping creation of workflow on node %s", node.Name))
194-
break NodeLoop
195-
}
196-
197-
logger.Info(fmt.Sprintf("GPU Condition: %s observed and node: %s is unhealthy. Triggering Remediation Workflow: %s", mapping.NodeCondition, node.Name, mapping.WorkflowTemplate))
198-
199-
// Fetch WorkflowTemplate
200-
wfTemplate, err := n.helper.getWorkflowTemplate(ctx, mapping.WorkflowTemplate, devConfig.Namespace)
201-
if err != nil {
202-
logger.Error(err, fmt.Sprintf("Failed to fetch WorkflowTemplate %s", mapping.WorkflowTemplate))
203-
return res, err
204-
}
137+
// Validate node conditions
138+
mapping, err := n.helper.validateNodeConditions(ctx, devConfig, &node, mappings)
139+
if err != nil {
140+
logger.Info(fmt.Sprintf("Node conditions validations for node %s failed with error: %v", node.Name, err))
141+
continue
142+
}
143+
canSchedule := n.helper.isWorkflowSchedulableOnNode(ctx, devConfig, &node, mapping)
144+
if !canSchedule {
145+
continue
146+
}
205147

206-
// Populate Workflow Object
207-
wf := n.helper.populateWorkflow(ctx, wfTemplate, &mapping, node.Name, devConfig)
148+
createNewWorkflow := n.helper.handleExistingWorkflowsOnNode(ctx, devConfig, &node)
149+
if !createNewWorkflow {
150+
continue
151+
}
152+
logger.Info(fmt.Sprintf("GPU Condition: %s observed and node: %s is unhealthy. Starting Remediation Workflow: %s", mapping.NodeCondition, node.Name, mapping.WorkflowTemplate))
153+
154+
// Fetch WorkflowTemplate
155+
wfTemplate, err := n.helper.getWorkflowTemplate(ctx, mapping.WorkflowTemplate, devConfig.Namespace)
156+
if err != nil {
157+
logger.Error(err, fmt.Sprintf("Failed to start remediation workflow %s on node %s", mapping.WorkflowTemplate, node.Name))
158+
errs = errors.Join(errs, err)
159+
continue
160+
}
208161

209-
// Create Workflow
210-
if err := n.helper.createWorkflow(ctx, wf); err != nil {
211-
logger.Error(err, fmt.Sprintf("Failed to create Remediation Workflow for node %s", node.Name))
212-
return res, err
213-
}
162+
// Populate Workflow Object
163+
wf := n.helper.populateWorkflow(ctx, wfTemplate, &mapping, node.Name, devConfig)
214164

215-
logger.Info(fmt.Sprintf("Remediation Workflow for the condition is created successfully on node %s using template %s", node.Name, mapping.WorkflowTemplate))
216-
break NodeLoop
165+
// Create Workflow
166+
if err := n.helper.createWorkflow(ctx, wf); err != nil {
167+
logger.Error(err, fmt.Sprintf("Failed to create remediation workflow %s on node %s", mapping.WorkflowTemplate, node.Name))
168+
errs = errors.Join(errs, err)
169+
continue
217170
}
171+
172+
logger.Info(fmt.Sprintf("Remediation Workflow for the condition is created successfully on node %s using template %s", node.Name, mapping.WorkflowTemplate))
218173
}
219174
logger.Info("Requeue for any node conditions that may be present")
220-
return res, nil
175+
return res, errs
221176
}
222177

223178
// HandleDelete handles the delete operations during remediation process
@@ -266,6 +221,9 @@ type remediationMgrHelperAPI interface {
266221
populateWorkflow(ctx context.Context, wfTemplate *workflowv1alpha1.WorkflowTemplate, mapping *ConditionWorkflowMapping, nodeName string, devCfg *amdv1alpha1.DeviceConfig) *workflowv1alpha1.Workflow
267222
createWorkflow(ctx context.Context, workflow *workflowv1alpha1.Workflow) error
268223
deleteWorkflow(ctx context.Context, workflow *workflowv1alpha1.Workflow) error
224+
validateNodeConditions(ctx context.Context, devConfig *amdv1alpha1.DeviceConfig, node *v1.Node, mappings map[string]ConditionWorkflowMapping) (ConditionWorkflowMapping, error)
225+
isWorkflowSchedulableOnNode(ctx context.Context, devConfig *amdv1alpha1.DeviceConfig, node *v1.Node, mapping ConditionWorkflowMapping) bool
226+
handleExistingWorkflowsOnNode(ctx context.Context, devConfig *amdv1alpha1.DeviceConfig, node *v1.Node) bool
269227
}
270228

271229
type remediationMgrHelper struct {
@@ -971,3 +929,89 @@ func (h *remediationMgrHelper) getWorkflowTemplate(ctx context.Context, workflow
971929
}
972930
return wfTemplate, nil
973931
}
932+
933+
func (h *remediationMgrHelper) validateNodeConditions(ctx context.Context, devConfig *amdv1alpha1.DeviceConfig, node *v1.Node, mappings map[string]ConditionWorkflowMapping) (ConditionWorkflowMapping, error) {
934+
// Check if any node condition of interest is set to True
935+
conditionMet := false
936+
exists := false
937+
var mapping ConditionWorkflowMapping
938+
logger := log.FromContext(ctx)
939+
for _, cond := range node.Status.Conditions {
940+
if cond.Status != v1.ConditionTrue {
941+
continue
942+
}
943+
mapping, exists = mappings[string(cond.Type)]
944+
if !exists {
945+
continue
946+
}
947+
logger.Info(fmt.Sprintf("Matching condition %s found on node %s", mapping.NodeCondition, node.Name))
948+
conditionMet = true
949+
break
950+
}
951+
if !conditionMet {
952+
return mapping, fmt.Errorf("No matching condition found on node %s for condition %s", node.Name, mapping.NodeCondition)
953+
}
954+
955+
return mapping, nil
956+
}
957+
958+
func (h *remediationMgrHelper) isWorkflowSchedulableOnNode(ctx context.Context, devConfig *amdv1alpha1.DeviceConfig, node *v1.Node, mapping ConditionWorkflowMapping) bool {
959+
logger := log.FromContext(ctx)
960+
taint := v1.Taint{
961+
Key: RemediationTaintKey,
962+
Value: mapping.NodeCondition,
963+
Effect: v1.TaintEffectNoSchedule,
964+
}
965+
966+
// If taint already exists, skip the node
967+
if hasTaint := h.checkIfTaintExists(node, taint); hasTaint {
968+
logger.Info(fmt.Sprintf("Taint %s already present on node %s, skipping creation of workflow", taint.Key, node.Name))
969+
return false
970+
}
971+
972+
// If driver install/upgrade is in progress, skip the node
973+
if driverUpgradeInProgress := h.isDriverUpgradeInProgress(devConfig, node); driverUpgradeInProgress {
974+
logger.Info(fmt.Sprintf("Driver Install/Upgrade is in progress, skipping creation of workflow on node %s", node.Name))
975+
return false
976+
}
977+
return true
978+
}
979+
980+
func (h *remediationMgrHelper) handleExistingWorkflowsOnNode(ctx context.Context, devConfig *amdv1alpha1.DeviceConfig, node *v1.Node) bool {
981+
logger := log.FromContext(ctx)
982+
wfList, err := h.getWorkflowList(ctx, devConfig.Namespace)
983+
if err != nil {
984+
logger.Error(err, "Get workflow list failed")
985+
return false
986+
}
987+
988+
// If a workflow is already running on that node, then skip the node but resume/delete workflow if needed
989+
for _, wf := range wfList.Items {
990+
if strings.HasPrefix(wf.Name, fmt.Sprintf("%s-", node.Name)) {
991+
if wf.Status.Phase == workflowv1alpha1.WorkflowSucceeded {
992+
if err := h.deleteWorkflow(ctx, &wf); err != nil {
993+
logger.Error(err, fmt.Sprintf("Failed to delete workflow %s on node %v", wf.Name, node.Name))
994+
return false
995+
}
996+
logger.Info(fmt.Sprintf("Deleted completed workflow %s on node %v", wf.Name, node.Name))
997+
} else if wf.Status.Phase == workflowv1alpha1.WorkflowRunning {
998+
stages := wf.Status.Nodes
999+
for _, wfStage := range stages {
1000+
if wfStage.Type == "Suspend" && wfStage.Phase == "Running" {
1001+
logger.Info(fmt.Sprintf("Found suspended workflow %s on node %s. Attempting resume.", wf.Name, node.Name))
1002+
if err := h.resumeSuspendedWorkflow(ctx, wf.Name, wf.Namespace); err != nil {
1003+
logger.Error(err, fmt.Sprintf("Failed to resume workflow %s on node %s", wf.Name, node.Name))
1004+
} else {
1005+
logger.Info(fmt.Sprintf("successfully resumed workflow %s on node %v", wf.Name, node.Name))
1006+
}
1007+
return false
1008+
}
1009+
}
1010+
logger.Info(fmt.Sprintf("Workflow: %s already running on the node: %s, skipping creation of workflow", wf.Name, node.Name))
1011+
return false
1012+
}
1013+
break
1014+
}
1015+
}
1016+
return true
1017+
}

0 commit comments

Comments
 (0)