Skip to content

Commit a6bac24

Browse files
committed
Fix signalling around unblock
fixes #6873
1 parent 14322ad commit a6bac24

File tree

2 files changed

+36
-30
lines changed

2 files changed

+36
-30
lines changed

CHANGES/6873.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix situations with a single worker per installation failing to unblock all tasks that come in quick succession.

pulpcore/tasking/worker.py

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,9 @@ def worker_cleanup(self):
194194
"Cleanup record of missing %s process %s.", app_worker.app_type, app_worker.name
195195
)
196196
qs.delete()
197+
# This will also serve as a pacemaker because it will be triggered regularly.
198+
# Don't bother the others.
199+
self.wakeup_unblock = True
197200

198201
def beat(self):
199202
if self.app_status.last_heartbeat < timezone.now() - self.heartbeat_period:
@@ -214,7 +217,7 @@ def beat(self):
214217
# to be able to report on a congested tasking system to produce reliable results.
215218
self.record_unblocked_waiting_tasks_metric()
216219

217-
def notify_workers(self, reason="unknown"):
220+
def notify_workers(self, reason):
218221
self.cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,))
219222

220223
def cancel_abandoned_task(self, task, final_state, reason=None):
@@ -274,25 +277,28 @@ def unblock_tasks(self):
274277
Also it clears the notification about tasks to be unblocked and sends the notification that
275278
new unblocked tasks are made available.
276279
277-
Returns the number of new unblocked tasks.
280+
Returns None if another worker held the lock, True if unblocked tasks exist, else False.
278281
"""
279282

280283
assert not self.auxiliary
281284

282-
count = 0
283-
self.wakeup_unblock_tasks = False
285+
self.wakeup_unblock = False
284286
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_UNBLOCKING_LOCK):
285-
if count := self._unblock_tasks():
287+
self._unblock_tasks()
288+
289+
if (
290+
Task.objects.filter(state__in=TASK_INCOMPLETE_STATES, app_lock=None)
291+
.exclude(unblocked_at=None)
292+
.exists()
293+
):
286294
self.notify_workers(TASK_WAKEUP_HANDLE)
287-
return count
295+
return True
296+
return False
297+
return None
288298

289299
def _unblock_tasks(self):
290-
"""Iterate over waiting tasks and mark them unblocked accordingly.
300+
"""Iterate over waiting tasks and mark them unblocked accordingly."""
291301

292-
Returns the number of new unblocked tasks.
293-
"""
294-
295-
count = 0
296302
taken_exclusive_resources = set()
297303
taken_shared_resources = set()
298304
# When batching this query, be sure to use "pulp_created" as a cursor
@@ -320,7 +326,6 @@ def _unblock_tasks(self):
320326
task.pulp_domain.name,
321327
)
322328
task.unblock()
323-
count += 1
324329

325330
elif (
326331
task.state == TASK_STATES.WAITING
@@ -339,7 +344,6 @@ def _unblock_tasks(self):
339344
task.pulp_domain.name,
340345
)
341346
task.unblock()
342-
count += 1
343347
elif task.state == TASK_STATES.RUNNING and task.unblocked_at is None:
344348
# This should not happen in normal operation.
345349
# And it is only an issue if the worker running that task died, because it will
@@ -356,21 +360,22 @@ def _unblock_tasks(self):
356360
taken_exclusive_resources.update(exclusive_resources)
357361
taken_shared_resources.update(shared_resources)
358362

359-
return count
360-
361363
def sleep(self):
362364
"""Wait for signals on the wakeup channel while heart beating."""
363365

364366
_logger.debug(_("Worker %s entering sleep state."), self.name)
365367
while not self.shutdown_requested and not self.wakeup_handle:
366368
r, w, x = select.select(
367-
[self.sentinel, connection.connection], [], [], self.heartbeat_period.seconds
369+
[self.sentinel, connection.connection],
370+
[],
371+
[],
372+
0 if self.wakeup_unblock else self.heartbeat_period.seconds,
368373
)
369374
self.beat()
370375
if connection.connection in r:
371376
connection.connection.execute("SELECT 1")
372-
if self.wakeup_unblock:
373-
self.unblock_tasks()
377+
if self.wakeup_unblock:
378+
self.unblock_tasks()
374379
if self.sentinel in r:
375380
os.read(self.sentinel, 256)
376381
_logger.debug(_("Worker %s leaving sleep state."), self.name)
@@ -407,21 +412,21 @@ def supervise_task(self, task):
407412
[self.sentinel, connection.connection, task_process.sentinel],
408413
[],
409414
[],
410-
self.heartbeat_period.seconds,
415+
0 if self.wakeup_unblock or self.cancel_task else self.heartbeat_period.seconds,
411416
)
412417
self.beat()
413418
if connection.connection in r:
414419
connection.connection.execute("SELECT 1")
415-
if self.cancel_task:
416-
_logger.info(
417-
_("Received signal to cancel current task %s in domain: %s."),
418-
task.pk,
419-
domain.name,
420-
)
421-
cancel_state = TASK_STATES.CANCELED
422-
self.cancel_task = False
423-
if self.wakeup_unblock:
424-
self.unblock_tasks()
420+
if self.cancel_task:
421+
_logger.info(
422+
_("Received signal to cancel current task %s in domain: %s."),
423+
task.pk,
424+
domain.name,
425+
)
426+
cancel_state = TASK_STATES.CANCELED
427+
self.cancel_task = False
428+
if self.wakeup_unblock:
429+
self.unblock_tasks()
425430
if task_process.sentinel in r:
426431
if not task_process.is_alive():
427432
break
@@ -589,7 +594,7 @@ def run(self, burst=False):
589594
if not self.auxiliary:
590595
# Attempt to flush the task queue completely.
591596
# Stop iteration if no new tasks were found to unblock.
592-
while self.unblock_tasks():
597+
while self.unblock_tasks() is not False:
593598
self.handle_unblocked_tasks()
594599
self.handle_unblocked_tasks()
595600
else:

0 commit comments

Comments
 (0)