Skip to content

Commit d976358

Browse files
committed
wip
1 parent d6df41b commit d976358

File tree

4 files changed

+109
-80
lines changed

4 files changed

+109
-80
lines changed

pulpcore/app/models/task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class Task(BaseModel, AutoAddObjPermsMixin):
8989
9090
The transitions to CANCELING (marked with *) are the only ones allowed to happen without
9191
holding the task's advisory lock. Canceling is meant to be initiated asynchronously by a separate
92-
process before signalling the worker via Postgres LISTEN.
92+
process before signalling the worker via a pubsub notification (e.g., Postgres LISTEN).
9393
9494
Fields:
9595

pulpcore/app/pubsub.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import logging
2+
3+
logger = logging.getLogger(__name__)
4+
5+
6+
def wakeup_worker(pubsub_backend):
    """Publish a payload-less message on the worker wakeup channel.

    Args:
        pubsub_backend: Backend object whose ``publish`` method is called with
            ``BasePubSubBackend.WORKER_WAKEUP`` as its only argument.
    """
    wakeup_channel = BasePubSubBackend.WORKER_WAKEUP
    pubsub_backend.publish(wakeup_channel)
8+
9+
10+
def cancel_task(pubsub_backend, task_pk):
    """Publish a task's primary key on the task cancellation channel.

    The backend is the first parameter, matching ``wakeup_worker`` and
    ``record_worker_metrics``. With the previous ``(task_pk, pubsub_backend)``
    order, a call of the form ``cancel_task(backend, task_pk=pk)`` raised
    ``TypeError`` because ``task_pk`` received two values.

    Args:
        pubsub_backend: Backend object whose ``publish`` method is called.
        task_pk: Primary key of the task to cancel; sent as ``str(task_pk)``.
    """
    pubsub_backend.publish(BasePubSubBackend.TASK_CANCELLATION, str(task_pk))
12+
13+
14+
def record_worker_metrics(pubsub_backend, now):
    """Publish ``str(now)`` on the worker metrics heartbeat channel.

    Args:
        pubsub_backend: Backend object whose ``publish`` method is called.
        now: Value to send; it is converted with ``str`` before publishing.
    """
    payload = str(now)
    pubsub_backend.publish(BasePubSubBackend.WORKER_METRIC, payload)
16+
17+
18+
class BasePubSubBackend:
    """Interface for pubsub backends, plus the channel names used with them.

    Every method here raises ``NotImplementedError``; concrete backends are
    expected to override all of them.
    """

    # Channel names shared by publishers and subscribers.
    WORKER_WAKEUP = "pulp_worker_wakeup"
    TASK_CANCELLATION = "pulp_worker_cancel"
    WORKER_METRIC = "pulp_worker_metrics_heartbeat"

    def subscribe(self, channel, callback):
        """Start receiving messages from ``channel`` through ``callback``."""
        raise NotImplementedError

    def unsubscribe(self, channel):
        """Stop receiving messages from ``channel``."""
        raise NotImplementedError

    def publish(self, channel, message=None):
        """Send ``message`` (optional) on ``channel``."""
        raise NotImplementedError

    def fileno(self):
        """Return a file descriptor so the backend can be used in a select loop."""
        raise NotImplementedError

    def fetch(self):
        """Fetch new messages, if the backend requires an explicit fetch."""
        raise NotImplementedError

    def close(self):
        """Release whatever resources the backend holds."""
        raise NotImplementedError
42+
43+
44+
class PostgresPubSub(BasePubSubBackend):
    """Pubsub backend built on Postgres ``LISTEN`` / ``NOTIFY``.

    The instance opens a cursor on the given connection wrapper and registers
    a notify handler on the wrapper's underlying driver connection. Because
    that driver connection may be shared by several instances, the handler is
    removed again in ``close()`` and notifications for channels this instance
    did not subscribe to are ignored.

    Instances can be used as context managers; leaving the ``with`` block
    calls ``close()``.
    """

    def __init__(self, connection):
        """Open a cursor and hook into the connection's notifications.

        Args:
            connection: Connection wrapper providing ``cursor()`` and a
                ``connection`` attribute (the driver connection) that supports
                ``add_notify_handler``, ``remove_notify_handler``, ``fileno``
                and ``execute``.
        """
        logger.info("Initialized pubsub")
        self.cursor = connection.cursor()
        self.connection = connection.connection
        # Maps channel name -> callback registered through subscribe().
        self.listening_callback = {}
        self._closed = False
        self.connection.add_notify_handler(self._notification_handler)

    def _notification_handler(self, notification):
        """Dispatch a notification to the callback subscribed to its channel."""
        logger.info("Handling notification: %s", notification)
        callback = self.listening_callback.get(notification.channel)
        if callback is None:
            # Another user of the same driver connection listens on this
            # channel; it is not ours to handle.
            return
        callback(message=notification.payload)

    def subscribe(self, channel, callback):
        """Issue ``LISTEN`` for ``channel`` and remember ``callback`` for it."""
        self.listening_callback[channel] = callback
        self.cursor.execute(f"LISTEN {channel}")

    def unsubscribe(self, channel):
        """Issue ``UNLISTEN`` for ``channel`` and forget its callback."""
        self.cursor.execute(f"UNLISTEN {channel}")
        self.listening_callback.pop(channel, None)

    def publish(self, channel, message=None):
        """Send a notification on ``channel``, with ``message`` as payload if given."""
        logger.info("Publish: %s: %s", channel, message)
        if not message:
            self.cursor.execute(f"NOTIFY {channel}")
        else:
            # pg_notify lets the payload be passed as a query parameter.
            self.cursor.execute("SELECT pg_notify(%s, %s)", (channel, message))

    def fileno(self):
        """Return the driver connection's file descriptor for select loops."""
        return self.connection.fileno()

    def fetch(self):
        """Run a trivial query on the driver connection.

        NOTE(review): presumably this round trip is what makes the driver
        deliver pending notifications to the handler - confirm.
        """
        logger.info("Fetching")
        self.connection.execute("SELECT 1").fetchone()

    def close(self):
        """Remove the notify handler and close the cursor; safe to call twice."""
        if self._closed:
            return
        self._closed = True
        logger.info("Closing")
        try:
            self.connection.remove_notify_handler(self._notification_handler)
        finally:
            self.cursor.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

pulpcore/tasking/tasks.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS
2020
from pulpcore.app.models import Task, TaskGroup
2121
from pulpcore.app.util import current_task, get_domain, get_prn, deprecation_logger
22+
from pulpcore.app import pubsub
2223
from pulpcore.constants import (
2324
TASK_FINAL_STATES,
2425
TASK_INCOMPLETE_STATES,
@@ -47,12 +48,6 @@ def _validate_and_get_resources(resources):
4748
return list(resource_set)
4849

4950

50-
def wakeup_worker():
51-
# Notify workers
52-
with connection.connection.cursor() as cursor:
53-
cursor.execute("NOTIFY pulp_worker_wakeup")
54-
55-
5651
def execute_task(task):
5752
# This extra stack is needed to isolate the current_task ContextVar
5853
contextvars.copy_context().run(_execute_task, task)
@@ -308,10 +303,18 @@ def dispatch(
308303
task.set_canceling()
309304
task.set_canceled(TASK_STATES.CANCELED, "Resources temporarily unavailable.")
310305
if notify_workers:
311-
wakeup_worker()
306+
pubsub_backend = pubsub.PostgresPubSub(connection)
307+
pubsub.wakeup_worker(pubsub_backend)
308+
pubsub_backend.close()
312309
return task
313310

314311

312+
def wakeup_worker():
    """Notify workers by publishing on the wakeup channel via ``connection``."""
    # Using the backend as a context manager releases its cursor even when
    # publishing raises; a trailing close() call would be skipped in that case.
    with pubsub.PostgresPubSub(connection) as pubsub_backend:
        pubsub.wakeup_worker(pubsub_backend)
316+
317+
315318
def cancel_task(task_id):
316319
"""
317320
Cancel the task that is represented by the given task_id.
@@ -342,9 +345,10 @@ def cancel_task(task_id):
342345
# This is the only valid transition without holding the task lock
343346
task.set_canceling()
344347
# Notify the worker that might be running that task and other workers to clean up
345-
with connection.cursor() as cursor:
346-
cursor.execute("SELECT pg_notify('pulp_worker_cancel', %s)", (str(task.pk),))
347-
cursor.execute("NOTIFY pulp_worker_wakeup")
348+
pubsub_backend = pubsub.PostgresPubSub(connection)
349+
pubsub.cancel_task(pubsub_backend, task_pk=task.pk)
350+
pubsub.wakeup_worker(pubsub_backend)
351+
pubsub_backend.close()
348352
return task
349353

350354

pulpcore/tasking/worker.py

Lines changed: 7 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from pulpcore.app.apps import pulp_plugin_configs
3030
from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus
3131
from pulpcore.app.util import PGAdvisoryLock, get_domain
32+
from pulpcore.app import pubsub
3233
from pulpcore.exceptions import AdvisoryLockError
3334

3435
from pulpcore.tasking.storage import WorkerDirectory
@@ -56,70 +57,6 @@
5657
THRESHOLD_UNBLOCKED_WAITING_TIME = 5
5758

5859

59-
class BasePubSubBackend:
60-
WORKER_WAKEUP = "pulp_worker_wakeup"
61-
TASK_CANCELLATION = "pulp_worker_cancel"
62-
WORKER_METRIC = "pulp_worker_metrics_heartbeat"
63-
64-
def wakeup_workers(self):
65-
self.publish(self.WORKER_WAKEUP)
66-
67-
def cancel_task(self):
68-
self.publish(self.TASK_CANCELLATION)
69-
70-
def record_worker_metrics(self, now):
71-
self.publish(self.WORKER_METRIC, str(now))
72-
73-
# Specific implementation
74-
def subscribe(self, channel, callback):
75-
raise NotImplementedError()
76-
77-
def unsubscribe(self, channel):
78-
raise NotImplementedError()
79-
80-
def publish(self, channel, message=None):
81-
raise NotImplementedError()
82-
83-
def fileno(self):
84-
"""Add support for being used in select loop."""
85-
raise NotImplementedError()
86-
87-
def fetch(self):
88-
"""Fetch messages new message, if required."""
89-
raise NotImplementedError()
90-
91-
92-
class PostgresPubSub(BasePubSubBackend):
93-
94-
def __init__(self):
95-
self.cursor = connection.cursor()
96-
self.listening_callback = {}
97-
98-
def _notification_handler(self, notification):
99-
callback = self.listening_callback[notification.channel]
100-
callback(message=notification.payload)
101-
102-
def subscribe(self, channel, callback):
103-
self.listening_callback[channel] = callback
104-
self.cursor.execute(f"LISTEN {channel}")
105-
connection.connection.add_notify_handler(self._notification_handler)
106-
107-
def unsubscribe(self, channel):
108-
self.cursor.execute(f"UNLISTEN {channel}")
109-
110-
def publish(self, channel, message=None):
111-
if not message:
112-
self.cursor.execute(f"NOTIFY {channel}")
113-
else:
114-
self.cursor.execute(f"NOTIFY {channel}, {message}")
115-
116-
def fileno(self):
117-
return connection.connection.fileno()
118-
119-
def fetch(self):
120-
connection.connection.execute("SELECT 1")
121-
122-
12360
class PulpcoreWorker:
12461
def __init__(self):
12562
# Notification states from several signal handlers
@@ -132,7 +69,7 @@ def __init__(self):
13269
self.heartbeat_period = timedelta(seconds=settings.WORKER_TTL / 3)
13370
self.last_metric_heartbeat = timezone.now()
13471
self.versions = {app.label: app.version for app in pulp_plugin_configs()}
135-
self.pubsub_backend = PostgresPubSub()
72+
self.pubsub_backend = pubsub.PostgresPubSub(connection)
13673
self.worker = self.handle_worker_heartbeat()
13774
# This defaults to immediate task cancellation.
13875
# It will be set into the future on moderately graceful worker shutdown,
@@ -277,7 +214,7 @@ def cancel_abandoned_task(self, task, final_state, reason=None):
277214
delete_incomplete_resources(task)
278215
task.set_canceled(final_state=final_state, reason=reason)
279216
if task.reserved_resources_record:
280-
self.pubsub_backend.wakeup_workers()
217+
pubsub.wakeup_worker(self.pubsub_backend)
281218
return True
282219

283220
def is_compatible(self, task):
@@ -425,6 +362,7 @@ def sleep(self):
425362
)
426363
self.beat()
427364
if self.pubsub_backend in r:
365+
breakpoint()
428366
self.pubsub_backend.fetch()
429367
if self.sentinel in r:
430368
os.read(self.sentinel, 256)
@@ -524,7 +462,7 @@ def supervise_task(self, task):
524462
if cancel_state:
525463
self.cancel_abandoned_task(task, cancel_state, cancel_reason)
526464
if task.reserved_resources_record:
527-
self.pubsub_backend.wakeup_workers()
465+
pubsub.wakeup_worker(self.pubsub_backend)
528466
self.task = None
529467

530468
def handle_available_tasks(self):
@@ -581,11 +519,11 @@ def _record_unblocked_waiting_tasks_metric(self):
581519
unblocked_tasks_stats["longest_unblocked_waiting_time"].seconds
582520
)
583521

584-
self.pubsub_backend.record_worker_metrics(str(now))
522+
pubsub.record_worker_metrics(self.pubsub_backend, now)
585523

586524
def pubsub_setup(self):
587525
def cancellation_callback(message):
588-
if message == str(self.task.pk):
526+
if self.task and message == str(self.task.pk):
589527
self.cancel_task = True
590528

591529
def wakeup_callback(message):

0 commit comments

Comments
 (0)