@@ -23,19 +23,19 @@ import (
2323 "strings"
2424 "time"
2525
26+ tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
2627 cloudprovider "github.com/NexusGPU/tensor-fusion/internal/cloudprovider"
2728 "github.com/NexusGPU/tensor-fusion/internal/cloudprovider/types"
28- "github.com/NexusGPU/tensor-fusion/internal/utils"
29-
30- tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
3129 "github.com/NexusGPU/tensor-fusion/internal/constants"
30+ "github.com/NexusGPU/tensor-fusion/internal/utils"
3231 batchv1 "k8s.io/api/batch/v1"
3332 corev1 "k8s.io/api/core/v1"
3433 "k8s.io/apimachinery/pkg/api/errors"
3534 "k8s.io/apimachinery/pkg/api/resource"
3635 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3736 "k8s.io/apimachinery/pkg/runtime"
3837 "k8s.io/client-go/tools/record"
38+ "k8s.io/client-go/util/retry"
3939 "k8s.io/utils/ptr"
4040 ctrl "sigs.k8s.io/controller-runtime"
4141 "sigs.k8s.io/controller-runtime/pkg/builder"
@@ -457,6 +457,7 @@ func (r *GPUNodeReconciler) reconcileCloudVendorNode(ctx context.Context, node *
457457 return fmt .Errorf ("failed to unmarshal cloud vendor param: %w, GPUNode: %s" , err , node .Name )
458458 }
459459
460+ // TODO: query cloud vendor by node name
460461 status , err := provider .CreateNode (ctx , & nodeParam )
461462 if err != nil {
462463 return err
@@ -469,11 +470,31 @@ func (r *GPUNodeReconciler) reconcileCloudVendorNode(ctx context.Context, node *
469470 if err != nil {
470471 return err
471472 }
473+ gpuNode .Status .Phase = tfv1 .TensorFusionGPUNodePhasePending
472474 gpuNode .Status .NodeInfo .IP = status .PrivateIP
473475 gpuNode .Status .NodeInfo .InstanceID = status .InstanceID
474476 gpuNode .Status .NodeInfo .Region = nodeParam .Region
475- if err := r .Client .Status ().Update (ctx , gpuNode ); err != nil {
476- log .FromContext (ctx ).Info ("Failed to update GPUNode status, must terminate node to keep operation atomic" , "name" , nodeParam .NodeName )
477+
478+ // Retry status update until success to handle version conflicts
479+ err = retry .RetryOnConflict (retry .DefaultBackoff , func () error {
480+ // Get the latest version before attempting an update
481+ latest := & tfv1.GPUNode {}
482+ if err := r .Client .Get (ctx , client.ObjectKey {Name : gpuNode .Name }, latest ); err != nil {
483+ return err
484+ }
485+
486+ // Apply our status updates to the latest version
487+ latest .Status .Phase = tfv1 .TensorFusionGPUNodePhasePending
488+ latest .Status .NodeInfo .IP = status .PrivateIP
489+ latest .Status .NodeInfo .InstanceID = status .InstanceID
490+ latest .Status .NodeInfo .Region = nodeParam .Region
491+
492+ // Attempt to update with the latest version
493+ return r .Client .Status ().Update (ctx , latest )
494+ })
495+
496+ if err != nil {
497+ log .FromContext (ctx ).Error (err , "Failed to update GPUNode status after retries, must terminate node to keep operation atomic" , "name" , nodeParam .NodeName )
477498 errTerminate := provider .TerminateNode (ctx , & types.NodeIdentityParam {
478499 InstanceID : status .InstanceID ,
479500 Region : nodeParam .Region ,
0 commit comments