59 changes: 56 additions & 3 deletions pkg/controller/podgc/gc_controller.go
@@ -86,7 +86,7 @@ func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInfor
func (gcc *PodGCController) Run(stop <-chan struct{}) {
defer utilruntime.HandleCrash()

klog.Infof("Starting GC controller")
klog.Infof("Test Shutdown: Starting GC controller")
defer gcc.nodeQueue.ShutDown()
defer klog.Infof("Shutting down GC controller")

@@ -113,6 +113,7 @@ func (gcc *PodGCController) gc() {
if gcc.terminatedPodThreshold > 0 {
gcc.gcTerminated(pods)
}
+	gcc.gcTerminating(pods) // Test Shutdown
gcc.gcOrphaned(pods, nodes)
gcc.gcUnscheduledTerminating(pods)
}
@@ -124,6 +125,58 @@ func isPodTerminated(pod *v1.Pod) bool {
return false
}

+// Test Shutdown
+func isPodTerminating(pod *v1.Pod) bool {
+	klog.Infof("Test Shutdown: isPodTerminating: pod %s status phase is %v", pod.Name, pod.Status.Phase)
+	klog.Infof("Test Shutdown: isPodTerminating: pod %s status reason is %v", pod.Name, pod.Status.Reason)
+	if phase := pod.Status.Phase; phase != v1.PodPending && phase != v1.PodRunning && phase != v1.PodUnknown {
+		klog.Infof("Test Shutdown: pod status phase is not pending/running/unknown, returning true")
+		return true
+	}
+	reason := pod.Status.Reason
+	klog.Infof("Test Shutdown: isPodTerminating: pod %s deletion timestamp is %v", pod.Name, pod.ObjectMeta.DeletionTimestamp)
+	if pod.ObjectMeta.DeletionTimestamp != nil && reason == "Terminated" {
+		klog.Infof("garbage collecting pod %s that is terminating. Phase [%v], Reason [%v]", pod.Name, pod.Status.Phase, pod.Status.Reason)
+		return true
+	}
+	return false
+}
+
+// Test Shutdown
+func (gcc *PodGCController) gcTerminating(pods []*v1.Pod) {
+	klog.Infof("Test Shutdown: entering PodGCController.gcTerminating")
+	terminatingPods := []*v1.Pod{}
+	for _, pod := range pods {
+		if isPodTerminating(pod) {
+			terminatingPods = append(terminatingPods, pod)
+		}
+	}
+
+	terminatingPodCount := len(terminatingPods)
+	// Test Shutdown: delete every terminating pod rather than only the overage beyond the threshold
+	//deleteCount := terminatingPodCount - gcc.terminatedPodThreshold
+	deleteCount := terminatingPodCount
+	if deleteCount <= 0 {
+		klog.Infof("Test Shutdown: no terminating pods to delete (delete count is %v)", deleteCount)
+		return
+	}
+
+	klog.Infof("Test Shutdown: garbage collecting %v pods that are terminating", deleteCount)
+	// delete the oldest pods first
+	sort.Sort(byCreationTimestamp(terminatingPods))
+	var wait sync.WaitGroup
+	for i := 0; i < deleteCount; i++ {
+		wait.Add(1)
+		go func(namespace string, name string) {
+			defer wait.Done()
+			if err := gcc.deletePod(namespace, name); err != nil {
+				// ignore not-found errors
+				defer utilruntime.HandleError(err)
+			}
+		}(terminatingPods[i].Namespace, terminatingPods[i].Name)
+	}
+	wait.Wait()
+}

func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
terminatedPods := []*v1.Pod{}
for _, pod := range pods {
@@ -158,7 +211,7 @@ func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {

// gcOrphaned deletes pods that are bound to nodes that don't exist.
func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod, nodes []*v1.Node) {
klog.V(4).Infof("GC'ing orphaned")
klog.V(4).Infof("Test Shutdown: GC'ing orphaned")
existingNodeNames := sets.NewString()
for _, node := range nodes {
existingNodeNames.Insert(node.Name)
@@ -221,7 +274,7 @@ func (gcc *PodGCController) checkIfNodeExists(name string) (bool, error) {

// gcUnscheduledTerminating deletes pods that are terminating and haven't been scheduled to a particular node.
func (gcc *PodGCController) gcUnscheduledTerminating(pods []*v1.Pod) {
klog.V(4).Infof("GC'ing unscheduled pods which are terminating.")
klog.V(4).Infof("Test Shutdown: GC'ing unscheduled pods which are terminating.")

for _, pod := range pods {
if pod.DeletionTimestamp == nil || len(pod.Spec.NodeName) > 0 {
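As a side note, the new predicate is easy to exercise on its own. Below is a minimal standalone sketch (not part of the PR) that reproduces `isPodTerminating` without the controller plumbing; it assumes `k8s.io/api` and `k8s.io/apimachinery` are available on the module path.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isPodTerminating mirrors the predicate added above: a pod is collected when
// its phase is already terminal, or when it carries a deletion timestamp
// together with the "Terminated" status reason.
func isPodTerminating(pod *v1.Pod) bool {
	if phase := pod.Status.Phase; phase != v1.PodPending && phase != v1.PodRunning && phase != v1.PodUnknown {
		return true
	}
	return pod.ObjectMeta.DeletionTimestamp != nil && pod.Status.Reason == "Terminated"
}

func main() {
	now := metav1.Now()
	deleting := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", DeletionTimestamp: &now},
		Status:     v1.PodStatus{Phase: v1.PodRunning, Reason: "Terminated"},
	}
	running := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodRunning}}
	fmt.Println(isPodTerminating(deleting)) // true: deletion timestamp set and reason "Terminated"
	fmt.Println(isPodTerminating(running))  // false: still running normally
}
```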
5 changes: 4 additions & 1 deletion pkg/controller/volume/attachdetach/reconciler/reconciler.go
@@ -185,10 +185,13 @@ func (rc *reconciler) reconcile() {
}
// Check whether timeout has reached the maximum waiting time
timeout := elapsedTime > rc.maxWaitForUnmountDuration
+	// Test Shutdown
+	// TODO: Check CSIDriver SafeDetach
+	klog.V(5).Infof("Test Shutdown: Can detach volume? elapsed time [%v] max wait [%v]", elapsedTime, rc.maxWaitForUnmountDuration)
// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
if attachedVolume.MountedByNode && !timeout {
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Cannot detach volume because it is still mounted", ""))
-		continue
+		//continue // Test Shutdown
}

// Before triggering volume detach, mark volume as detached and update the node status
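With the `continue` commented out, detach proceeds even while the volume is still reported as mounted by the node. A tiny sketch of that gate, using illustrative names rather than the reconciler's actual types:

```go
package main

import (
	"fmt"
	"time"
)

// canDetach mirrors the gate in reconcile(): detach is normally skipped while
// the volume is still mounted, unless the unmount wait has exceeded
// maxWaitForUnmountDuration. skipWhileMounted=false models this PR, which
// comments out the `continue`.
func canDetach(mountedByNode bool, elapsed, maxWait time.Duration, skipWhileMounted bool) bool {
	timeout := elapsed > maxWait
	if mountedByNode && !timeout && skipWhileMounted {
		return false // upstream behavior: wait for the node to unmount first
	}
	return true
}

func main() {
	fmt.Println(canDetach(true, 2*time.Minute, 6*time.Minute, true))  // false: upstream waits
	fmt.Println(canDetach(true, 2*time.Minute, 6*time.Minute, false)) // true: the PR detaches anyway
}
```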
3 changes: 3 additions & 0 deletions pkg/kubelet/kubelet_node_status.go
@@ -536,7 +536,10 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
}

// Patch the current status on the API server
+	// print out original / updated
+	klog.Infof("The original node is %v", originalNode)
updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
klog.Infof("The updated node is %v \n", updatedNode)
if err != nil {
return err
}
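The two added logs dump the node object on both sides of the patch call. For intuition, `nodeutil.PatchNodeStatus` boils down to a two-way strategic merge patch between the original and modified node; a simplified sketch of that idea is below (the real helper also targets the status subresource and handles more bookkeeping).

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	original := &v1.Node{}
	modified := original.DeepCopy()
	modified.Status.Phase = v1.NodeTerminated // pretend the status changed

	oldJSON, err := json.Marshal(original)
	if err != nil {
		panic(err)
	}
	newJSON, err := json.Marshal(modified)
	if err != nil {
		panic(err)
	}
	// Build the merge patch that carries only the changed fields.
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("patch sent to the API server: %s\n", patch) // {"status":{"phase":"Terminated"}}
}
```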
6 changes: 4 additions & 2 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
@@ -26,10 +26,8 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/clock"
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -111,8 +109,10 @@ func (m *Manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitR
// Start starts the node shutdown manager and will start watching the node for shutdown events.
func (m *Manager) Start() error {
if !m.isFeatureEnabled() {
klog.Info("Node shutdown feature is not enabled")
return nil
}
klog.Info("Node shutdown manager starting")
stop, err := m.start()
if err != nil {
return err
@@ -217,11 +217,13 @@ func (m *Manager) start() (chan struct{}, error) {
m.nodeShuttingDownMutex.Unlock()

if isShuttingDown {
klog.Info("Node shutdown. Update node status")
// Update node status and ready condition
go m.syncNodeStatus()

m.processShutdownEvent()
} else {
klog.Info("Not shutting down")
m.aquireInhibitLock()
}
}
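The instrumented branch covers both edges of the signal: when the node is shutting down, node status is synced in the background while the blocking shutdown processing runs; when the shutdown is cancelled, the inhibitor lock is re-acquired. An illustrative sketch of the pattern, with hypothetical types standing in for the manager:

```go
package main

import "log"

// shutdownHandler is a hypothetical stand-in for the nodeshutdown Manager.
type shutdownHandler struct {
	syncNodeStatus       func()
	processShutdownEvent func() error
	acquireInhibitLock   func() error
}

func (h *shutdownHandler) handle(isShuttingDown bool) {
	if isShuttingDown {
		log.Println("node is shutting down; updating node status")
		go h.syncNodeStatus() // non-blocking: report the shutting-down condition
		if err := h.processShutdownEvent(); err != nil {
			log.Printf("shutdown processing failed: %v", err)
		}
		return
	}
	log.Println("shutdown cancelled; re-acquiring inhibit lock")
	if err := h.acquireInhibitLock(); err != nil {
		log.Printf("failed to re-acquire inhibit lock: %v", err)
	}
}

func main() {
	h := &shutdownHandler{
		syncNodeStatus:       func() {},
		processShutdownEvent: func() error { return nil },
		acquireInhibitLock:   func() error { return nil },
	}
	h.handle(true)  // shutdown path
	h.handle(false) // cancellation path
}
```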
1 change: 1 addition & 0 deletions pkg/kubelet/volumemanager/reconciler/reconciler.go
@@ -292,6 +292,7 @@ func (rc *reconciler) mountAttachVolumes() {
}

func (rc *reconciler) unmountDetachDevices() {
+	// Test Shutdown: csi_attacher.go: UnmountDevice("/var/lib/kubelet/plugins/kubernetes.io/csi/pv/pvc-xxxx-xxxx-xxxxx-xxxxx/globalmount") for k8s-node-000
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
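The added comment records the observed call path (`csi_attacher.go`'s `UnmountDevice` on the volume's `globalmount` directory). The gate that `unmountDetachDevices` applies before reaching that call can be summarized with this simplified, hypothetical sketch:

```go
package main

import "fmt"

// volumeState is a hypothetical stand-in for what the reconciler tracks.
type volumeState struct {
	inDesiredState   bool // still wanted by some pod?
	operationPending bool // a mount/unmount operation already in flight?
	deviceMounted    bool // device still mounted at .../globalmount?
}

// shouldUnmountDevice summarizes the check above: only tear down the global
// mount once no pod wants the volume and nothing is in flight for it.
func shouldUnmountDevice(v volumeState) bool {
	return !v.inDesiredState && !v.operationPending && v.deviceMounted
}

func main() {
	fmt.Println(shouldUnmountDevice(volumeState{deviceMounted: true}))                         // true: safe to unmount
	fmt.Println(shouldUnmountDevice(volumeState{deviceMounted: true, operationPending: true})) // false: wait for the pending op
}
```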