Skip to content

Commit 811c478

Browse files
committed
Fix: Improve PubSub resilience and exception handling
This commit introduces several improvements to the Pub/Sub system to make it more resilient to connection issues and to handle exceptions more gracefully. The following changes are included: - A `health_check_interval` is added to the Redis connection to keep it alive and prevent timeouts. - Reconnection logic is implemented in the `listen` method. If the connection to Redis is lost, the client will now attempt to reconnect and re-subscribe to the channel automatically. - The exception handling in the `listen` method is made more specific, catching `ConnectionError` for reconnection and other `RedisError` exceptions for graceful exit. These changes improve the overall stability and robustness of the Pub/Sub system, especially in environments where the Redis server might be restarted or the network connection is not perfectly stable. Signed-off-by: Denys Fedoryshchenko <[email protected]>
1 parent 24066ca commit 811c478

File tree

1 file changed

+19
-4
lines changed

1 file changed

+19
-4
lines changed

api/pubsub.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ def __init__(self, host=None, db_number=None):
3939
host = self._settings.redis_host
4040
if db_number is None:
4141
db_number = self._settings.redis_db_number
42-
self._redis = aioredis.from_url(f'redis://{host}/{db_number}')
42+
self._redis = aioredis.from_url(
43+
f'redis://{host}/{db_number}', health_check_interval=30
44+
)
4345
# self._subscriptions is a dict that matches a subscription id
4446
# (key) with a Subscription object ('sub') and a redis
4547
# PubSub object ('redis_sub'). For instance:
@@ -135,9 +137,22 @@ async def listen(self, sub_id, user=None):
135137
f"not owned by {user}")
136138
while True:
137139
self._subscriptions[sub_id]['last_poll'] = datetime.utcnow()
138-
msg = await sub['redis_sub'].get_message(
139-
ignore_subscribe_messages=True, timeout=1.0
140-
)
140+
msg = None
141+
try:
142+
msg = await sub['redis_sub'].get_message(
143+
ignore_subscribe_messages=True, timeout=1.0
144+
)
145+
except aioredis.ConnectionError:
146+
async with self._lock:
147+
channel = self._subscriptions[sub_id]['sub'].channel
148+
new_redis_sub = self._redis.pubsub()
149+
await new_redis_sub.subscribe(channel)
150+
self._subscriptions[sub_id]['redis_sub'] = new_redis_sub
151+
sub['redis_sub'] = new_redis_sub
152+
continue
153+
except aioredis.RedisError:
154+
return None # Handle any exceptions gracefully
155+
141156
if msg is None:
142157
continue
143158
msg_data = json.loads(msg['data'])

0 commit comments

Comments
 (0)