diff --git a/packages/@webex/internal-plugin-llm/src/constants.ts b/packages/@webex/internal-plugin-llm/src/constants.ts index b985f98c75e..5f78f380d89 100644 --- a/packages/@webex/internal-plugin-llm/src/constants.ts +++ b/packages/@webex/internal-plugin-llm/src/constants.ts @@ -1,2 +1,4 @@ // eslint-disable-next-line import/prefer-default-export export const LLM = 'llm'; + +export const LLM_DEFAULT_SESSION = 'llm-default-session'; diff --git a/packages/@webex/internal-plugin-llm/src/llm.ts b/packages/@webex/internal-plugin-llm/src/llm.ts index a9a1679a50b..14c315e4833 100644 --- a/packages/@webex/internal-plugin-llm/src/llm.ts +++ b/packages/@webex/internal-plugin-llm/src/llm.ts @@ -2,7 +2,7 @@ import Mercury from '@webex/internal-plugin-mercury'; -import {LLM} from './constants'; +import {LLM, LLM_DEFAULT_SESSION} from './constants'; // eslint-disable-next-line no-unused-vars import {ILLMChannel} from './llm.types'; @@ -42,39 +42,46 @@ export const config = { */ export default class LLMChannel extends (Mercury as any) implements ILLMChannel { namespace = LLM; - + defaultSessionId = LLM_DEFAULT_SESSION; /** - * If the LLM plugin has been registered and listening - * @instance - * @type {Boolean} - * @public + * Map to store connection-specific data for multiple LLM connections + * @private + * @type {Map} */ - - private webSocketUrl?: string; - - private binding?: string; - - private locusUrl?: string; - - private datachannelUrl?: string; + private connections: Map< + string, + { + webSocketUrl?: string; + binding?: string; + locusUrl?: string; + datachannelUrl?: string; + } + > = new Map(); /** * Register to the websocket * @param {string} llmSocketUrl + * @param {string} sessionId - Connection identifier * @returns {Promise} */ - private register = (llmSocketUrl: string): Promise => + private register = ( + llmSocketUrl: string, + sessionId: string = LLM_DEFAULT_SESSION + ): Promise => this.request({ method: 'POST', url: llmSocketUrl, body: {deviceUrl: this.webex.internal.device.url}, }) .then((res: {body: {webSocketUrl: string; binding: string}}) => { - this.webSocketUrl = res.body.webSocketUrl; - this.binding = res.body.binding; + // Get or create connection data + const sessionData = this.connections.get(sessionId) || {}; + sessionData.webSocketUrl = res.body.webSocketUrl; + sessionData.binding = res.body.binding; + this.connections.set(sessionId, sessionData); }) .catch((error: any) => { - this.logger.error(`Error connecting to websocket: ${error}`); + this.logger.error(`Error connecting to websocket for ${sessionId}: ${error}`); throw error; }); @@ -82,50 +89,111 @@ export default class LLMChannel extends (Mercury as any) implements ILLMChannel * Register and connect to the websocket * @param {string} locusUrl * @param {string} datachannelUrl + * @param {string} sessionId - Connection identifier * @returns {Promise} */ - public registerAndConnect = (locusUrl: string, datachannelUrl: string): Promise => - this.register(datachannelUrl).then(() => { + public registerAndConnect = ( + locusUrl: string, + datachannelUrl: string, + sessionId: string = LLM_DEFAULT_SESSION + ): Promise => + this.register(datachannelUrl, sessionId).then(() => { if (!locusUrl || !datachannelUrl) return undefined; - this.locusUrl = locusUrl; - this.datachannelUrl = datachannelUrl; - this.connect(this.webSocketUrl); + + // Get or create connection data + const sessionData = this.connections.get(sessionId) || {}; + sessionData.locusUrl = locusUrl; + sessionData.datachannelUrl = datachannelUrl; + this.connections.set(sessionId, sessionData); + console.error( + `registerAndConnect(${sessionId}) --> channel is ${datachannelUrl}! -->websocketurl is ${sessionData.webSocketUrl}` + ); + + return this.connect(sessionData.webSocketUrl, sessionId); }); /** * Tells if LLM socket is connected + * @param {string} sessionId - Connection identifier * @returns {boolean} connected */ - public isConnected = (): boolean => this.connected; + public isConnected = (sessionId = LLM_DEFAULT_SESSION): boolean => { + const socket = this.getSocket(sessionId); + + return socket ? socket.connected : false; + }; /** * Tells if LLM socket is binding + * @param {string} sessionId - Connection identifier * @returns {string} binding */ - public getBinding = (): string => this.binding; + public getBinding = (sessionId = LLM_DEFAULT_SESSION): string => { + const sessionData = this.connections.get(sessionId); + + return sessionData?.binding || ''; + }; /** * Get Locus URL for the connection + * @param {string} sessionId - Connection identifier * @returns {string} locus Url */ - public getLocusUrl = (): string => this.locusUrl; + public getLocusUrl = (sessionId = LLM_DEFAULT_SESSION): string => { + const sessionData = this.connections.get(sessionId); + + return sessionData?.locusUrl || ''; + }; /** * Get data channel URL for the connection + * @param {string} sessionId - Connection identifier * @returns {string} data channel Url */ - public getDatachannelUrl = (): string => this.datachannelUrl; + public getDatachannelUrl = (sessionId = LLM_DEFAULT_SESSION): string => { + const sessionData = this.connections.get(sessionId); + + return sessionData?.datachannelUrl || ''; + }; /** * Disconnects websocket connection * @param {{code: number, reason: string}} options - The disconnect option object with code and reason + * @param {string} sessionId - Connection identifier + * @returns {Promise} + */ + public disconnectLLM = ( + options: {code: number; reason: string}, + sessionId: string = LLM_DEFAULT_SESSION + ): Promise => + this.disconnect(options, sessionId).then(() => { + // Clean up sessions data + console.error(`disconnectLLM(${sessionId})`); + this.connections.delete(sessionId); + }); + + /** + * Disconnects all LLM websocket connections + * @param {{code: number, reason: string}} options - The disconnect option object with code and reason * @returns {Promise} */ - public disconnectLLM = (options: object): Promise => - this.disconnect(options).then(() => { - this.locusUrl = undefined; - this.datachannelUrl = undefined; - this.binding = undefined; - this.webSocketUrl = undefined; + public disconnectAllLLM = (options?: {code: number; reason: string}): Promise => + this.disconnectAll(options).then(() => { + // Clean up all connection data + this.connections.clear(); }); + + /** + * Get all active LLM connections + * @returns {Map} Map of sessionId to session data + */ + public getAllConnections = (): Map< + string, + { + webSocketUrl?: string; + binding?: string; + locusUrl?: string; + datachannelUrl?: string; + } + > => new Map(this.connections); } diff --git a/packages/@webex/internal-plugin-llm/src/llm.types.ts b/packages/@webex/internal-plugin-llm/src/llm.types.ts index a9a462adbcf..16f0ff03673 100644 --- a/packages/@webex/internal-plugin-llm/src/llm.types.ts +++ b/packages/@webex/internal-plugin-llm/src/llm.types.ts @@ -1,9 +1,24 @@ interface ILLMChannel { - registerAndConnect: (locusUrl: string, datachannelUrl: string) => Promise; - isConnected: () => boolean; - getBinding: () => string; - getLocusUrl: () => string; - disconnectLLM: (options: {code: number; reason: string}) => Promise; + registerAndConnect: ( + locusUrl: string, + datachannelUrl: string, + sessionId?: string + ) => Promise; + isConnected: (sessionId?: string) => boolean; + getBinding: (sessionId?: string) => string; + getLocusUrl: (sessionId?: string) => string; + getDatachannelUrl: (sessionId?: string) => string; + disconnectLLM: (options: {code: number; reason: string}, sessionId?: string) => Promise; + disconnectAllLLM: (options?: {code: number; reason: string}) => Promise; + getAllConnections: () => Map< + string, + { + webSocketUrl?: string; + binding?: string; + locusUrl?: string; + datachannelUrl?: string; + } + >; } // eslint-disable-next-line import/prefer-default-export export type {ILLMChannel}; diff --git a/packages/@webex/internal-plugin-mercury/src/mercury.js b/packages/@webex/internal-plugin-mercury/src/mercury.js index 506815b086c..6d52e55c54f 100644 --- a/packages/@webex/internal-plugin-mercury/src/mercury.js +++ b/packages/@webex/internal-plugin-mercury/src/mercury.js @@ -6,7 +6,7 @@ import url from 'url'; import {WebexPlugin} from '@webex/webex-core'; -import {deprecated, oneFlight} from '@webex/common'; +import {deprecated} from '@webex/common'; import {camelCase, get, set} from 'lodash'; import backoff from 'backoff'; @@ -25,6 +25,7 @@ const normalReconnectReasons = ['idle', 'done (forced)', 'pong not received', 'p const Mercury = WebexPlugin.extend({ namespace: 'Mercury', lastError: undefined, + defaultSessionId: 'mercury-default-session', session: { connected: { @@ -39,7 +40,14 @@ const Mercury = WebexPlugin.extend({ default: false, type: 'boolean', }, - socket: 'object', + sockets: { + default: () => new Map(), + type: 'object', + }, + backoffCalls: { + default: () => new Map(), + type: 'object', + }, localClusterServiceUrls: 'object', mercuryTimeOffset: { default: undefined, @@ -78,17 +86,65 @@ const Mercury = WebexPlugin.extend({ return this.lastError; }, - @oneFlight - connect(webSocketUrl) { - if (this.connected) { - this.logger.info(`${this.namespace}: already connected, will not connect again`); + /** + * Get all active socket connections + * @returns {Map} Map of sessionId to socket instances + */ + getSockets() { + return this.sockets; + }, + + /** + * Get a specific socket by connection ID + * @param {string} sessionId - The connection identifier + * @returns {Socket|undefined} The socket instance or undefined if not found + */ + getSocket(sessionId = this.defaultSessionId) { + return this.sockets.get(sessionId); + }, + + /** + * Check if any sockets are connected + * @returns {boolean} True if at least one socket is connected + */ + hasConnectedSockets() { + for (const socket of this.sockets.values()) { + if (socket && socket.connected) { + return true; + } + } + + return false; + }, + + /** + * Check if any sockets are connecting + * @returns {boolean} True if at least one socket is connected + */ + hasConnectingSockets() { + for (const socket of this.sockets.values()) { + if (socket && socket.connecting) { + return true; + } + } + + return false; + }, + + // @oneFlight + connect(webSocketUrl, sessionId = this.defaultSessionId) { + const sessionSocket = this.sockets.get(sessionId); + if (sessionSocket?.connected || sessionSocket?.connecting) { + this.logger.info( + `${this.namespace}: connection ${sessionId} already connecting or connected, will not connect again` + ); return Promise.resolve(); } this.connecting = true; - this.logger.info(`${this.namespace}: starting connection attempt`); + this.logger.info(`${this.namespace}: starting connection attempt for ${sessionId}`); this.logger.info( `${this.namespace}: debug_mercury_logging stack: `, new Error('debug_mercury_logging').stack @@ -97,9 +153,9 @@ const Mercury = WebexPlugin.extend({ return Promise.resolve( this.webex.internal.device.registered || this.webex.internal.device.register() ).then(() => { - this.logger.info(`${this.namespace}: connecting`); + this.logger.info(`${this.namespace}: connecting ${sessionId}`); - return this._connectWithBackoff(webSocketUrl); + return this._connectWithBackoff(webSocketUrl, sessionId); }); }, @@ -110,7 +166,7 @@ const Mercury = WebexPlugin.extend({ new Error('debug_mercury_logging').stack ); - return this.disconnect( + return this.disconnectAll( this.config.beforeLogoutOptionsCloseReason && !normalReconnectReasons.includes(this.config.beforeLogoutOptionsCloseReason) ? {code: 3050, reason: this.config.beforeLogoutOptionsCloseReason} @@ -118,21 +174,52 @@ const Mercury = WebexPlugin.extend({ ); }, - @oneFlight - disconnect(options) { + // @oneFlight + disconnect(options, sessionId = this.defaultSessionId) { return new Promise((resolve) => { - if (this.backoffCall) { - this.logger.info(`${this.namespace}: aborting connection`); - this.backoffCall.abort(); + console.error(`Mercury#disconnect()1 ${sessionId}.`); + const backoffCall = this.backoffCalls.get(sessionId); + if (backoffCall) { + this.logger.info(`${this.namespace}: aborting connection ${sessionId}`); + backoffCall.abort(); + this.backoffCalls.delete(sessionId); } - if (this.socket) { - this.socket.removeAllListeners('message'); - this.once('offline', resolve); - resolve(this.socket.close(options || undefined)); + const sessionSocket = this.sockets.get(sessionId); + const suffix = sessionId === this.defaultSessionId ? '' : `:${sessionId}`; + + if (sessionSocket) { + sessionSocket.removeAllListeners('message'); + sessionSocket.connecting = false; + sessionSocket.connected = false; + this.once(sessionId === this.defaultSessionId ? 'offline' : `offline${suffix}`, resolve); + resolve(sessionSocket.close(options || undefined)); } resolve(); + + // Update overall connected status + this.connected = this.hasConnectedSockets(); + }); + }, + + /** + * Disconnect all socket connections + * @param {object} options - Close options + * @returns {Promise} Promise that resolves when all connections are closed + */ + disconnectAll(options) { + console.error('Mercury#disconnectAll()'); + const disconnectPromises = []; + + for (const sessionId of this.sockets.keys()) { + disconnectPromises.push(this.disconnect(options, sessionId)); + } + + return Promise.all(disconnectPromises).then(() => { + this.connected = false; + this.sockets.clear(); + this.backoffCalls.clear(); }); }, @@ -207,20 +294,24 @@ const Mercury = WebexPlugin.extend({ }); }, - _attemptConnection(socketUrl, callback) { + _attemptConnection(socketUrl, sessionId, callback) { + console.error(`Mercury#_attemptConnection() ${sessionId}.`); const socket = new Socket(); + socket.connecting = true; let attemptWSUrl; + const suffix = sessionId === this.defaultSessionId ? '' : `:${sessionId}`; - socket.on('close', (...args) => this._onclose(...args)); - socket.on('message', (...args) => this._onmessage(...args)); - socket.on('pong', (...args) => this._setTimeOffset(...args)); - socket.on('sequence-mismatch', (...args) => this._emit('sequence-mismatch', ...args)); - socket.on('ping-pong-latency', (...args) => this._emit('ping-pong-latency', ...args)); + socket.on('close', (...args) => this._onclose(sessionId, ...args)); + socket.on('message', (...args) => this._onmessage(sessionId, ...args)); + socket.on('pong', (...args) => this._setTimeOffset(sessionId, ...args)); + socket.on('sequence-mismatch', (...args) => this._emit(`sequence-mismatch${suffix}`, ...args)); + socket.on('ping-pong-latency', (...args) => this._emit(`ping-pong-latency${suffix}`, ...args)); Promise.all([this._prepareUrl(socketUrl), this.webex.credentials.getUserToken()]) .then(([webSocketUrl, token]) => { - if (!this.backoffCall) { - const msg = `${this.namespace}: prevent socket open when backoffCall no longer defined`; + const backoffCall = this.backoffCalls.get(sessionId); + if (!backoffCall) { + const msg = `${this.namespace}: prevent socket open when backoffCall no longer defined for ${sessionId}`; this.logger.info(msg); @@ -234,27 +325,28 @@ const Mercury = WebexPlugin.extend({ pingInterval: this.config.pingInterval, pongTimeout: this.config.pongTimeout, token: token.toString(), - trackingId: `${this.webex.sessionId}_${Date.now()}`, + trackingId: `${this.webex.sessionId}_${sessionId}_${Date.now()}`, logger: this.logger, }; // if the consumer has supplied request options use them if (this.webex.config.defaultMercuryOptions) { - this.logger.info(`${this.namespace}: setting custom options`); + this.logger.info(`${this.namespace}: setting custom options for ${sessionId}`); options = {...options, ...this.webex.config.defaultMercuryOptions}; } // Set the socket before opening it. This allows a disconnect() to close // the socket if it is in the process of being opened. - this.socket = socket; + this.sockets.set(sessionId, socket); + this.socket = this.sockets.get(this.defaultSessionId) || socket; - this.logger.info(`${this.namespace} connection url: ${webSocketUrl}`); + this.logger.info(`${this.namespace} connection url for ${sessionId}: ${webSocketUrl}`); return socket.open(webSocketUrl, options); }) .then(() => { this.logger.info( - `${this.namespace}: connected to mercury, success, action: connected, url: ${attemptWSUrl}` + `${this.namespace}: connected to mercury, success, action: connected, sessionId: ${sessionId}, url: ${attemptWSUrl}` ); callback(); @@ -271,30 +363,36 @@ const Mercury = WebexPlugin.extend({ .catch((reason) => { this.lastError = reason; // remember the last error + const backoffCall = this.backoffCalls.get(sessionId); // Suppress connection errors that appear to be network related. This // may end up suppressing metrics during outages, but we might not care // (especially since many of our outages happen in a way that client // metrics can't be trusted). - if (reason.code !== 1006 && this.backoffCall && this.backoffCall?.getNumRetries() > 0) { - this._emit('connection_failed', reason, {retries: this.backoffCall?.getNumRetries()}); + if (reason.code !== 1006 && backoffCall && backoffCall?.getNumRetries() > 0) { + this._emit(`connection_failed${suffix}`, reason, { + sessionId, + retries: backoffCall?.getNumRetries(), + }); } this.logger.info( - `${this.namespace}: connection attempt failed`, + `${this.namespace}: connection attempt failed for ${sessionId}`, reason, - this.backoffCall?.getNumRetries() === 0 ? reason.stack : '' + backoffCall?.getNumRetries() === 0 ? reason.stack : '' ); // UnknownResponse is produced by IE for any 4XXX; treated it like a bad // web socket url and let WDM handle the token checking if (reason instanceof UnknownResponse) { this.logger.info( - `${this.namespace}: received unknown response code, refreshing device registration` + `${this.namespace}: received unknown response code for ${sessionId}, refreshing device registration` ); return this.webex.internal.device.refresh().then(() => callback(reason)); } // NotAuthorized implies expired token if (reason instanceof NotAuthorized) { - this.logger.info(`${this.namespace}: received authorization error, reauthorizing`); + this.logger.info( + `${this.namespace}: received authorization error for ${sessionId}, reauthorizing` + ); return this.webex.credentials.refresh({force: true}).then(() => callback(reason)); } @@ -307,8 +405,10 @@ const Mercury = WebexPlugin.extend({ // BadRequest implies current credentials are for a Service Account // Forbidden implies current user is not entitle for Webex if (reason instanceof BadRequest || reason instanceof Forbidden) { - this.logger.warn(`${this.namespace}: received unrecoverable response from mercury`); - this.backoffCall.abort(); + this.logger.warn( + `${this.namespace}: received unrecoverable response from mercury for ${sessionId}` + ); + backoffCall?.abort(); return callback(reason); } @@ -318,7 +418,7 @@ const Mercury = WebexPlugin.extend({ .then((haMessagingEnabled) => { if (haMessagingEnabled) { this.logger.info( - `${this.namespace}: received a generic connection error, will try to connect to another datacenter. failed, action: 'failed', url: ${attemptWSUrl} error: ${reason.message}` + `${this.namespace}: received a generic connection error for ${sessionId}, will try to connect to another datacenter. failed, action: 'failed', url: ${attemptWSUrl} error: ${reason.message}` ); return this.webex.internal.services.markFailedUrl(attemptWSUrl); @@ -332,42 +432,60 @@ const Mercury = WebexPlugin.extend({ return callback(reason); }) .catch((reason) => { - this.logger.error(`${this.namespace}: failed to handle connection failure`, reason); + this.logger.error( + `${this.namespace}: failed to handle connection failure for ${sessionId}`, + reason + ); callback(reason); }); }, - _connectWithBackoff(webSocketUrl) { + _connectWithBackoff(webSocketUrl, sessionId) { return new Promise((resolve, reject) => { // eslint gets confused about whether or not call is actually used // eslint-disable-next-line prefer-const let call; - const onComplete = (err) => { - this.connecting = false; - - this.backoffCall = undefined; + const onComplete = (err, sid = sessionId) => { + this.backoffCalls.delete(sid); if (err) { this.logger.info( `${ this.namespace - }: failed to connect after ${call.getNumRetries()} retries; log statement about next retry was inaccurate; ${err}` + }: failed to connect ${sid} after ${call.getNumRetries()} retries; log statement about next retry was inaccurate; ${err}` ); return reject(err); } - this.connected = true; + console.error(`Mercury#connected() ${sid}.`); + // Update overall connected status + const sessionSocket = this.sockets.get(sid); + if (sessionSocket) { + sessionSocket.connecting = false; + sessionSocket.connected = true; + } + // @ts-ignore + this.connecting = this.hasConnectingSockets(); + this.connected = this.hasConnectedSockets(); this.hasEverConnected = true; - this._emit('online'); + const suffix = sid === this.defaultSessionId ? '' : `:${sid}`; + this._emit(`online${suffix}`, {sessionId: sid}); this.webex.internal.newMetrics.callDiagnosticMetrics.setMercuryConnectedStatus(true); return resolve(); }; - + console.error(`Mercury#_connectWithBackoff() ${sessionId}.`); // eslint-disable-next-line prefer-reflect - call = backoff.call((callback) => { - this.logger.info(`${this.namespace}: executing connection attempt ${call.getNumRetries()}`); - this._attemptConnection(webSocketUrl, callback); - }, onComplete); + call = backoff.call( + (callback) => { + this.logger.info( + `${ + this.namespace + }: executing connection attempt ${call.getNumRetries()} for ${sessionId}` + ); + this._attemptConnection(webSocketUrl, sessionId, callback); + }, + (err) => onComplete(err, sessionId) + ); call.setStrategy( new backoff.ExponentialStrategy({ @@ -383,8 +501,9 @@ const Mercury = WebexPlugin.extend({ } call.on('abort', () => { - this.logger.info(`${this.namespace}: connection aborted`); - reject(new Error('Mercury Connection Aborted')); + console.error(`Mercury#_connectWithBackoff abort() ${sessionId}.`); + this.logger.info(`${this.namespace}: connection aborted for ${sessionId}`); + reject(new Error(`Mercury Connection Aborted for ${sessionId}`)); }); call.on('callback', (err) => { @@ -393,7 +512,9 @@ const Mercury = WebexPlugin.extend({ const delay = Math.min(call.strategy_.nextBackoffDelay_, this.config.backoffTimeMax); this.logger.info( - `${this.namespace}: failed to connect; attempting retry ${number + 1} in ${delay} ms` + `${this.namespace}: failed to connect ${sessionId}; attempting retry ${ + number + 1 + } in ${delay} ms` ); /* istanbul ignore if */ if (process.env.NODE_ENV === 'development') { @@ -402,12 +523,12 @@ const Mercury = WebexPlugin.extend({ return; } - this.logger.info(`${this.namespace}: connected`); + this.logger.info(`${this.namespace}: connected ${sessionId}`); }); - + console.error(`Mercury#_connectWithBackoff start() ${sessionId}.`); call.start(); - this.backoffCall = call; + this.backoffCalls.set(sessionId, call); }); }, @@ -444,78 +565,94 @@ const Mercury = WebexPlugin.extend({ return handlers; }, - _onclose(event) { + _onclose(sessionId, event) { // I don't see any way to avoid the complexity or statement count in here. /* eslint complexity: [0] */ try { const reason = event.reason && event.reason.toLowerCase(); - const socketUrl = this.socket.url; + let sessionSocket = this.sockets.get(sessionId); + const socketUrl = sessionSocket?.url; + const suffix = sessionId === this.defaultSessionId ? '' : `:${sessionId}`; + event.sessionId = sessionId; + this.sockets.delete(sessionId); + + if (sessionSocket) { + sessionSocket.removeAllListeners(); + sessionSocket = null; + this._emit(`offline${suffix}`, event); + } - this.socket.removeAllListeners(); - this.unset('socket'); - this.connected = false; - this._emit('offline', event); - this.webex.internal.newMetrics.callDiagnosticMetrics.setMercuryConnectedStatus(false); + // Update overall connected status + this.connecting = this.hasConnectingSockets(); + this.connected = this.hasConnectedSockets(); + + if (!this.connected) { + this.webex.internal.newMetrics.callDiagnosticMetrics.setMercuryConnectedStatus(false); + } switch (event.code) { case 1003: // metric: disconnect this.logger.info( - `${this.namespace}: Mercury service rejected last message; will not reconnect: ${event.reason}` + `${this.namespace}: Mercury service rejected last message for ${sessionId}; will not reconnect: ${event.reason}` ); - this._emit('offline.permanent', event); + this._emit(`offline.permanent${suffix}`, event); break; case 4000: // metric: disconnect - this.logger.info(`${this.namespace}: socket replaced; will not reconnect`); - this._emit('offline.replaced', event); + this.logger.info(`${this.namespace}: socket ${sessionId} replaced; will not reconnect`); + this._emit(`offline.replaced${suffix}`, event); break; case 1001: case 1005: case 1006: case 1011: - this.logger.info(`${this.namespace}: socket disconnected; reconnecting`); - this._emit('offline.transient', event); - this._reconnect(socketUrl); + this.logger.info(`${this.namespace}: socket ${sessionId} disconnected; reconnecting`); + this._emit(`offline.transient${suffix}`, event); + this._reconnect(socketUrl, sessionId); // metric: disconnect // if (code == 1011 && reason !== ping error) metric: unexpected disconnect break; case 1000: case 3050: // 3050 indicates logout form of closure, default to old behavior, use config reason defined by consumer to proceed with the permanent block if (normalReconnectReasons.includes(reason)) { - this.logger.info(`${this.namespace}: socket disconnected; reconnecting`); - this._emit('offline.transient', event); - this._reconnect(socketUrl); + this.logger.info(`${this.namespace}: socket ${sessionId} disconnected; reconnecting`); + this._emit(`offline.transient${suffix}`, event); + this._reconnect(socketUrl, sessionId); // metric: disconnect // if (reason === done forced) metric: force closure } else { this.logger.info( - `${this.namespace}: socket disconnected; will not reconnect: ${event.reason}` + `${this.namespace}: socket ${sessionId} disconnected; will not reconnect: ${event.reason}` ); - this._emit('offline.permanent', event); + this._emit(`offline.permanent${suffix}`, event); } break; default: this.logger.info( - `${this.namespace}: socket disconnected unexpectedly; will not reconnect` + `${this.namespace}: socket ${sessionId} disconnected unexpectedly; will not reconnect` ); // unexpected disconnect - this._emit('offline.permanent', event); + this._emit(`offline.permanent${suffix}`, event); } } catch (error) { - this.logger.error(`${this.namespace}: error occurred in close handler`, error); + this.logger.error( + `${this.namespace}: error occurred in close handler for ${sessionId}`, + error + ); } }, - _onmessage(event) { - this._setTimeOffset(event); + _onmessage(sessionId, event) { + this._setTimeOffset(sessionId, event); const envelope = event.data; if (process.env.ENABLE_MERCURY_LOGGING) { - this.logger.debug(`${this.namespace}: message envelope: `, envelope); + this.logger.debug(`${this.namespace}: message envelope from ${sessionId}: `, envelope); } + envelope.sessionId = sessionId; const {data} = envelope; this._applyOverrides(data); @@ -530,7 +667,7 @@ const Mercury = WebexPlugin.extend({ resolve((this.webex[namespace] || this.webex.internal[namespace])[name](data)) ).catch((reason) => this.logger.error( - `${this.namespace}: error occurred in autowired event handler for ${data.eventType}`, + `${this.namespace}: error occurred in autowired event handler for ${data.eventType} from ${sessionId}`, reason ) ); @@ -538,32 +675,37 @@ const Mercury = WebexPlugin.extend({ Promise.resolve() ) .then(() => { - this._emit('event', event.data); + const suffix = sessionId === this.defaultSessionId ? '' : `:${sessionId}`; + + this._emit(`event${suffix}`, envelope); const [namespace] = data.eventType.split('.'); if (namespace === data.eventType) { - this._emit(`event:${namespace}`, envelope); + this._emit(`event:${namespace}${suffix}`, envelope); } else { - this._emit(`event:${namespace}`, envelope); - this._emit(`event:${data.eventType}`, envelope); + this._emit(`event:${namespace}${suffix}`, envelope); + this._emit(`event:${data.eventType}${suffix}`, envelope); } }) .catch((reason) => { - this.logger.error(`${this.namespace}: error occurred processing socket message`, reason); + this.logger.error( + `${this.namespace}: error occurred processing socket message from ${sessionId}`, + reason + ); }); }, - _setTimeOffset(event) { + _setTimeOffset(sessionId, event) { const {wsWriteTimestamp} = event.data; if (typeof wsWriteTimestamp === 'number' && wsWriteTimestamp > 0) { this.mercuryTimeOffset = Date.now() - wsWriteTimestamp; } }, - _reconnect(webSocketUrl) { - this.logger.info(`${this.namespace}: reconnecting`); + _reconnect(webSocketUrl, sessionId = this.defaultSessionId) { + this.logger.info(`${this.namespace}: reconnecting ${sessionId}`); - return this.connect(webSocketUrl); + return this.connect(webSocketUrl, sessionId); }, }); diff --git a/packages/@webex/internal-plugin-voicea/src/voicea.ts b/packages/@webex/internal-plugin-voicea/src/voicea.ts index 7e256493235..39ebb975c16 100644 --- a/packages/@webex/internal-plugin-voicea/src/voicea.ts +++ b/packages/@webex/internal-plugin-voicea/src/voicea.ts @@ -84,6 +84,8 @@ export class VoiceaChannel extends WebexPlugin implements IVoiceaChannel { if (!this.hasSubscribedToEvents) { // @ts-ignore this.webex.internal.llm.on('event:relay.event', this.eventProcessor); + // @ts-ignore + this.webex.internal.llm.on('event:relay.event:llm-practice-session', this.eventProcessor); this.hasSubscribedToEvents = true; } } @@ -97,6 +99,8 @@ export class VoiceaChannel extends WebexPlugin implements IVoiceaChannel { this.vmcDeviceId = undefined; // @ts-ignore this.webex.internal.llm.off('event:relay.event', this.eventProcessor); + // @ts-ignore + this.webex.internal.llm.off('event:relay.event:llm-practice-session', this.eventProcessor); this.hasSubscribedToEvents = false; this.announceStatus = ANNOUNCE_STATUS.IDLE; this.captionStatus = TURN_ON_CAPTION_STATUS.IDLE; @@ -244,13 +248,20 @@ export class VoiceaChannel extends WebexPlugin implements IVoiceaChannel { private sendAnnouncement = (): void => { this.announceStatus = ANNOUNCE_STATUS.JOINING; this.listenToEvents(); - // @ts-ignore - this.webex.internal.llm.socket.send({ + const socket = + // @ts-ignore + this.webex.internal.llm.getSocket('llm-practice-session') || this.webex.internal.llm.socket; + const binding = + // @ts-ignore + this.webex.internal.llm.getBinding('llm-practice-session') || + // @ts-ignore + this.webex.internal.llm.getBinding(); + socket.send({ id: `${this.seqNum}`, type: 'publishRequest', recipients: { // @ts-ignore - route: this.webex.internal.llm.getBinding(), + route: binding, }, headers: {}, data: { @@ -294,13 +305,20 @@ export class VoiceaChannel extends WebexPlugin implements IVoiceaChannel { public requestLanguage = (languageCode: string): void => { // @ts-ignore if (!this.webex.internal.llm.isConnected()) return; - // @ts-ignore - this.webex.internal.llm.socket.send({ + const socket = + // @ts-ignore + this.webex.internal.llm.getSocket('llm-practice-session') || this.webex.internal.llm.socket; + const binding = + // @ts-ignore + this.webex.internal.llm.getBinding('llm-practice-session') || + // @ts-ignore + this.webex.internal.llm.getBinding(); + socket.send({ id: `${this.seqNum}`, type: 'publishRequest', recipients: { // @ts-ignore - route: this.webex.internal.llm.getBinding(), + route: binding, }, headers: { to: this.vmcDeviceId, @@ -335,8 +353,16 @@ export class VoiceaChannel extends WebexPlugin implements IVoiceaChannel { // @ts-ignore if (!this.webex.internal.llm.isConnected()) return; - // @ts-ignore - this.webex.internal.llm.socket.send({ + const socket = + // @ts-ignore + this.webex.internal.llm.getSocket('llm-practice-session') || this.webex.internal.llm.socket; + const binding = + // @ts-ignore + this.webex.internal.llm.getBinding('llm-practice-session') || + // @ts-ignore + this.webex.internal.llm.getBinding(); + + socket?.send({ id: `${this.seqNum}`, type: 'publishRequest', recipients: { diff --git a/packages/@webex/plugin-meetings/src/annotation/index.ts b/packages/@webex/plugin-meetings/src/annotation/index.ts index fc0f4a3ee66..c8e7660e5b4 100644 --- a/packages/@webex/plugin-meetings/src/annotation/index.ts +++ b/packages/@webex/plugin-meetings/src/annotation/index.ts @@ -116,6 +116,12 @@ class AnnotationChannel extends WebexPlugin implements IAnnotationChannel { ); // @ts-ignore this.webex.internal.llm.on('event:relay.event', this.eventDataProcessor, this); + // @ts-ignore + this.webex.internal.llm.on( + 'event:relay.event:llm-practice-session', + this.eventDataProcessor, + this + ); this.hasSubscribedToEvents = true; } } @@ -131,7 +137,11 @@ class AnnotationChannel extends WebexPlugin implements IAnnotationChannel { // @ts-ignore this.webex.internal.llm.off('event:relay.event', this.eventDataProcessor); - + // @ts-ignore + this.webex.internal.llm.off( + 'event:relay.event:llm-practice-session', + this.eventDataProcessor + ); this.hasSubscribedToEvents = false; } } @@ -303,12 +313,20 @@ class AnnotationChannel extends WebexPlugin implements IAnnotationChannel { * @returns {void} */ private publishEncrypted(encryptedContent: string, strokeData: StrokeData) { + const socket = + // @ts-ignore + this.webex.internal.llm.getSocket('llm-practice-session') || this.webex.internal.llm.socket; + const binding = + // @ts-ignore + this.webex.internal.llm.getBinding('llm-practice-session') || + // @ts-ignore + this.webex.internal.llm.getBinding(); const data = { id: `${this.seqNum}`, type: 'publishRequest', recipients: { // @ts-ignore - route: this.webex.internal.llm.getBinding(), + route: binding, }, headers: { to: strokeData.toUserId, @@ -336,7 +354,7 @@ class AnnotationChannel extends WebexPlugin implements IAnnotationChannel { }; // @ts-ignore - this.webex.internal.llm.socket.send(data); + socket.send(data); this.seqNum += 1; } } diff --git a/packages/@webex/plugin-meetings/src/meeting/index.ts b/packages/@webex/plugin-meetings/src/meeting/index.ts index a997caff039..e8c20827030 100644 --- a/packages/@webex/plugin-meetings/src/meeting/index.ts +++ b/packages/@webex/plugin-meetings/src/meeting/index.ts @@ -6025,35 +6025,29 @@ export default class Meeting extends StatelessWebexPlugin { /** * Connects to low latency mercury and reconnects if the address has changed * It will also disconnect if called when the meeting has ended - * @param {String} datachannelUrl + * @param {boolean} forceClear to clear the existing connection * @returns {Promise} */ - async updateLLMConnection() { + async updateLLMConnection(forceClear = false) { // @ts-ignore - Fix type - const {url, info: {datachannelUrl, practiceSessionDatachannelUrl} = {}} = this.locusInfo; + const {url, info: {datachannelUrl} = {}} = this.locusInfo; const isJoined = this.isJoined(); - // webinar panelist should use new data channel in practice session - const dataChannelUrl = - this.webinar.isJoinPracticeSessionDataChannel() && practiceSessionDatachannelUrl - ? practiceSessionDatachannelUrl - : datachannelUrl; - // @ts-ignore - Fix type if (this.webex.internal.llm.isConnected()) { if ( // @ts-ignore - Fix type url === this.webex.internal.llm.getLocusUrl() && // @ts-ignore - Fix type - dataChannelUrl === this.webex.internal.llm.getDatachannelUrl() && + datachannelUrl === this.webex.internal.llm.getDatachannelUrl() && isJoined ) { return undefined; } // @ts-ignore - Fix type await this.webex.internal.llm.disconnectLLM( - isJoined + isJoined || forceClear ? { code: 3050, reason: 'done (permanent)', @@ -6064,20 +6058,20 @@ export default class Meeting extends StatelessWebexPlugin { this.webex.internal.llm.off('event:relay.event', this.processRelayEvent); } - if (!isJoined) { + if (!isJoined || forceClear) { return undefined; } // @ts-ignore - Fix type return this.webex.internal.llm - .registerAndConnect(url, dataChannelUrl) + .registerAndConnect(url, datachannelUrl) .then((registerAndConnectResult) => { // @ts-ignore - Fix type this.webex.internal.llm.off('event:relay.event', this.processRelayEvent); // @ts-ignore - Fix type this.webex.internal.llm.on('event:relay.event', this.processRelayEvent); LoggerProxy.logger.info( - 'Meeting:index#updateLLMConnection --> enabled to receive relay events!' + 'Meeting:index#updateLLMConnection --> enabled to receive relay events for default session!' ); return Promise.resolve(registerAndConnectResult); diff --git a/packages/@webex/plugin-meetings/src/meeting/util.ts b/packages/@webex/plugin-meetings/src/meeting/util.ts index 9a45ba9e3ea..02f8ef4ddf0 100644 --- a/packages/@webex/plugin-meetings/src/meeting/util.ts +++ b/packages/@webex/plugin-meetings/src/meeting/util.ts @@ -216,6 +216,7 @@ const MeetingUtil = { meeting.stopPeriodicLogUpload(); meeting.breakouts.cleanUp(); + meeting.webinar.cleanUp(); meeting.simultaneousInterpretation.cleanUp(); meeting.locusMediaRequest = undefined; @@ -240,7 +241,7 @@ const MeetingUtil = { .then(() => meeting.stopKeepAlive()) .then(() => { if (meeting.config?.enableAutomaticLLM) { - meeting.updateLLMConnection(); + meeting.updateLLMConnection(true); } }); }, diff --git a/packages/@webex/plugin-meetings/src/webinar/index.ts b/packages/@webex/plugin-meetings/src/webinar/index.ts index c793a7c8373..2eb670fbf26 100644 --- a/packages/@webex/plugin-meetings/src/webinar/index.ts +++ b/packages/@webex/plugin-meetings/src/webinar/index.ts @@ -28,6 +28,14 @@ const Webinar = WebexPlugin.extend({ meetingId: 'string', }, + /** + * Calls this to clean up listeners + * @returns {void} + */ + cleanUp() { + this.updatePSDataChannel(false); + }, + /** * Update the current locus url of the webinar * @param {string} locusUrl @@ -98,8 +106,9 @@ const Webinar = WebexPlugin.extend({ if (this.practiceSessionEnabled) { // may need change data channel in practice session - meeting?.updateLLMConnection(); + // meeting?.updateLLMConnection(); } + this.updatePSDataChannel(this.practiceSessionEnabled); }, /** @@ -110,6 +119,79 @@ const Webinar = WebexPlugin.extend({ return this.selfIsPanelist && this.practiceSessionEnabled; }, + /** + * Connects to low latency mercury and reconnects if the address has changed + * It will also disconnect if called when the meeting has ended + * @param {boolean} connect - whether to connect or disconnect + * @returns {Promise} + */ + async updatePSDataChannel(connect) { + const meeting = this.webex.meetings.getMeetingByType(_ID_, this.meetingId); + + // @ts-ignore - Fix type + const {url, info: {practiceSessionDatachannelUrl} = {}} = meeting?.locusInfo ?? {}; + + const isJoined = meeting?.isJoined() && this.isJoinPracticeSessionDataChannel(); + + if (!connect) { + // @ts-ignore - Fix type + if (this.webex.internal.llm.isConnected('llm-practice-session')) { + // @ts-ignore - Fix type + await this.webex.internal.llm.disconnectLLM( + { + code: 3050, + reason: 'done (permanent)', + }, + 'llm-practice-session' + ); + // @ts-ignore - Fix type + this.webex.internal.llm.off( + 'event:relay.event:llm-practice-session', + meeting?.processRelayEvent + ); + } + + return undefined; + } + + if (!isJoined || !practiceSessionDatachannelUrl) { + return undefined; + } + // @ts-ignore - Fix type + if (this.webex.internal.llm.isConnected('llm-practice-session')) { + if ( + // @ts-ignore - Fix type + url === this.webex.internal.llm.getLocusUrl('llm-practice-session') && + // @ts-ignore - Fix type + practiceSessionDatachannelUrl === + this.webex.internal.llm.getDatachannelUrl('llm-practice-session') + ) { + return undefined; + } + } + + // @ts-ignore - Fix type + return this.webex.internal.llm + .registerAndConnect(url, practiceSessionDatachannelUrl, 'llm-practice-session') + .then((registerAndConnectResult) => { + // @ts-ignore - Fix type + this.webex.internal.llm.off( + 'event:relay.event:llm-practice-session', + meeting?.processRelayEvent + ); + // @ts-ignore - Fix type + this.webex.internal.llm.on( + 'event:relay.event:llm-practice-session', + meeting?.processRelayEvent + ); + LoggerProxy.logger.info( + 'Webinar:index#updatePSDataChannel --> enabled to receive relay events for default session for llm-practice-session!' + ); + + return Promise.resolve(registerAndConnectResult); + }); + }, + /** * start or stop practice session for webinar * @param {boolean} enabled @@ -137,6 +219,7 @@ const Webinar = WebexPlugin.extend({ */ updatePracticeSessionStatus(payload) { this.set('practiceSessionEnabled', !!payload?.enabled); + this.updatePSDataChannel(this.practiceSessionEnabled).then(() => {}); }, /**