5
5
ProductTypeElement ,
6
6
SumType ,
7
7
SumTypeVariant ,
8
+ type ComparablePrimitive ,
8
9
} from './algebraic_type.ts' ;
9
10
import {
10
11
AlgebraicValue ,
@@ -15,7 +16,13 @@ import {
15
16
} from './algebraic_value.ts' ;
16
17
import BinaryReader from './binary_reader.ts' ;
17
18
import BinaryWriter from './binary_writer.ts' ;
18
- import * as ws from './client_api/index.ts' ;
19
+ import { BsatnRowList } from './client_api/bsatn_row_list_type.ts' ;
20
+ import { ClientMessage } from './client_api/client_message_type.ts' ;
21
+ import { DatabaseUpdate } from './client_api/database_update_type.ts' ;
22
+ import { QueryUpdate } from './client_api/query_update_type.ts' ;
23
+ import { ServerMessage } from './client_api/server_message_type.ts' ;
24
+ import { TableUpdate as RawTableUpdate } from './client_api/table_update_type.ts' ;
25
+ import type * as clientApi from './client_api/index.ts' ;
19
26
import { ClientCache } from './client_cache.ts' ;
20
27
import { DbConnectionBuilder } from './db_connection_builder.ts' ;
21
28
import { type DbContext } from './db_context.ts' ;
@@ -41,7 +48,7 @@ import {
41
48
TableCache ,
42
49
type Operation ,
43
50
type PendingCallback ,
44
- type TableUpdate ,
51
+ type TableUpdate as CacheTableUpdate ,
45
52
} from './table_cache.ts' ;
46
53
import { deepEqual , toPascalCase } from './utils.ts' ;
47
54
import { WebsocketDecompressAdapter } from './websocket_decompress_adapter.ts' ;
@@ -53,7 +60,8 @@ import {
53
60
type SubscribeEvent ,
54
61
} from './subscription_builder_impl.ts' ;
55
62
import { stdbLogger } from './logger.ts' ;
56
- import type { ReducerRuntimeTypeInfo } from './spacetime_module.ts' ;
63
+ import { type ReducerRuntimeTypeInfo } from './spacetime_module.ts' ;
64
+ import { fromByteArray } from 'base64-js' ;
57
65
58
66
export {
59
67
AlgebraicType ,
@@ -273,7 +281,7 @@ export class DbConnectionImpl<
273
281
emitter : handleEmitter ,
274
282
} ) ;
275
283
this . #sendMessage(
276
- ws . ClientMessage . SubscribeMulti ( {
284
+ ClientMessage . SubscribeMulti ( {
277
285
queryStrings : querySql ,
278
286
queryId : { id : queryId } ,
279
287
// The TypeScript SDK doesn't currently track `request_id`s,
@@ -286,7 +294,7 @@ export class DbConnectionImpl<
286
294
287
295
unregisterSubscription ( queryId : number ) : void {
288
296
this . #sendMessage(
289
- ws . ClientMessage . UnsubscribeMulti ( {
297
+ ClientMessage . UnsubscribeMulti ( {
290
298
queryId : { id : queryId } ,
291
299
// The TypeScript SDK doesn't currently track `request_id`s,
292
300
// so always use 0.
@@ -297,25 +305,38 @@ export class DbConnectionImpl<
297
305
298
306
// This function is async because we decompress the message async
299
307
async #processParsedMessage(
300
- message : ws . ServerMessage
308
+ message : ServerMessage
301
309
) : Promise < Message | undefined > {
302
310
const parseRowList = (
303
311
type : 'insert' | 'delete' ,
304
312
tableName : string ,
305
- rowList : ws . BsatnRowList
313
+ rowList : BsatnRowList
306
314
) : Operation [ ] => {
307
315
const buffer = rowList . rowsData ;
308
316
const reader = new BinaryReader ( buffer ) ;
309
- const rows : any [ ] = [ ] ;
317
+ const rows : Operation [ ] = [ ] ;
310
318
const rowType = this . #remoteModule. tables [ tableName ] ! . rowType ;
319
+ const primaryKeyInfo =
320
+ this . #remoteModule. tables [ tableName ] ! . primaryKeyInfo ;
311
321
while ( reader . offset < buffer . length + buffer . byteOffset ) {
312
322
const initialOffset = reader . offset ;
313
323
const row = rowType . deserialize ( reader ) ;
314
- // This is super inefficient, but the buffer indexes are weird, so we are doing this for now.
315
- // We should just base64 encode the bytes.
316
- const rowId = JSON . stringify ( row , ( _ , v ) =>
317
- typeof v === 'bigint' ? v . toString ( ) : v
318
- ) ;
324
+ let rowId : ComparablePrimitive | undefined = undefined ;
325
+ if ( primaryKeyInfo !== undefined ) {
326
+ rowId = primaryKeyInfo . colType . intoMapKey (
327
+ row [ primaryKeyInfo . colName ]
328
+ ) ;
329
+ } else {
330
+ // Get a view of the bytes for this row.
331
+ const rowBytes = buffer . subarray (
332
+ initialOffset - buffer . byteOffset ,
333
+ reader . offset - buffer . byteOffset
334
+ ) ;
335
+ // Convert it to a base64 string, so we can use it as a map key.
336
+ const asBase64 = fromByteArray ( rowBytes ) ;
337
+ rowId = asBase64 ;
338
+ }
339
+
319
340
rows . push ( {
320
341
type,
321
342
rowId,
@@ -326,15 +347,15 @@ export class DbConnectionImpl<
326
347
} ;
327
348
328
349
const parseTableUpdate = async (
329
- rawTableUpdate : ws . TableUpdate
330
- ) : Promise < TableUpdate > => {
350
+ rawTableUpdate : RawTableUpdate
351
+ ) : Promise < CacheTableUpdate > => {
331
352
const tableName = rawTableUpdate . tableName ;
332
353
let operations : Operation [ ] = [ ] ;
333
354
for ( const update of rawTableUpdate . updates ) {
334
- let decompressed : ws . QueryUpdate ;
355
+ let decompressed : QueryUpdate ;
335
356
if ( update . tag === 'Gzip' ) {
336
357
const decompressedBuffer = await decompress ( update . value , 'gzip' ) ;
337
- decompressed = ws . QueryUpdate . deserialize (
358
+ decompressed = QueryUpdate . deserialize (
338
359
new BinaryReader ( decompressedBuffer )
339
360
) ;
340
361
} else if ( update . tag === 'Brotli' ) {
@@ -358,9 +379,9 @@ export class DbConnectionImpl<
358
379
} ;
359
380
360
381
const parseDatabaseUpdate = async (
361
- dbUpdate : ws . DatabaseUpdate
362
- ) : Promise < TableUpdate [ ] > => {
363
- const tableUpdates : TableUpdate [ ] = [ ] ;
382
+ dbUpdate : DatabaseUpdate
383
+ ) : Promise < CacheTableUpdate [ ] > => {
384
+ const tableUpdates : CacheTableUpdate [ ] = [ ] ;
364
385
for ( const rawTableUpdate of dbUpdate . tables ) {
365
386
tableUpdates . push ( await parseTableUpdate ( rawTableUpdate ) ) ;
366
387
}
@@ -398,7 +419,7 @@ export class DbConnectionImpl<
398
419
const args = txUpdate . reducerCall . args ;
399
420
const energyQuantaUsed = txUpdate . energyQuantaUsed ;
400
421
401
- let tableUpdates : TableUpdate [ ] ;
422
+ let tableUpdates : CacheTableUpdate [ ] ;
402
423
let errMessage = '' ;
403
424
switch ( txUpdate . status . tag ) {
404
425
case 'Committed' :
@@ -498,11 +519,11 @@ export class DbConnectionImpl<
498
519
}
499
520
}
500
521
501
- #sendMessage( message : ws . ClientMessage ) : void {
522
+ #sendMessage( message : ClientMessage ) : void {
502
523
this . wsPromise . then ( wsResolved => {
503
524
if ( wsResolved ) {
504
525
const writer = new BinaryWriter ( 1024 ) ;
505
- ws . ClientMessage . serialize ( writer , message ) ;
526
+ ClientMessage . serialize ( writer , message ) ;
506
527
const encoded = writer . getBuffer ( ) ;
507
528
wsResolved . send ( encoded ) ;
508
529
}
@@ -517,24 +538,28 @@ export class DbConnectionImpl<
517
538
}
518
539
519
540
#applyTableUpdates(
520
- tableUpdates : TableUpdate [ ] ,
541
+ tableUpdates : CacheTableUpdate [ ] ,
521
542
eventContext : EventContextInterface
522
543
) : PendingCallback [ ] {
523
- const pendingCallbacks : PendingCallback [ ] = [ ] ;
544
+ let pendingCallbacks : PendingCallback [ ] = [ ] ;
524
545
for ( let tableUpdate of tableUpdates ) {
525
546
// Get table information for the table being updated
526
547
const tableName = tableUpdate . tableName ;
527
548
const tableTypeInfo = this . #remoteModule. tables [ tableName ] ! ;
528
549
const table = this . clientCache . getOrCreateTable ( tableTypeInfo ) ;
529
- pendingCallbacks . push (
530
- ...table . applyOperations ( tableUpdate . operations , eventContext )
550
+ const newCallbacks = table . applyOperations (
551
+ tableUpdate . operations ,
552
+ eventContext
531
553
) ;
554
+ for ( const callback of newCallbacks ) {
555
+ pendingCallbacks . push ( callback ) ;
556
+ }
532
557
}
533
558
return pendingCallbacks ;
534
559
}
535
560
536
561
async #processMessage( data : Uint8Array ) : Promise < void > {
537
- const serverMessage = parseValue ( ws . ServerMessage , data ) ;
562
+ const serverMessage = parseValue ( ServerMessage , data ) ;
538
563
const message = await this . #processParsedMessage( serverMessage ) ;
539
564
if ( ! message ) {
540
565
return ;
@@ -788,7 +813,7 @@ export class DbConnectionImpl<
788
813
argsBuffer : Uint8Array ,
789
814
flags : CallReducerFlags
790
815
) : void {
791
- const message = ws . ClientMessage . CallReducer ( {
816
+ const message = ClientMessage . CallReducer ( {
792
817
reducer : reducerName ,
793
818
args : argsBuffer ,
794
819
// The TypeScript SDK doesn't currently track `request_id`s,
0 commit comments