Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/6873.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed an issue where an installation running only a single worker could fail to unblock all tasks when they arrived in quick succession.
65 changes: 35 additions & 30 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ def worker_cleanup(self):
"Cleanup record of missing %s process %s.", app_worker.app_type, app_worker.name
)
qs.delete()
# This will also serve as a pacemaker because it will be triggered regularly.
# Don't bother the others.
self.wakeup_unblock = True

def beat(self):
if self.app_status.last_heartbeat < timezone.now() - self.heartbeat_period:
Expand All @@ -214,7 +217,7 @@ def beat(self):
# to be able to report on a congested tasking system to produce reliable results.
self.record_unblocked_waiting_tasks_metric()

def notify_workers(self, reason="unknown"):
def notify_workers(self, reason):
self.cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,))

def cancel_abandoned_task(self, task, final_state, reason=None):
Expand Down Expand Up @@ -274,25 +277,28 @@ def unblock_tasks(self):
Also it clears the notification about tasks to be unblocked and sends the notification that
new unblocked tasks are made available.

Returns the number of new unblocked tasks.
Returns None if another worker held the lock, True if unblocked tasks exist, else False.
"""

assert not self.auxiliary

count = 0
self.wakeup_unblock_tasks = False
self.wakeup_unblock = False
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_UNBLOCKING_LOCK):
if count := self._unblock_tasks():
self._unblock_tasks()

if (
Task.objects.filter(state__in=TASK_INCOMPLETE_STATES, app_lock=None)
.exclude(unblocked_at=None)
.exists()
):
self.notify_workers(TASK_WAKEUP_HANDLE)
return count
return True
return False
return None

def _unblock_tasks(self):
"""Iterate over waiting tasks and mark them unblocked accordingly.
"""Iterate over waiting tasks and mark them unblocked accordingly."""

Returns the number of new unblocked tasks.
"""

count = 0
taken_exclusive_resources = set()
taken_shared_resources = set()
# When batching this query, be sure to use "pulp_created" as a cursor
Expand Down Expand Up @@ -320,7 +326,6 @@ def _unblock_tasks(self):
task.pulp_domain.name,
)
task.unblock()
count += 1

elif (
task.state == TASK_STATES.WAITING
Expand All @@ -339,7 +344,6 @@ def _unblock_tasks(self):
task.pulp_domain.name,
)
task.unblock()
count += 1
elif task.state == TASK_STATES.RUNNING and task.unblocked_at is None:
# This should not happen in normal operation.
# And it is only an issue if the worker running that task died, because it will
Expand All @@ -356,21 +360,22 @@ def _unblock_tasks(self):
taken_exclusive_resources.update(exclusive_resources)
taken_shared_resources.update(shared_resources)

return count

def sleep(self):
"""Wait for signals on the wakeup channel while heart beating."""

_logger.debug(_("Worker %s entering sleep state."), self.name)
while not self.shutdown_requested and not self.wakeup_handle:
r, w, x = select.select(
[self.sentinel, connection.connection], [], [], self.heartbeat_period.seconds
[self.sentinel, connection.connection],
[],
[],
0 if self.wakeup_unblock else self.heartbeat_period.seconds,
)
self.beat()
if connection.connection in r:
connection.connection.execute("SELECT 1")
if self.wakeup_unblock:
self.unblock_tasks()
if self.wakeup_unblock:
self.unblock_tasks()
if self.sentinel in r:
os.read(self.sentinel, 256)
_logger.debug(_("Worker %s leaving sleep state."), self.name)
Expand Down Expand Up @@ -407,21 +412,21 @@ def supervise_task(self, task):
[self.sentinel, connection.connection, task_process.sentinel],
[],
[],
self.heartbeat_period.seconds,
0 if self.wakeup_unblock or self.cancel_task else self.heartbeat_period.seconds,
)
self.beat()
if connection.connection in r:
connection.connection.execute("SELECT 1")
if self.cancel_task:
_logger.info(
_("Received signal to cancel current task %s in domain: %s."),
task.pk,
domain.name,
)
cancel_state = TASK_STATES.CANCELED
self.cancel_task = False
if self.wakeup_unblock:
self.unblock_tasks()
if self.cancel_task:
_logger.info(
_("Received signal to cancel current task %s in domain: %s."),
task.pk,
domain.name,
)
cancel_state = TASK_STATES.CANCELED
self.cancel_task = False
if self.wakeup_unblock:
self.unblock_tasks()
if task_process.sentinel in r:
if not task_process.is_alive():
break
Expand Down Expand Up @@ -589,7 +594,7 @@ def run(self, burst=False):
if not self.auxiliary:
# Attempt to flush the task queue completely.
# Stop iteration if no new tasks were found to unblock.
while self.unblock_tasks():
while self.unblock_tasks() is not False:
self.handle_unblocked_tasks()
self.handle_unblocked_tasks()
else:
Expand Down