diff --git a/sdk/scripts/grpc-multiuser-client-test-comparison.ts b/sdk/scripts/grpc-multiuser-client-test-comparison.ts new file mode 100644 index 000000000..7c1ec78c0 --- /dev/null +++ b/sdk/scripts/grpc-multiuser-client-test-comparison.ts @@ -0,0 +1,156 @@ +import { grpcUserAccountSubscriber } from '../src/accounts/grpcUserAccountSubscriber'; +import { grpcMultiUserAccountSubscriber } from '../src/accounts/grpcMultiUserAccountSubscriber'; +import { Connection, Keypair, PublicKey } from '@solana/web3.js'; +import { DRIFT_PROGRAM_ID } from '../src'; +import { CommitmentLevel } from '@triton-one/yellowstone-grpc'; +import { AnchorProvider, Idl, Program } from '@coral-xyz/anchor'; +import driftIDL from '../src/idl/drift.json'; +import assert from 'assert'; +import { Wallet } from '../src'; + +const GRPC_ENDPOINT = process.env.GRPC_ENDPOINT; +const TOKEN = process.env.TOKEN; +const RPC_ENDPOINT = process.env.RPC_ENDPOINT; + +const USER_ACCOUNT_PUBKEYS = [ + // Add user account public keys here, e.g.: + // new PublicKey('...') +]; + +async function testGrpcUserAccountSubscriberV1VsV2() { + console.log('๐Ÿš€ Initializing User Account Subscriber V1 vs V2 Test...'); + + if (USER_ACCOUNT_PUBKEYS.length === 0) { + console.error('โŒ No user account public keys provided. Please add some to USER_ACCOUNT_PUBKEYS array.'); + process.exit(1); + } + + const connection = new Connection(RPC_ENDPOINT); + const wallet = new Wallet(new Keypair()); + + const programId = new PublicKey(DRIFT_PROGRAM_ID); + const provider = new AnchorProvider( + connection, + // @ts-ignore + wallet, + { + commitment: 'processed', + } + ); + + const program = new Program(driftIDL as Idl, programId, provider); + + const grpcConfigs = { + endpoint: GRPC_ENDPOINT, + token: TOKEN, + commitmentLevel: CommitmentLevel.PROCESSED, + channelOptions: { + 'grpc.keepalive_time_ms': 10_000, + 'grpc.keepalive_timeout_ms': 1_000, + 'grpc.keepalive_permit_without_calls': 1, + }, + }; + + console.log(`๐Ÿ“Š Testing ${USER_ACCOUNT_PUBKEYS.length} user accounts...`); + + // V1: Create individual subscribers for each user account + const v1Subscribers = USER_ACCOUNT_PUBKEYS.map( + (pubkey) => + new grpcUserAccountSubscriber( + grpcConfigs, + program, + pubkey, + { logResubMessages: true } + ) + ); + + // V2: Create a single multi-subscriber and get per-user interfaces + const v2MultiSubscriber = new grpcMultiUserAccountSubscriber( + program, + grpcConfigs, + { logResubMessages: true } + ); + const v2Subscribers = USER_ACCOUNT_PUBKEYS.map((pubkey) => + v2MultiSubscriber.forUser(pubkey) + ); + + // Subscribe all V1 subscribers + console.log('๐Ÿ”— Subscribing V1 subscribers...'); + await Promise.all(v1Subscribers.map((sub) => sub.subscribe())); + console.log('โœ… V1 subscribers ready'); + + // Subscribe all V2 subscribers + console.log('๐Ÿ”— Subscribing V2 subscribers...'); + await v2MultiSubscriber.subscribe(); + console.log('โœ… V2 subscribers ready'); + + const compare = () => { + try { + let passedTests = 0; + let totalTests = 0; + + // Test each user account + for (let i = 0; i < USER_ACCOUNT_PUBKEYS.length; i++) { + const pubkey = USER_ACCOUNT_PUBKEYS[i]; + const v1Sub = v1Subscribers[i]; + const v2Sub = v2Subscribers[i]; + + totalTests++; + + // 1. Test isSubscribed + assert.strictEqual( + v1Sub.isSubscribed, + v2Sub.isSubscribed, + `User ${pubkey.toBase58()}: isSubscribed should match` + ); + + // 2. Test getUserAccountAndSlot + const v1Data = v1Sub.getUserAccountAndSlot(); + const v2Data = v2Sub.getUserAccountAndSlot(); + + // Compare the user account data + assert.deepStrictEqual( + v1Data.data, + v2Data.data, + `User ${pubkey.toBase58()}: account data should match` + ); + + // Slots might differ slightly due to timing, but let's check if they're close + const slotDiff = Math.abs(v1Data.slot - v2Data.slot); + if (slotDiff > 10) { + console.warn( + `โš ๏ธ User ${pubkey.toBase58()}: slot difference is ${slotDiff} (v1: ${v1Data.slot}, v2: ${v2Data.slot})` + ); + } + + passedTests++; + } + + console.log(`โœ… All comparisons passed (${passedTests}/${totalTests} user accounts)`); + } catch (error) { + console.error('โŒ Comparison failed:', error); + } + }; + + // Run initial comparison + compare(); + + // Run comparison every second to verify live updates + const interval = setInterval(compare, 1000); + + const cleanup = async () => { + clearInterval(interval); + console.log('๐Ÿงน Cleaning up...'); + await Promise.all([ + ...v1Subscribers.map((sub) => sub.unsubscribe()), + ...v2Subscribers.map((sub) => sub.unsubscribe()), + ]); + console.log('โœ… Cleanup complete'); + process.exit(0); + }; + + process.on('SIGINT', cleanup); + process.on('SIGTERM', cleanup); + } + + testGrpcUserAccountSubscriberV1VsV2().catch(console.error); \ No newline at end of file diff --git a/sdk/scripts/single-grpc-client-test.ts b/sdk/scripts/single-grpc-client-test.ts index 0aca8985a..e6724e661 100644 --- a/sdk/scripts/single-grpc-client-test.ts +++ b/sdk/scripts/single-grpc-client-test.ts @@ -19,6 +19,7 @@ import { ProgramAccount, } from '@coral-xyz/anchor'; import driftIDL from '../src/idl/drift.json'; +import { grpcMultiUserAccountSubscriber } from '../src/accounts/grpcMultiUserAccountSubscriber'; const GRPC_ENDPOINT = process.env.GRPC_ENDPOINT; const TOKEN = process.env.TOKEN; @@ -96,18 +97,27 @@ async function initializeSingleGrpcClient() { console.log(`๐Ÿ“Š Markets: ${perpMarketIndexes.length} perp, ${spotMarketIndexes.length} spot`); console.log(`๐Ÿ”ฎ Oracles: ${oracleInfos.length}`); + + const grpcConfigs = { + endpoint: GRPC_ENDPOINT, + token: TOKEN, + commitmentLevel: CommitmentLevel.PROCESSED, + channelOptions: { + 'grpc.keepalive_time_ms': 10_000, + 'grpc.keepalive_timeout_ms': 1_000, + 'grpc.keepalive_permit_without_calls': 1, + }, + }; + + const multiUserSubsciber = new grpcMultiUserAccountSubscriber( + program, + grpcConfigs + ); + const baseAccountSubscription = { type: 'grpc' as const, - grpcConfigs: { - endpoint: GRPC_ENDPOINT, - token: TOKEN, - commitmentLevel: CommitmentLevel.PROCESSED, - channelOptions: { - 'grpc.keepalive_time_ms': 10_000, - 'grpc.keepalive_timeout_ms': 1_000, - 'grpc.keepalive_permit_without_calls': 1, - }, - }, + grpcConfigs, + grpcMultiUserAccountSubscriber: multiUserSubsciber, }; const config: DriftClientConfig = { diff --git a/sdk/src/accounts/grpcMultiAccountSubscriber.ts b/sdk/src/accounts/grpcMultiAccountSubscriber.ts index 8bdd1cbcc..7a442ac5c 100644 --- a/sdk/src/accounts/grpcMultiAccountSubscriber.ts +++ b/sdk/src/accounts/grpcMultiAccountSubscriber.ts @@ -372,6 +372,7 @@ export class grpcMultiAccountSubscriber { } }); }); + await this.fetch(); } async removeAccounts(accounts: PublicKey[]): Promise { diff --git a/sdk/src/accounts/grpcMultiUserAccountSubscriber.ts b/sdk/src/accounts/grpcMultiUserAccountSubscriber.ts new file mode 100644 index 000000000..b4ad528d5 --- /dev/null +++ b/sdk/src/accounts/grpcMultiUserAccountSubscriber.ts @@ -0,0 +1,274 @@ +import { + DataAndSlot, + GrpcConfigs, + NotSubscribedError, + ResubOpts, + UserAccountEvents, + UserAccountSubscriber, +} from './types'; +import StrictEventEmitter from 'strict-event-emitter-types'; +import { EventEmitter } from 'events'; +import { Context, PublicKey } from '@solana/web3.js'; +import { Program } from '@coral-xyz/anchor'; +import { UserAccount } from '../types'; +import { grpcMultiAccountSubscriber } from './grpcMultiAccountSubscriber'; + +export class grpcMultiUserAccountSubscriber { + private program: Program; + private multiSubscriber: grpcMultiAccountSubscriber; + + private userData = new Map>(); + private listeners = new Map< + string, + Set> + >(); + private keyToPk = new Map(); + private pendingAddKeys = new Set(); + private debounceTimer?: ReturnType; + private debounceMs = 20; + private isMultiSubscribed = false; + private userAccountSubscribers = new Map(); + private grpcConfigs: GrpcConfigs; + resubOpts?: ResubOpts; + + private handleAccountChange = ( + accountId: PublicKey, + data: UserAccount, + context: Context, + _buffer?: unknown, + _accountProps?: unknown + ): void => { + const k = accountId.toBase58(); + this.userData.set(k, { data, slot: context.slot }); + const setForKey = this.listeners.get(k); + if (setForKey) { + for (const emitter of setForKey) { + emitter.emit('userAccountUpdate', data); + emitter.emit('update'); + } + } + }; + + public constructor( + program: Program, + grpcConfigs: GrpcConfigs, + resubOpts?: ResubOpts, + multiSubscriber?: grpcMultiAccountSubscriber + ) { + this.program = program; + this.multiSubscriber = multiSubscriber; + this.grpcConfigs = grpcConfigs; + this.resubOpts = resubOpts; + } + + public async subscribe(): Promise { + if (!this.multiSubscriber) { + this.multiSubscriber = + await grpcMultiAccountSubscriber.create( + this.grpcConfigs, + 'user', + this.program, + undefined, + this.resubOpts + ); + } + + // Subscribe all per-user subscribers first + await Promise.all( + Array.from(this.userAccountSubscribers.values()).map((subscriber) => + subscriber.subscribe() + ) + ); + // Ensure we immediately register any pending keys and kick off underlying subscription/fetch + await this.flushPending(); + // Proactively fetch once to populate data for all subscribed accounts + await this.multiSubscriber.fetch(); + // Wait until the underlying multi-subscriber has data for every registered user key + const targetKeys = Array.from(this.listeners.keys()); + if (targetKeys.length === 0) return; + // Poll until all keys are present in dataMap + // Use debounceMs as the polling cadence to avoid introducing new magic numbers + // eslint-disable-next-line no-constant-condition + while (true) { + const map = this.multiSubscriber.getAccountDataMap(); + let allPresent = true; + for (const k of targetKeys) { + if (!map.has(k)) { + allPresent = false; + break; + } + } + if (allPresent) break; + await new Promise((resolve) => setTimeout(resolve, this.debounceMs)); + } + } + + public forUser(userAccountPublicKey: PublicKey): UserAccountSubscriber { + if (this.userAccountSubscribers.has(userAccountPublicKey.toBase58())) { + return this.userAccountSubscribers.get(userAccountPublicKey.toBase58())!; + } + const key = userAccountPublicKey.toBase58(); + const perUserEmitter: StrictEventEmitter = + new EventEmitter(); + // eslint-disable-next-line @typescript-eslint/no-this-alias + const parent = this; + let isSubscribed = false; + + const registerHandlerIfNeeded = async () => { + if (!this.listeners.has(key)) { + this.listeners.set(key, new Set()); + this.keyToPk.set(key, userAccountPublicKey); + this.pendingAddKeys.add(key); + this.scheduleFlush(); + } + }; + + const perUser: UserAccountSubscriber = { + get eventEmitter() { + return perUserEmitter; + }, + set eventEmitter(_v) {}, + + get isSubscribed() { + return isSubscribed; + }, + set isSubscribed(_v: boolean) { + isSubscribed = _v; + }, + + async subscribe(userAccount?: UserAccount): Promise { + if (isSubscribed) return true; + if (userAccount) { + this.updateData(userAccount, 0); + } + await registerHandlerIfNeeded(); + const setForKey = parent.listeners.get(key)!; + setForKey.add(perUserEmitter); + isSubscribed = true; + return true; + }, + + async fetch(): Promise { + if (!isSubscribed) { + throw new NotSubscribedError( + 'Must subscribe before fetching account updates' + ); + } + const account = (await parent.program.account.user.fetch( + userAccountPublicKey + )) as UserAccount; + this.updateData(account, 0); + }, + + updateData(userAccount: UserAccount, slot: number): void { + parent.userData.set(key, { data: userAccount, slot }); + perUserEmitter.emit('userAccountUpdate', userAccount); + perUserEmitter.emit('update'); + }, + + async unsubscribe(): Promise { + if (!isSubscribed) return; + const setForKey = parent.listeners.get(key); + if (setForKey) { + setForKey.delete(perUserEmitter); + if (setForKey.size === 0) { + parent.listeners.delete(key); + await parent.multiSubscriber.removeAccounts([userAccountPublicKey]); + parent.userData.delete(key); + parent.keyToPk.delete(key); + parent.pendingAddKeys.delete(key); + } + } + isSubscribed = false; + }, + + getUserAccountAndSlot(): DataAndSlot { + const das = parent.userData.get(key); + if (!das) { + throw new NotSubscribedError( + 'Must subscribe before getting user account data' + ); + } + return das; + }, + }; + + this.userAccountSubscribers.set(userAccountPublicKey.toBase58(), perUser); + return perUser; + } + + private scheduleFlush(): void { + if (this.debounceTimer) return; + this.debounceTimer = setTimeout(() => { + void this.flushPending(); + }, this.debounceMs); + } + + private async flushPending(): Promise { + const hasPending = this.pendingAddKeys.size > 0; + if (!hasPending) { + this.debounceTimer = undefined; + return; + } + + const allPks: PublicKey[] = []; + for (const k of this.listeners.keys()) { + const pk = this.keyToPk.get(k); + if (pk) allPks.push(pk); + } + if (allPks.length === 0) { + this.pendingAddKeys.clear(); + this.debounceTimer = undefined; + return; + } + + if (!this.isMultiSubscribed) { + await this.multiSubscriber.subscribe(allPks, this.handleAccountChange); + this.isMultiSubscribed = true; + await this.multiSubscriber.fetch(); + for (const k of this.pendingAddKeys) { + const pk = this.keyToPk.get(k); + if (pk) { + const data = this.multiSubscriber.getAccountData(k); + if (data) { + this.handleAccountChange( + pk, + data.data, + { slot: data.slot }, + undefined, + undefined + ); + } + } + } + } else { + const ms = this.multiSubscriber as unknown as { + onChangeMap: Map< + string, + ( + data: UserAccount, + context: Context, + buffer: unknown, + accountProps: unknown + ) => void + >; + }; + for (const k of this.pendingAddKeys) { + ms.onChangeMap.set(k, (data, ctx, buffer, accountProps) => { + this.multiSubscriber.setAccountData(k, data, ctx.slot); + this.handleAccountChange( + new PublicKey(k), + data, + ctx, + buffer, + accountProps + ); + }); + } + await this.multiSubscriber.addAccounts(allPks); + } + + this.pendingAddKeys.clear(); + this.debounceTimer = undefined; + } +} diff --git a/sdk/src/driftClient.ts b/sdk/src/driftClient.ts index 4c71a49ab..010555604 100644 --- a/sdk/src/driftClient.ts +++ b/sdk/src/driftClient.ts @@ -368,6 +368,8 @@ export class DriftClient { resubTimeoutMs: config.accountSubscription?.resubTimeoutMs, logResubMessages: config.accountSubscription?.logResubMessages, grpcConfigs: config.accountSubscription?.grpcConfigs, + grpcMultiUserAccountSubscriber: + config.accountSubscription?.grpcMultiUserAccountSubscriber, }; this.userStatsAccountSubscriptionConfig = { type: 'grpc', diff --git a/sdk/src/driftClientConfig.ts b/sdk/src/driftClientConfig.ts index ceb7b2603..e80534453 100644 --- a/sdk/src/driftClientConfig.ts +++ b/sdk/src/driftClientConfig.ts @@ -24,6 +24,7 @@ import { WebSocketDriftClientAccountSubscriberV2 } from './accounts/webSocketDri import { WebSocketDriftClientAccountSubscriber } from './accounts/webSocketDriftClientAccountSubscriber'; import { grpcDriftClientAccountSubscriberV2 } from './accounts/grpcDriftClientAccountSubscriberV2'; import { grpcDriftClientAccountSubscriber } from './accounts/grpcDriftClientAccountSubscriber'; +import { grpcMultiUserAccountSubscriber } from './accounts/grpcMultiUserAccountSubscriber'; export type DriftClientConfig = { connection: Connection; @@ -73,6 +74,7 @@ export type DriftClientSubscriptionConfig = ) => | grpcDriftClientAccountSubscriberV2 | grpcDriftClientAccountSubscriber; + grpcMultiUserAccountSubscriber?: grpcMultiUserAccountSubscriber; } | { type: 'websocket'; diff --git a/sdk/src/user.ts b/sdk/src/user.ts index 92512243f..1c3e0730f 100644 --- a/sdk/src/user.ts +++ b/sdk/src/user.ts @@ -137,15 +137,22 @@ export class User { } else if (config.accountSubscription?.type === 'custom') { this.accountSubscriber = config.accountSubscription.userAccountSubscriber; } else if (config.accountSubscription?.type === 'grpc') { - this.accountSubscriber = new grpcUserAccountSubscriber( - config.accountSubscription.grpcConfigs, - config.driftClient.program, - config.userAccountPublicKey, - { - resubTimeoutMs: config.accountSubscription?.resubTimeoutMs, - logResubMessages: config.accountSubscription?.logResubMessages, - } - ); + if (config.accountSubscription.grpcMultiUserAccountSubscriber) { + this.accountSubscriber = + config.accountSubscription.grpcMultiUserAccountSubscriber.forUser( + config.userAccountPublicKey + ); + } else { + this.accountSubscriber = new grpcUserAccountSubscriber( + config.accountSubscription.grpcConfigs, + config.driftClient.program, + config.userAccountPublicKey, + { + resubTimeoutMs: config.accountSubscription?.resubTimeoutMs, + logResubMessages: config.accountSubscription?.logResubMessages, + } + ); + } } else { if ( config.accountSubscription?.type === 'websocket' && diff --git a/sdk/src/userConfig.ts b/sdk/src/userConfig.ts index d5ba2147b..9067833a7 100644 --- a/sdk/src/userConfig.ts +++ b/sdk/src/userConfig.ts @@ -4,6 +4,7 @@ import { BulkAccountLoader } from './accounts/bulkAccountLoader'; import { GrpcConfigs, UserAccountSubscriber } from './accounts/types'; import { WebSocketProgramAccountSubscriber } from './accounts/webSocketProgramAccountSubscriber'; import { UserAccount } from './types'; +import { grpcMultiUserAccountSubscriber } from './accounts/grpcMultiUserAccountSubscriber'; export type UserConfig = { accountSubscription?: UserSubscriptionConfig; @@ -17,6 +18,7 @@ export type UserSubscriptionConfig = resubTimeoutMs?: number; logResubMessages?: boolean; grpcConfigs: GrpcConfigs; + grpcMultiUserAccountSubscriber?: grpcMultiUserAccountSubscriber; } | { type: 'websocket';