Skip to content

Commit fc3ce1c

Browse files
committed
Refactor listen/notify as a pubsub backend
1 parent e118bfd commit fc3ce1c

File tree

6 files changed

+79
-71
lines changed

6 files changed

+79
-71
lines changed

pulpcore/app/models/task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class Task(BaseModel, AutoAddObjPermsMixin):
8989
9090
The transitions to CANCELING (marked with *) are the only ones allowed to happen without
9191
holding the tasks advisory lock. Canceling is meant to be initiated asyncronously by a sparate
92-
process before signalling the worker via Postgres LISTEN.
92+
process before signalling the worker via a pubsub notification (e.g, Postgres LISTEN).
9393
9494
Fields:
9595

pulpcore/constants.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@
1616
TASK_WAKEUP_UNBLOCK = "unblock"
1717
TASK_WAKEUP_HANDLE = "handle"
1818

19+
#: All valid tasking pubsub channels
20+
TASK_PUBSUB = SimpleNamespace(
21+
WAKEUP_WORKER="pulp_worker_wakeup",
22+
CANCEL_TASK="pulp_worker_cancel",
23+
WORKER_METRICS="pulp_worker_metrics_heartbeat",
24+
)
25+
1926
#: All valid task states.
2027
TASK_STATES = SimpleNamespace(
2128
WAITING="waiting",

pulpcore/tasking/pubsub.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,24 @@
11
from typing import NamedTuple
2+
from pulpcore.constants import TASK_PUBSUB
23
import os
34
import logging
45
from contextlib import suppress
56

67
logger = logging.getLogger(__name__)
78

89

9-
def wakeup_worker(pubsub_backend, reason="unknown"):
10-
pubsub_backend.publish(BasePubSubBackend.WORKER_WAKEUP, reason)
11-
12-
13-
def cancel_task(task_pk, pubsub_backend):
14-
pubsub_backend.publish(BasePubSubBackend.TASK_CANCELLATION, str(task_pk))
15-
16-
17-
def record_worker_metrics(pubsub_backend, now):
18-
pubsub_backend.publish(BasePubSubBackend.WORKER_METRIC, str(now))
10+
class BasePubSubBackend:
11+
# Utils
12+
def wakeup_worker(self, reason="unknown"):
13+
self.publish(TASK_PUBSUB.WAKEUP_WORKER, reason)
1914

15+
def cancel_task(self, task_pk):
16+
self.publish(TASK_PUBSUB.CANCEL_TASK, str(task_pk))
2017

21-
class BasePubSubBackend:
22-
WORKER_WAKEUP = "pulp_worker_wakeup"
23-
TASK_CANCELLATION = "pulp_worker_cancel"
24-
WORKER_METRIC = "pulp_worker_metrics_heartbeat"
18+
def record_worker_metrics(self, now):
19+
self.publish(TASK_PUBSUB.WORKER_METRICS, str(now))
2520

21+
# Interface
2622
def subscribe(self, channel, callback):
2723
raise NotImplementedError()
2824

pulpcore/tasking/tasks.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
TASK_WAKEUP_UNBLOCK,
2929
)
3030
from pulpcore.middleware import x_task_diagnostics_var
31+
from pulpcore.tasking import pubsub
3132
from pulpcore.tasking.kafka import send_task_notification
3233

3334
_logger = logging.getLogger(__name__)
@@ -48,12 +49,6 @@ def _validate_and_get_resources(resources):
4849
return list(resource_set)
4950

5051

51-
def wakeup_worker(reason="unknown"):
52-
# Notify workers
53-
with connection.connection.cursor() as cursor:
54-
cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,))
55-
56-
5752
def execute_task(task):
5853
# This extra stack is needed to isolate the current_task ContextVar
5954
contextvars.copy_context().run(_execute_task, task)
@@ -309,7 +304,8 @@ def dispatch(
309304
task.set_canceling()
310305
task.set_canceled(TASK_STATES.CANCELED, "Resources temporarily unavailable.")
311306
if notify_workers:
312-
wakeup_worker(TASK_WAKEUP_UNBLOCK)
307+
with pubsub.PostgresPubSub(connection) as pubsub_client:
308+
pubsub_client.wakeup_worker(reason=TASK_WAKEUP_UNBLOCK)
313309
return task
314310

315311

@@ -343,9 +339,9 @@ def cancel_task(task_id):
343339
# This is the only valid transition without holding the task lock
344340
task.set_canceling()
345341
# Notify the worker that might be running that task and other workers to clean up
346-
with connection.cursor() as cursor:
347-
cursor.execute("SELECT pg_notify('pulp_worker_cancel', %s)", (str(task.pk),))
348-
cursor.execute("NOTIFY pulp_worker_wakeup")
342+
with pubsub.PostgresPubSub(connection) as pubsub_client:
343+
pubsub_client.cancel_task(task_pk=task.pk)
344+
pubsub_client.wakeup_worker()
349345
return task
350346

351347

pulpcore/tasking/worker.py

Lines changed: 55 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@
2626
TASK_METRICS_HEARTBEAT_LOCK,
2727
TASK_WAKEUP_UNBLOCK,
2828
TASK_WAKEUP_HANDLE,
29+
TASK_PUBSUB,
2930
)
3031
from pulpcore.metrics import init_otel_meter
3132
from pulpcore.app.apps import pulp_plugin_configs
3233
from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus
3334
from pulpcore.app.util import PGAdvisoryLock, get_domain
3435
from pulpcore.exceptions import AdvisoryLockError
3536

37+
from pulpcore.tasking import pubsub
3638
from pulpcore.tasking.storage import WorkerDirectory
3739
from pulpcore.tasking._util import (
3840
delete_incomplete_resources,
@@ -72,7 +74,6 @@ def __init__(self, auxiliary=False):
7274
self.heartbeat_period = timedelta(seconds=settings.WORKER_TTL / 3)
7375
self.last_metric_heartbeat = timezone.now()
7476
self.versions = {app.label: app.version for app in pulp_plugin_configs()}
75-
self.cursor = connection.cursor()
7677
self.worker = self.handle_worker_heartbeat()
7778
# This defaults to immediate task cancellation.
7879
# It will be set into the future on moderately graceful worker shutdown,
@@ -81,6 +82,9 @@ def __init__(self, auxiliary=False):
8182
self.worker_cleanup_countdown = random.randint(
8283
int(WORKER_CLEANUP_INTERVAL / 10), WORKER_CLEANUP_INTERVAL
8384
)
85+
# Pubsub handling
86+
self.pubsub_client = pubsub.PostgresPubSub(connection)
87+
self.pubsub_channel_callback = {}
8488

8589
# Add a file descriptor to trigger select on signals
8690
self.sentinel, sentinel_w = os.pipe()
@@ -127,25 +131,6 @@ def _signal_handler(self, thesignal, frame):
127131
)
128132
self.shutdown_requested = True
129133

130-
def _pg_notify_handler(self, notification):
131-
if notification.channel == "pulp_worker_wakeup":
132-
if notification.payload == TASK_WAKEUP_UNBLOCK:
133-
# Auxiliary workers don't do this.
134-
self.wakeup_unblock = not self.auxiliary
135-
elif notification.payload == TASK_WAKEUP_HANDLE:
136-
self.wakeup_handle = True
137-
else:
138-
_logger.warn("Unknown wakeup call recieved. Reason: '%s'", notification.payload)
139-
# We cannot be sure so assume everything happened.
140-
self.wakeup_unblock = not self.auxiliary
141-
self.wakeup_handle = True
142-
143-
elif notification.channel == "pulp_worker_metrics_heartbeat":
144-
self.last_metric_heartbeat = datetime.fromisoformat(notification.payload)
145-
elif self.task and notification.channel == "pulp_worker_cancel":
146-
if notification.payload == str(self.task.pk):
147-
self.cancel_task = True
148-
149134
def handle_worker_heartbeat(self):
150135
"""
151136
Create or update worker heartbeat records.
@@ -205,9 +190,6 @@ def beat(self):
205190
# to be able to report on a congested tasking system to produce reliable results.
206191
self.record_unblocked_waiting_tasks_metric()
207192

208-
def notify_workers(self, reason="unknown"):
209-
self.cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,))
210-
211193
def cancel_abandoned_task(self, task, final_state, reason=None):
212194
"""Cancel and clean up an abandoned task.
213195
@@ -240,7 +222,7 @@ def cancel_abandoned_task(self, task, final_state, reason=None):
240222
delete_incomplete_resources(task)
241223
task.set_canceled(final_state=final_state, reason=reason)
242224
if task.reserved_resources_record:
243-
self.notify_workers(TASK_WAKEUP_UNBLOCK)
225+
self.pubsub_client.wakeup_worker(reason=TASK_WAKEUP_UNBLOCK)
244226
return True
245227

246228
def is_compatible(self, task):
@@ -280,7 +262,7 @@ def unblock_tasks(self):
280262
self.wakeup_unblock_tasks = False
281263
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_UNBLOCKING_LOCK):
282264
if count := self._unblock_tasks():
283-
self.notify_workers(TASK_WAKEUP_HANDLE)
265+
self.pubsub_client.wakeup_worker(TASK_WAKEUP_HANDLE)
284266
return count
285267

286268
def _unblock_tasks(self):
@@ -405,11 +387,11 @@ def sleep(self):
405387
_logger.debug(_("Worker %s entering sleep state."), self.name)
406388
while not self.shutdown_requested and not self.wakeup_handle:
407389
r, w, x = select.select(
408-
[self.sentinel, connection.connection], [], [], self.heartbeat_period.seconds
390+
[self.sentinel, self.pubsub_client], [], [], self.heartbeat_period.seconds
409391
)
410392
self.beat()
411-
if connection.connection in r:
412-
connection.connection.execute("SELECT 1")
393+
if self.pubsub_client in r:
394+
self.pubsub_handle_messages(self.pubsub_client.fetch())
413395
if self.wakeup_unblock:
414396
self.unblock_tasks()
415397
if self.sentinel in r:
@@ -447,14 +429,14 @@ def supervise_task(self, task):
447429
os.kill(task_process.pid, signal.SIGUSR1)
448430

449431
r, w, x = select.select(
450-
[self.sentinel, connection.connection, task_process.sentinel],
432+
[self.sentinel, self.pubsub_client, task_process.sentinel],
451433
[],
452434
[],
453435
self.heartbeat_period.seconds,
454436
)
455437
self.beat()
456-
if connection.connection in r:
457-
connection.connection.execute("SELECT 1")
438+
if self.pubsub_client in r:
439+
self.pubsub_handle_messages(self.pubsub_client.fetch())
458440
if self.cancel_task:
459441
_logger.info(
460442
_("Received signal to cancel current task %s in domain: %s."),
@@ -506,7 +488,7 @@ def supervise_task(self, task):
506488
if cancel_state:
507489
self.cancel_abandoned_task(task, cancel_state, cancel_reason)
508490
if task.reserved_resources_record:
509-
self.notify_workers(TASK_WAKEUP_UNBLOCK)
491+
self.pubsub_client.wakeup_worker(reason=TASK_WAKEUP_UNBLOCK)
510492
self.task = None
511493

512494
def handle_unblocked_tasks(self):
@@ -559,26 +541,60 @@ def _record_unblocked_waiting_tasks_metric(self):
559541
unblocked_tasks_stats["longest_unblocked_waiting_time"].seconds
560542
)
561543

562-
self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'")
544+
self.pubsub_client.record_worker_metrics(now)
545+
546+
def pubsub_handle_messages(self, messages: pubsub.PubsubMessage):
547+
for message in messages:
548+
callback = self.pubsub_channel_callback[message.channel]
549+
callback(message.payload)
550+
551+
def pubsub_setup(self):
552+
def cancellation_callback(message):
553+
if self.task and message == str(self.task.pk):
554+
self.cancel_task = True
555+
556+
def wakeup_callback(message):
557+
if message == TASK_WAKEUP_UNBLOCK:
558+
# Auxiliary workers don't do this.
559+
self.wakeup_unblock = not self.auxiliary
560+
elif message == TASK_WAKEUP_HANDLE:
561+
self.wakeup_handle = True
562+
else:
563+
_logger.warn("Unknown wakeup call recieved. Reason: '%s'", message)
564+
# We cannot be sure so assume everything happened.
565+
self.wakeup_unblock = not self.auxiliary
566+
self.wakeup_handle = True
567+
568+
def metric_callback(message):
569+
self.last_metric_heartbeat = datetime.fromisoformat(message)
570+
571+
self.pubsub_client.subscribe(TASK_PUBSUB.WAKEUP_WORKER)
572+
self.pubsub_channel_callback[TASK_PUBSUB.WAKEUP_WORKER] = wakeup_callback
573+
self.pubsub_client.subscribe(TASK_PUBSUB.CANCEL_TASK)
574+
self.pubsub_channel_callback[TASK_PUBSUB.CANCEL_TASK] = cancellation_callback
575+
self.pubsub_client.subscribe(TASK_PUBSUB.WORKER_METRICS)
576+
self.pubsub_channel_callback[TASK_PUBSUB.WORKER_METRICS] = metric_callback
577+
578+
def pubsub_teardown(self):
579+
self.pubsub_client.unsubscribe(TASK_PUBSUB.WAKEUP_WORKER)
580+
self.pubsub_client.unsubscribe(TASK_PUBSUB.CANCEL_TASK)
581+
self.pubsub_client.unsubscribe(TASK_PUBSUB.WORKER_METRICS)
563582

564583
def run(self, burst=False):
565584
with WorkerDirectory(self.name):
566585
signal.signal(signal.SIGINT, self._signal_handler)
567586
signal.signal(signal.SIGTERM, self._signal_handler)
568587
signal.signal(signal.SIGHUP, self._signal_handler)
569-
# Subscribe to pgsql channels
570-
connection.connection.add_notify_handler(self._pg_notify_handler)
571-
self.cursor.execute("LISTEN pulp_worker_cancel")
572-
self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat")
588+
self.pubsub_setup()
573589
if burst:
590+
self.pubsub_client.unsubscribe(self.pubsub_client.WORKER_WAKEUP)
574591
if not self.auxiliary:
575592
# Attempt to flush the task queue completely.
576593
# Stop iteration if no new tasks were found to unblock.
577594
while self.unblock_tasks():
578595
self.handle_unblocked_tasks()
579596
self.handle_unblocked_tasks()
580597
else:
581-
self.cursor.execute("LISTEN pulp_worker_wakeup")
582598
while not self.shutdown_requested:
583599
# do work
584600
if self.shutdown_requested:
@@ -588,7 +604,5 @@ def run(self, burst=False):
588604
break
589605
# rest until notified to wakeup
590606
self.sleep()
591-
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
592-
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
593-
self.cursor.execute("UNLISTEN pulp_worker_cancel")
607+
self.pubsub_teardown()
594608
self.shutdown()

pulpcore/tests/functional/test_pubsub.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
1-
import django
2-
3-
django.setup()
4-
51
from django.db import connection
62
from pulpcore.tasking import pubsub
73
from types import SimpleNamespace
84
import select
9-
import time
105
import pytest
116

127

0 commit comments

Comments
 (0)