Skip to content

Commit a508837

Browse files
committed
Some adjustments to pubsub module
1 parent 4da5011 commit a508837

File tree

2 files changed

+20
-13
lines changed

2 files changed

+20
-13
lines changed

pulpcore/tasking/pubsub.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ def subscribe(self, channel):
3030
def unsubscribe(self, channel):
3131
raise NotImplementedError()
3232

33+
def get_subscriptions(self):
34+
raise NotImplementedError()
35+
3336
@classmethod
3437
def publish(cls, channel, payload=None):
3538
raise NotImplementedError()
@@ -57,35 +60,35 @@ def drain_non_blocking_fd(fd):
5760
os.read(fd, 256)
5861

5962

60-
PID = os.getpid()
61-
62-
6363
class PostgresPubSub(BasePubSubBackend):
64+
PID = os.getpid()
6465

6566
def __init__(self):
6667
self._subscriptions = set()
6768
self.message_buffer = []
68-
self.cursor = connection.cursor()
69+
# ensures a connection is initialized
70+
with connection.cursor() as cursor:
71+
cursor.execute("select 1")
6972
self.backend_pid = connection.connection.info.backend_pid
70-
# logger.info(f"{connection.connection.info.backend_pid=}")
7173
self.sentinel_r, self.sentinel_w = os.pipe()
7274
os.set_blocking(self.sentinel_r, False)
7375
os.set_blocking(self.sentinel_w, False)
7476
connection.connection.add_notify_handler(self._store_messages)
7577

78+
@classmethod
79+
def _debug(cls, message):
80+
logger.debug(f"[{cls.PID}] {message}")
81+
7682
def _store_messages(self, notification):
77-
# logger.info(f"[{PID}] Received message: {notification}")
7883
self.message_buffer.append(
7984
PubsubMessage(channel=notification.channel, payload=notification.payload)
8085
)
8186
if notification.pid == self.backend_pid:
8287
os.write(self.sentinel_w, b"1")
83-
84-
def get_subscriptions(self):
85-
return self._subscriptions.copy()
88+
self._debug(f"Received message: {notification}")
8689

8790
@classmethod
88-
def publish(cls, channel, payload=None):
91+
def publish(cls, channel, payload=""):
8992
query = (
9093
(f"NOTIFY {channel}",)
9194
if not payload
@@ -94,6 +97,7 @@ def publish(cls, channel, payload=None):
9497

9598
with connection.cursor() as cursor:
9699
cursor.execute(*query)
100+
cls._debug(f"Sent message: ({channel}, {str(payload)})")
97101

98102
def subscribe(self, channel):
99103
self._subscriptions.add(channel)
@@ -108,7 +112,12 @@ def unsubscribe(self, channel):
108112
with connection.cursor() as cursor:
109113
cursor.execute(f"UNLISTEN {channel}")
110114

115+
def get_subscriptions(self):
116+
return self._subscriptions.copy()
117+
111118
def fileno(self) -> int:
119+
# when pub/sub clients are the same, the notification callback may be called
120+
# asynchronously, making select on connection miss new notifications
112121
ready, _, _ = select.select([self.sentinel_r], [], [], 0)
113122
if self.sentinel_r in ready:
114123
return self.sentinel_r
@@ -120,7 +129,7 @@ def fetch(self) -> list[PubsubMessage]:
120129
result = self.message_buffer.copy()
121130
self.message_buffer.clear()
122131
drain_non_blocking_fd(self.sentinel_r)
123-
# logger.info(f"[{PID}] Fetched messages: {result}")
132+
self._debug(f"Fetched messages: {result}")
124133
return result
125134

126135
def close(self):

pulpcore/tasking/worker.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -607,12 +607,10 @@ def run(self, burst=False):
607607
# do work
608608
if self.shutdown_requested:
609609
break
610-
# _logger.info(_("=== Worker %s will handle unblocked tasks. ==="), self.name)
611610
self.handle_unblocked_tasks()
612611
if self.shutdown_requested:
613612
break
614613
# rest until notified to wakeup
615-
# _logger.info(_("*** Worker %s entering sleep state. ***"), self.name)
616614
self.sleep()
617615
self.pubsub_teardown()
618616
self.shutdown()

0 commit comments

Comments
 (0)