Skip to content

Commit 0964f37

Browse files
committed
Ensure GetWithPriority does not block after queue shutdown
1 parent 2d909e6 commit 0964f37

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,9 +290,18 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)
290290

291291
w.notifyItemOrWaiterAdded()
292292

293-
item := <-w.get
294-
295-
return item.Key, item.Priority, w.shutdown.Load()
293+
select {
294+
case <-w.done:
295+
// Return if the queue was shutdown while we were already waiting for an item here.
296+
// For example controller workers are continuously calling GetWithPriority and
297+
// GetWithPriority is blocking the workers if there are no items in the queue.
298+
// If the controller and accordingly the queue is then shut down, without this code
299+
// branch the controller workers remain blocked here and are unable to shut down.
300+
var zero T
301+
return zero, 0, true
302+
case item := <-w.get:
303+
return item.Key, item.Priority, w.shutdown.Load()
304+
}
296305
}
297306

298307
func (w *priorityqueue[T]) Get() (item T, shutdown bool) {

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,35 @@ var _ = Describe("Controllerworkqueue", func() {
296296
Expect(isShutDown).To(BeTrue())
297297
})
298298

299+
It("Get from priority queue should get unblocked when the priority queue is shut down", func() {
300+
q, _ := newQueue()
301+
302+
getUnblocked := make(chan struct{})
303+
304+
go func() {
305+
defer GinkgoRecover()
306+
defer close(getUnblocked)
307+
308+
item, priority, isShutDown := q.GetWithPriority()
309+
Expect(item).To(Equal(""))
310+
Expect(priority).To(Equal(0))
311+
Expect(isShutDown).To(BeTrue())
312+
}()
313+
314+
// Without this the go routine above does not get scheduled quickly enough.
315+
time.Sleep(1 * time.Millisecond)
316+
317+
// Verify the go routine above is now waiting for an item.
318+
Eventually(q.waiters.Load()).Should(Equal(int64(1)))
319+
Consistently(getUnblocked).ShouldNot(BeClosed())
320+
321+
// shut down
322+
q.ShutDown()
323+
324+
// Verify the shutdown unblocked the go routine.
325+
Eventually(getUnblocked).Should(BeClosed())
326+
})
327+
299328
It("items are included in Len() and the queueDepth metric once they are ready", func() {
300329
q, metrics := newQueue()
301330
defer q.ShutDown()

0 commit comments

Comments
 (0)