Skip to content

Commit d04f428

Browse files
sbueringerk8s-infra-cherrypick-robot
authored andcommitted
Don't block on Get when queue is shutdown (2nd try)
Signed-off-by: Stefan Büringer [email protected]
1 parent 7f146f7 commit d04f428

File tree

2 files changed

+38
-3
lines changed

2 files changed

+38
-3
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,9 +290,18 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)
290290

291291
w.notifyItemOrWaiterAdded()
292292

293-
item := <-w.get
294-
295-
return item.Key, item.Priority, w.shutdown.Load()
293+
select {
294+
case <-w.done:
295+
// Return if the queue was shutdown while we were already waiting for an item here.
296+
// For example controller workers are continuously calling GetWithPriority and
297+
// GetWithPriority is blocking the workers if there are no items in the queue.
298+
// If the controller and accordingly the queue is then shut down, without this code
299+
// branch the controller workers remain blocked here and are unable to shut down.
300+
var zero T
301+
return zero, 0, true
302+
case item := <-w.get:
303+
return item.Key, item.Priority, w.shutdown.Load()
304+
}
296305
}
297306

298307
func (w *priorityqueue[T]) Get() (item T, shutdown bool) {

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,32 @@ var _ = Describe("Controllerworkqueue", func() {
321321
Expect(isShutDown).To(BeTrue())
322322
})
323323

324+
It("Get from priority queue should get unblocked when the priority queue is shut down", func() {
325+
q, _ := newQueue()
326+
327+
getUnblocked := make(chan struct{})
328+
329+
go func() {
330+
defer GinkgoRecover()
331+
defer close(getUnblocked)
332+
333+
item, priority, isShutDown := q.GetWithPriority()
334+
Expect(item).To(Equal(""))
335+
Expect(priority).To(Equal(0))
336+
Expect(isShutDown).To(BeTrue())
337+
}()
338+
339+
// Verify the go routine above is now waiting for an item.
340+
Eventually(q.waiters.Load).Should(Equal(int64(1)))
341+
Consistently(getUnblocked).ShouldNot(BeClosed())
342+
343+
// shut down
344+
q.ShutDown()
345+
346+
// Verify the shutdown unblocked the go routine.
347+
Eventually(getUnblocked).Should(BeClosed())
348+
})
349+
324350
It("items are included in Len() and the queueDepth metric once they are ready", func() {
325351
q, metrics := newQueue()
326352
defer q.ShutDown()

0 commit comments

Comments
 (0)