@@ -259,9 +259,37 @@ func testMpiJobWaitWorkers(t *testing.T, startSuspended bool) {
259
259
}
260
260
s .events .verify (t )
261
261
262
- workerPods , err := getPodsForJob (ctx , s .kClient , mpiJob )
262
+ // The launcher job should not be created until all workers are ready even when we start in suspended mode.
263
+ job , err := getLauncherJobForMPIJob (ctx , s .kClient , mpiJob )
263
264
if err != nil {
264
- t .Fatalf ("Cannot get worker pods from job: %v" , err )
265
+ t .Fatalf ("Cannot get launcher job from job: %v" , err )
266
+ }
267
+ if job != nil {
268
+ t .Fatalf ("Launcher is created before workers" )
269
+ }
270
+
271
+ if startSuspended {
272
+ // Resume the MPIJob so that the test can follow the normal path.
273
+ mpiJob .Spec .RunPolicy .Suspend = ptr .To (false )
274
+ mpiJob , err = s .mpiClient .KubeflowV2beta1 ().MPIJobs (mpiJob .Namespace ).Update (ctx , mpiJob , metav1.UpdateOptions {})
275
+ if err != nil {
276
+ t .Fatalf ("Error Updating MPIJob: %v" , err )
277
+ }
278
+ }
279
+
280
+ var workerPods []corev1.Pod
281
+ if err = wait .PollUntilContextTimeout (ctx , util .WaitInterval , wait .ForeverTestTimeout , false , func (ctx context.Context ) (bool , error ) {
282
+ var err error
283
+ workerPods , err = getPodsForJob (ctx , s .kClient , mpiJob )
284
+ if err != nil {
285
+ return false , err
286
+ }
287
+ if len (workerPods ) != 2 {
288
+ return false , nil
289
+ }
290
+ return true , nil
291
+ }); err != nil {
292
+ t .Errorf ("Failed waiting for worker pods to be created: %v" , err )
265
293
}
266
294
267
295
err = updatePodsToPhase (ctx , s .kClient , workerPods , corev1 .PodRunning )
0 commit comments