@@ -5,6 +5,8 @@ import { READ_ONLY_TXN, ALREADY_FINISHED } from './errors';
5
5
6
6
const log = debug ( 'dgraph-js-native:txn' ) ;
7
7
8
+ const POLL_TIMEOUT = Number ( process . env . DGRAPH_JS_NATIVE_POLL_TIMEOUT || 5000 ) ;
9
+
8
10
export type TxnOptions = {
9
11
readOnly ?: boolean ;
10
12
bestEffort ?: boolean ;
@@ -15,13 +17,12 @@ export class Txn {
15
17
private responses : { [ key : string ] : [ ( resp : Response ) => void , ( err : Error ) => void ] } ;
16
18
private finished : boolean ;
17
19
private immediate : NodeJS . Immediate ;
20
+ private emptyTimestamp : number ;
18
21
19
22
constructor ( txn : QueryTxn ) {
20
23
this . txn = txn ;
21
24
this . responses = { } ;
22
25
this . finished = false ;
23
-
24
- this . startPolling ( ) ;
25
26
}
26
27
27
28
private loop ( ) : void {
@@ -58,9 +59,19 @@ export class Txn {
58
59
}
59
60
60
61
private startPolling ( ) : void {
62
+ if ( Object . keys ( this . responses ) . length > 0 ) {
63
+ this . emptyTimestamp = Date . now ( ) ;
64
+ }
65
+
66
+ if ( Date . now ( ) - this . emptyTimestamp >= POLL_TIMEOUT ) {
67
+ log ( `no more new responses come in after ${ POLL_TIMEOUT } ms, stopping the loop` ) ;
68
+ this . finished = true ;
69
+ }
70
+
61
71
if ( ! this . finished ) {
62
72
this . immediate = setImmediate ( this . loop . bind ( this ) ) ;
63
73
} else {
74
+ log ( 'stopping txn poll' ) ;
64
75
clearImmediate ( this . immediate ) ;
65
76
}
66
77
}
@@ -176,8 +187,14 @@ export class Txn {
176
187
private checkIsFinished ( ) : Promise < void > {
177
188
if ( this . finished ) {
178
189
return Promise . reject ( ALREADY_FINISHED ) ;
190
+ } else {
191
+ if ( ! this . immediate ) {
192
+ log ( 'starting to poll responses' ) ;
193
+ this . emptyTimestamp = Date . now ( ) ;
194
+ this . startPolling ( ) ;
195
+ }
196
+ return Promise . resolve ( ) ;
179
197
}
180
- return Promise . resolve ( ) ;
181
198
}
182
199
183
200
private isMutated ( txn : QueryTxn | MutateTxn ) : txn is MutateTxn {
0 commit comments