From 1cebbf0a4ba5401a36d5f3986475fc627fe0ce97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jordi=20Guti=C3=A9rrez=20Hermoso?= Date: Tue, 29 Jul 2025 14:53:30 -0400 Subject: [PATCH 1/7] FlexServer: use actual getters and setters instead of `setReady` It will be convenient to allow this to be set outside of the Flex Server, so let's allow it. --- app/server/MergedServer.ts | 2 +- app/server/lib/FlexServer.ts | 6 +++++- app/server/lib/GristServer.ts | 4 ++-- app/server/lib/attachEarlyEndpoints.ts | 2 +- test/server/lib/Authorizer.ts | 2 +- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/app/server/MergedServer.ts b/app/server/MergedServer.ts index d1467b6e8b..214de01097 100644 --- a/app/server/MergedServer.ts +++ b/app/server/MergedServer.ts @@ -206,7 +206,7 @@ export class MergedServer { await this.flexServer.finalizePlugins(this.hasComponent("home") ? checkUserContentPort() : null); this.flexServer.checkOptionCombinations(); this.flexServer.summary(); - this.flexServer.setReady(true); + this.flexServer.ready = true; if (this._options.extraWorkers) { if (!process.env.REDIS_URL) { diff --git a/app/server/lib/FlexServer.ts b/app/server/lib/FlexServer.ts index 098267ada4..0058fab18f 100644 --- a/app/server/lib/FlexServer.ts +++ b/app/server/lib/FlexServer.ts @@ -1890,7 +1890,7 @@ export class FlexServer implements GristServer { } } - public setReady(value: boolean) { + public set ready(value: boolean) { if(value) { log.debug('FlexServer is ready'); } else { @@ -1899,6 +1899,10 @@ export class FlexServer implements GristServer { this._isReady = value; } + public get ready() { + return this._isReady; + } + public checkOptionCombinations() { // Check for some bad combinations we should warn about. const allowedWebhookDomains = appSettings.section('integrations').flag('allowedWebhookDomains').readString({ diff --git a/app/server/lib/GristServer.ts b/app/server/lib/GristServer.ts index fcb3d7bf9b..2ceaf1ae37 100644 --- a/app/server/lib/GristServer.ts +++ b/app/server/lib/GristServer.ts @@ -52,6 +52,7 @@ export interface StorageCoordinator { export interface GristServer extends StorageCoordinator { readonly create: ICreate; readonly testPending: boolean; + ready: boolean; settings?: IGristCoreConfig; getHost(): string; getHomeUrl(req: express.Request, relPath?: string): string; @@ -103,7 +104,6 @@ export interface GristServer extends StorageCoordinator { isRestrictedMode(): boolean; onUserChange(callback: (change: UserChange) => Promise): void; onStreamingDestinationsChange(callback: (orgId?: number) => Promise): void; - setReady(value: boolean): void; } export interface GristLoginSystem { @@ -163,6 +163,7 @@ export function createDummyGristServer(): GristServer { return { create, testPending: false, + ready: true, settings: loadGristCoreConfig(), getHost() { return 'localhost:4242'; }, getHomeUrl() { return 'http://localhost:4242'; }, @@ -214,7 +215,6 @@ export function createDummyGristServer(): GristServer { onUserChange() { /* do nothing */ }, onStreamingDestinationsChange() { /* do nothing */ }, hardDeleteDoc() { return Promise.resolve(); }, - setReady() { /* do nothing */ }, }; } diff --git a/app/server/lib/attachEarlyEndpoints.ts b/app/server/lib/attachEarlyEndpoints.ts index fb433b8fa0..d28780265d 100644 --- a/app/server/lib/attachEarlyEndpoints.ts +++ b/app/server/lib/attachEarlyEndpoints.ts @@ -113,7 +113,7 @@ export function attachEarlyEndpoints(options: AttachOptions) { }); } // We're going down, so we're no longer ready to serve requests. - gristServer.setReady(false); + gristServer.ready = false; return res.status(200).send({ msg: "ok" }); }) ); diff --git a/test/server/lib/Authorizer.ts b/test/server/lib/Authorizer.ts index f854970caa..f03921d009 100644 --- a/test/server/lib/Authorizer.ts +++ b/test/server/lib/Authorizer.ts @@ -43,7 +43,7 @@ async function activateServer(home: FlexServer, docManager: DocManager) { home.addApiErrorHandlers(); home.finalizeEndpoints(); await home.finalizePlugins(null); - home.setReady(true); + home.ready = true; serverUrl = home.getOwnUrl(); } From f11e75ba7b78fa5a89a66424977cbc9c655bf763 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jordi=20Guti=C3=A9rrez=20Hermoso?= Date: Tue, 29 Jul 2025 14:54:32 -0400 Subject: [PATCH 2/7] PubSubManager: allow fetching the client There is already logic in here for defining a client or not depending if a Redis connexion is available or not, so let's reuse it and expose the publishing client, which is available. --- app/server/lib/PubSubManager.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/app/server/lib/PubSubManager.ts b/app/server/lib/PubSubManager.ts index 8de879f2e2..83e347b1c8 100644 --- a/app/server/lib/PubSubManager.ts +++ b/app/server/lib/PubSubManager.ts @@ -31,6 +31,7 @@ export interface IPubSubManager { subscribe(channel: string, callback: Callback): UnsubscribeCallbackPromise; publish(channel: string, message: string): Promise; publishBatch(batch: Array<{channel: string, message: string}>): Promise; + getClient(): IORedis|undefined; } export type Callback = (message: string) => void; @@ -106,6 +107,8 @@ abstract class PubSubManagerBase implements IPubSubManager { */ public abstract publishBatch(batch: Array<{channel: string, message: string}>): Promise; + public abstract getClient(): IORedis|undefined; + protected abstract _redisSubscribe(channel: string): Promise; protected abstract _redisUnsubscribe(channel: string): Promise; @@ -133,6 +136,7 @@ class PubSubManagerNoRedis extends PubSubManagerBase { public async publishBatch(batch: Array<{channel: string, message: string}>) { batch.forEach(({channel, message}) => this._deliverMessage(channel, message)); } + public getClient(): IORedis|undefined { return; } protected async _redisSubscribe(channel: string): Promise {} protected async _redisUnsubscribe(channel: string): Promise {} } @@ -182,6 +186,12 @@ class PubSubManagerRedis extends PubSubManagerBase { await pipeline.exec(); } + public getClient(): IORedis|undefined { + // The redisSub client is already tied listening to a channel, but + // the redisPub is "free" for the client to mess around with. + return this._redisPub; + } + protected async _redisSubscribe(channel: string): Promise { await this._redisSub.subscribe(this._prefixChannel(channel)); } From 4e9a3f7f31c612829625c661f83e8b7de7196ab7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jordi=20Guti=C3=A9rrez=20Hermoso?= Date: Tue, 29 Jul 2025 14:55:30 -0400 Subject: [PATCH 3/7] HealthChecker: new class for multi-server checks This class can verify that every Grist instance responds as "ready" via Redis pubsub. --- app/server/lib/HealthChecker.ts | 140 ++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 app/server/lib/HealthChecker.ts diff --git a/app/server/lib/HealthChecker.ts b/app/server/lib/HealthChecker.ts new file mode 100644 index 0000000000..f5ab8f8d56 --- /dev/null +++ b/app/server/lib/HealthChecker.ts @@ -0,0 +1,140 @@ +import {GristServer} from 'app/server/lib/GristServer'; +import log from 'app/server/lib/log'; +import {createPubSubManager, IPubSubManager} from 'app/server/lib/PubSubManager'; +import * as shutdown from 'app/server/lib/shutdown'; + +import {v4 as uuidv4} from 'uuid'; + +// Not to be confused with health checks from the frontend, these +// request/response pairs are internal checks between Grist instances +// in multi-server environments +interface ServerHealthcheckRequest { + id: string; + instanceId: string; + checkReady: boolean; +} +interface ServerHealthcheckResponse { + instanceId: string; + requestId: string; + healthy: boolean; +} + +// For keeping track of pending health checks for all other servers +// for each request that was broadcast to all of them. +interface PendingServerHealthCheck { + expectedCount: number; + responses: Record; + resolve: (res: boolean) => void; + reject: (err: Error) => void; + timeout: NodeJS.Timeout; +} + +/** This class uses pubsub via Redis, if available, to register this + * Grist instance and check that all other instances are healthy. + * + * In single-server instances, it also works without Redis, leveraging + * the dummy defaults of `PubSubManager`. + */ +export class HealthChecker { + private _pendingServerHealthChecks: Map; + private _serverInstanceID: string; + private _pubSubManager: IPubSubManager; + + constructor( + private _server: GristServer + ) { + this._pubSubManager = createPubSubManager(process.env.REDIS_URL); + this._pendingServerHealthChecks = new Map(); + this._serverInstanceID = process.env.GRIST_INSTANCE_ID || `testInsanceId_${this._server.getHost()}`; + this._pubSubManager.getClient()?.sadd('grist-instances', this._serverInstanceID).catch((err) => { + log.error('Failed to contact redis', err); + }); + this._subscribeToChannels(); + + // Make sure we clean up our Redis mess, if any, even if we exit + // by signal. + shutdown.addCleanupHandler(null, () => this.close()); + } + + + /** This returns a promise that resolves to `true` when all other + * registered instances must respond as healthy within the given + * timeout. + * + * @param {number} timeout - number of milliseconds to wait for + * responses from all servers before timeout + * + * @param {boolean} checkReady - whether to insist on `ready` status + * or just a simple health check + */ + public async allServersOkay(timeout: number, checkReady: boolean): Promise { + const requestId = uuidv4(); + const client = this._pubSubManager.getClient(); + + // If there is no Redis, then our current instance is the only instance + const allInstances = await client?.smembers('grist-instances') || [this._serverInstanceID]; + + const allInstancesPromise: Promise = new Promise((resolve: (res: boolean) => void, reject) => { + const allInstancesTimeout = setTimeout(() => { + log.warn('allServersOkay: timeout waiting for responses'); + reject(new Error('Timeout waiting for health responses')); + this._pendingServerHealthChecks.delete(requestId); + }, timeout); + + this._pendingServerHealthChecks.set(requestId, { + responses: {}, + expectedCount: allInstances.length, + resolve, + reject, + timeout: allInstancesTimeout, + }); + }).catch(() => false); + const request: ServerHealthcheckRequest = { + id: requestId, + instanceId: this._serverInstanceID, + checkReady, + }; + await this._pubSubManager.publish('healthcheck:requests', JSON.stringify(request)); + return allInstancesPromise; + } + + public async close() { + await this._pubSubManager.getClient()?.srem('grist-instances', [this._serverInstanceID]); + await this._pubSubManager.close(); + } + + private _subscribeToChannels() { + this._pubSubManager.subscribe('healthcheck:requests', async (message) => { + const request: ServerHealthcheckRequest = JSON.parse(message); + const response: ServerHealthcheckResponse = { + instanceId: this._serverInstanceID|| '', + requestId: request.id, + healthy: !request.checkReady || this._server.ready, + }; + log.debug('allServersOkay request', response); + await this._pubSubManager.publish(`healthcheck:responses-${request.instanceId}`, JSON.stringify(response)); + }); + + this._pubSubManager.subscribe(`healthcheck:responses-${this._serverInstanceID}`, (message) => { + const response: ServerHealthcheckResponse = JSON.parse(message); + const pending = this._pendingServerHealthChecks.get(response.requestId); + if (!pending) { + // This instance didn't broadcast a health check request with + // this requestId, so nothing to do. + return; + } + + pending.responses[response.instanceId] = response.healthy; + log.debug( + `allServersOkay cleared pending response on ${this._serverInstanceID} for ${response.instanceId}` + ); + + if (Object.keys(pending.responses).length === pending.expectedCount) { + // All servers have replied. Make it known and clean up. + clearTimeout(pending.timeout); + pending.resolve(Object.values(pending.responses).every(e => e)); + this._pendingServerHealthChecks.delete(response.requestId); + } + }); + } +} From da702d4b55914f66e1ab711af38b059bfab109dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jordi=20Guti=C3=A9rrez=20Hermoso?= Date: Tue, 29 Jul 2025 15:23:16 -0400 Subject: [PATCH 4/7] FlexServer: enable new allInstancesReady option in status endpoint This new option uses the `HealthChecker` class from before. --- app/server/lib/FlexServer.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/app/server/lib/FlexServer.ts b/app/server/lib/FlexServer.ts index 0058fab18f..03a0f2d237 100644 --- a/app/server/lib/FlexServer.ts +++ b/app/server/lib/FlexServer.ts @@ -101,6 +101,7 @@ import {AddressInfo} from 'net'; import fetch from 'node-fetch'; import * as path from 'path'; import * as serveStatic from 'serve-static'; +import { HealthChecker } from './HealthChecker'; // Health checks are a little noisy in the logs, so we don't show them all. // We show the first N health checks: @@ -213,6 +214,7 @@ export class FlexServer implements GristServer { private _emitNotifier = new EmitNotifier(); private _testPendingNotifications: number = 0; private _latestVersionAvailable?: LatestVersionAvailable; + private _healthChecker: HealthChecker; constructor(public port: number, public name: string = 'flexServer', public readonly options: FlexServerOptions = {}) { @@ -274,6 +276,8 @@ export class FlexServer implements GristServer { this.setLatestVersionAvailable(latestVersionAvailable); }); + this._healthChecker = new HealthChecker(this); + // The electron build is not supported at this time, but this stub // implementation of electronServerMethods is present to allow kicking // its tires. @@ -600,6 +604,9 @@ export class FlexServer implements GristServer { if (isParameterOn(req.query.ready)) { checks.set('ready', this._isReady); } + if (isParameterOn(req.query.allInstancesReady)) { + checks.set('allInstancesReady', this._healthChecker.allServersOkay(timeout, true)); + } let extra = ''; let ok = true; // If we had any extra check, collect their status to report them. @@ -1059,6 +1066,7 @@ export class FlexServer implements GristServer { if (this.httpsServer) { this.httpsServer.close(); } if (this.housekeeper) { await this.housekeeper.stop(); } if (this._jobs) { await this._jobs.stop(); } + await this._healthChecker.close(); await this._shutdown(); if (this._accessTokens) { await this._accessTokens.close(); } // Do this after _shutdown, since DocWorkerMap is used during shutdown. From 2b77efb54ede4f9df01927444c631bce12d18591 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jordi=20Guti=C3=A9rrez=20Hermoso?= Date: Tue, 29 Jul 2025 15:25:16 -0400 Subject: [PATCH 5/7] ConfigAPI: switch to the allInstancesReady=1 parameter This works across all instances instead of just reporting if the instance serving the request is reaady. --- app/common/ConfigAPI.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/common/ConfigAPI.ts b/app/common/ConfigAPI.ts index 7aab5c15c4..3cf8bbe136 100644 --- a/app/common/ConfigAPI.ts +++ b/app/common/ConfigAPI.ts @@ -26,7 +26,7 @@ export class ConfigAPI extends BaseAPI { } public async healthcheck(): Promise { - const resp = await this.request(`${this._homeUrl}/status?ready=1`); + const resp = await this.request(`${this._homeUrl}/status?allInstancesReady=1`); if (!resp.ok) { throw new Error(await resp.text()); } From 26eba14a4cd3113ff8664dede14a206e552d264b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jordi=20Guti=C3=A9rrez=20Hermoso?= Date: Tue, 29 Jul 2025 15:23:50 -0400 Subject: [PATCH 6/7] tests: Factor out RedisForwarder This connexion forwarder specifically tailored for redis will be useful for other multi-server Redis-related tests, so let's factor it out. --- test/gen-server/lib/HealthCheck.ts | 18 +++++------------- test/server/tcpForwarder.ts | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/test/gen-server/lib/HealthCheck.ts b/test/gen-server/lib/HealthCheck.ts index d2fa6441db..b643cd1606 100644 --- a/test/gen-server/lib/HealthCheck.ts +++ b/test/gen-server/lib/HealthCheck.ts @@ -1,7 +1,7 @@ import { assert } from 'chai'; import fetch from 'node-fetch'; import { TestServer } from 'test/gen-server/apiUtils'; -import { TcpForwarder } from 'test/server/tcpForwarder'; +import { RedisForwarder } from 'test/server/tcpForwarder'; import * as testUtils from 'test/server/testUtils'; import { waitForIt } from 'test/server/wait'; @@ -12,22 +12,14 @@ describe('HealthCheck', function() { describe(serverType, function() { let server: TestServer; let oldEnv: testUtils.EnvironmentSnapshot; - let redisForwarder: TcpForwarder; + let redisForwarder: RedisForwarder; before(async function() { oldEnv = new testUtils.EnvironmentSnapshot(); - // We set up Redis via a TcpForwarder, so that we can simulate disconnects. - if (!process.env.TEST_REDIS_URL) { - throw new Error("TEST_REDIS_URL is expected"); - } - const redisUrl = new URL(process.env.TEST_REDIS_URL); - const redisPort = parseInt(redisUrl.port, 10) || 6379; - redisForwarder = new TcpForwarder(redisPort, redisUrl.host); - const forwarderPort = await redisForwarder.pickForwarderPort(); - await redisForwarder.connect(); - - process.env.REDIS_URL = `redis://localhost:${forwarderPort}`; + // We set up Redis via a forwarder, so that we can simulate disconnects. + redisForwarder = await RedisForwarder.create(); + process.env.REDIS_URL = `redis://localhost:${redisForwarder.port}`; server = new TestServer(this); await server.start([serverType]); }); diff --git a/test/server/tcpForwarder.ts b/test/server/tcpForwarder.ts index 3e2fe69da7..98d0032191 100644 --- a/test/server/tcpForwarder.ts +++ b/test/server/tcpForwarder.ts @@ -59,3 +59,21 @@ async function destroySock(sock: Socket): Promise { sock.on('close', resolve).destroy()); } } + +export class RedisForwarder extends TcpForwarder { + public static async create() { + if (!process.env.TEST_REDIS_URL) { + throw new Error("TEST_REDIS_URL is expected"); + } + + const url = new URL(process.env.TEST_REDIS_URL); + const port = parseInt(url.port, 10) || 6379; + const forwarder = new RedisForwarder(port, url.hostname); + await forwarder.pickForwarderPort(); + await forwarder.connect(); + forwarder.redisUrl = `redis://localhost:${forwarder.port}${url.pathname}`; + return forwarder; + } + + public redisUrl: string; +} From 6c39891f16c8cf55ff23a489ec15ad0409787116 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jordi=20Guti=C3=A9rrez=20Hermoso?= Date: Wed, 30 Jul 2025 11:01:03 -0400 Subject: [PATCH 7/7] tests: Add tests for health checker --- test/server/lib/HealthChecker.ts | 137 +++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 test/server/lib/HealthChecker.ts diff --git a/test/server/lib/HealthChecker.ts b/test/server/lib/HealthChecker.ts new file mode 100644 index 0000000000..1a68beeac9 --- /dev/null +++ b/test/server/lib/HealthChecker.ts @@ -0,0 +1,137 @@ +import {TestServer} from 'test/server/lib/helpers/TestServer'; +import {RedisForwarder} from 'test/server/tcpForwarder'; +import * as testUtils from 'test/server/testUtils'; +import {prepareFilesystemDirectoryForTests} from 'test/server/lib/helpers/PrepareFilesystemDirectoryForTests'; +import {prepareDatabase} from 'test/server/lib/helpers/PrepareDatabase'; + +import fetch from 'node-fetch'; +import {assert} from 'chai'; +import IORedis from "ioredis"; + +import * as path from 'path'; +import {tmpdir} from 'os'; + +const username = process.env.USER || "nobody"; +const tmpDir = path.join(tmpdir(), `grist_test_${username}_health_checker`); + +describe('HealthChecker', function() { + testUtils.setTmpLogLevel('error'); + this.timeout(10_000); + + let servers: {server: TestServer, forwarder?: RedisForwarder}[] = []; + let client: IORedis; + + before(async function() { + await prepareFilesystemDirectoryForTests(tmpDir); + await prepareDatabase(tmpDir); + + if(process.env.TEST_REDIS_URL) { + client = new IORedis(process.env.TEST_REDIS_URL); + await client.flushall(); + + for(let i = 0; i < 3; i++) { + // Use a forwarder for each server so we can simulate + // disconnects + const forwarder = await RedisForwarder.create(); + + const server = await TestServer.startServer('home', tmpDir, `with-redis-${i}`, { + REDIS_URL: forwarder.redisUrl, + GRIST_INSTANCE_ID: `test-instance-${i}`, + }); + servers.push({server, forwarder}); + } + } + else { + servers = [{server: await TestServer.startServer('home', tmpDir, 'without-redis')}]; + } + }); + + after(async function () { + await Promise.all(servers.map(async (pair) => { + await pair.server.stop(); + await pair.forwarder?.disconnect(); + })); + client?.disconnect(); + }); + + it('registers all servers', async function () { + if(!process.env.TEST_REDIS_URL) { + this.skip(); + } + + const instances = await client.smembers('grist-instances'); + instances.sort(); + assert.deepEqual(instances, ['test-instance-0', 'test-instance-1', 'test-instance-2']); + }); + + it('reports healthy when all servers are healthy', async function () { + const server = servers[process.env.TEST_REDIS_URL ? 2 : 0].server; + const resp = await fetch(`${server.serverUrl}/status?allInstancesReady=1`); + assert.equal(resp.status, 200); + assert.match(await resp.text(), /allInstancesReady ok/); + }); + + it('reports not healthy when one server is not healthy', async function () { + if(!process.env.TEST_REDIS_URL) { + this.skip(); + } + + const downServer = servers[2]; + await downServer.forwarder?.disconnect(); + + const server = servers[0].server; + const resp = await fetch(`${server.serverUrl}/status?allInstancesReady=1&timeout=500`); + const text = await resp.text(); + assert.equal(resp.status, 500); + assert.match(text, /allInstancesReady not ok/); + + await downServer.forwarder?.connect(); + }); + + it('reports healthy when one server is cleanly disconnected', async function () { + if(!process.env.TEST_REDIS_URL) { + this.skip(); + } + + const downServer = servers[2]; + await downServer.server.stop(); + await downServer.forwarder?.disconnect(); + servers.pop(); + + const server = servers[0].server; + const resp = await fetch(`${server.serverUrl}/status?allInstancesReady=1`); + const text = await resp.text(); + assert.equal(resp.status, 200); + assert.match(text, /allInstancesReady ok/); + }); + + it('checks when a new server comes back', async function() { + if(!process.env.TEST_REDIS_URL) { + this.skip(); + } + + let instances = await client.smembers('grist-instances'); + instances.sort(); + assert.deepEqual(instances, ['test-instance-0', 'test-instance-1']); + + const newForwarder = await RedisForwarder.create(); + await newForwarder.connect(); + const newServer = await TestServer.startServer('home', tmpDir, `with-redis-3`, { + REDIS_URL: newForwarder.redisUrl, + GRIST_INSTANCE_ID: `test-instance-3`, + }); + + instances = await client.smembers('grist-instances'); + instances.sort(); + assert.deepEqual(instances, ['test-instance-0', 'test-instance-1', 'test-instance-3']); + + const server = servers[0].server; + const resp = await fetch(`${server.serverUrl}/status?allInstancesReady=1`); + const text = await resp.text(); + assert.equal(resp.status, 200); + assert.match(text, /allInstancesReady ok/); + + await newServer.stop(); + await newForwarder.disconnect(); + }); +});