@@ -961,8 +961,13 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
961
961
// If an error occurs during Get/Create, we'll requeue the item so we
962
962
// can attempt processing again later. This could have been caused by a
963
963
// temporary network failure, or any other transient reason.
964
+ // However, if the error reports an invalid pod spec, retrying would be
965
+ // futile, so the job status should be set to failed instead.
964
966
if err != nil {
965
967
c .recorder .Eventf (mpiJob , corev1 .EventTypeWarning , mpiJobFailedReason , "worker pod created failed: %v" , err )
968
+ if errors .IsInvalid (err ) {
969
+ return workerPods , nil
970
+ }
966
971
return nil , err
967
972
}
968
973
// If the worker is not controlled by this MPIJob resource, we should log
@@ -1076,7 +1081,6 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
1076
1081
running = 0
1077
1082
evict = 0
1078
1083
)
1079
-
1080
1084
initializeMPIJobStatuses (mpiJob , kubeflow .MPIReplicaTypeWorker )
1081
1085
//spec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
1082
1086
for i := 0 ; i < len (worker ); i ++ {
@@ -1100,7 +1104,19 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
1100
1104
c .recorder .Event (mpiJob , corev1 .EventTypeWarning , mpiJobEvict , msg )
1101
1105
}
1102
1106
1103
- if isMPIJobSuspended (mpiJob ) {
1107
+ // When workerSpec != nil, workerSpec.Replicas != 0 and len(worker) == 0,
1108
+ // the pod spec must be invalid, so the job is marked as failed.
1109
+ workerSpec := mpiJob .Spec .MPIReplicaSpecs [kubeflow .MPIReplicaTypeWorker ]
1110
+ if workerSpec != nil && len (worker ) == 0 && * workerSpec .Replicas != 0 {
1111
+ msg := "invalid pod spec"
1112
+ c .recorder .Event (mpiJob , corev1 .EventTypeWarning , mpiJobFailedReason , msg )
1113
+ if mpiJob .Status .CompletionTime == nil {
1114
+ now := metav1 .Now ()
1115
+ mpiJob .Status .CompletionTime = & now
1116
+ }
1117
+ updateMPIJobConditions (mpiJob , kubeflow .JobFailed , corev1 .ConditionTrue , mpiJobFailedReason , msg )
1118
+ mpiJobsFailureCount .Inc ()
1119
+ } else if isMPIJobSuspended (mpiJob ) {
1104
1120
msg := fmt .Sprintf ("MPIJob %s/%s is suspended." , mpiJob .Namespace , mpiJob .Name )
1105
1121
updateMPIJobConditions (mpiJob , kubeflow .JobRunning , corev1 .ConditionFalse , mpiJobSuspendedReason , msg )
1106
1122
} else if launcher != nil && launcherPodsCnt >= 1 && running == len (worker ) {
0 commit comments