@@ -48,6 +48,10 @@ var common = require('./common');
48
48
var async = require ( 'async' ) ;
49
49
var uuid = require ( 'uuid' ) ;
50
50
var pg = require ( 'pg' ) ;
51
+
52
+ // empty import/invocation of the keepalive fix for node-postgres module
53
+ require ( 'pg-ka-fix' ) ( ) ;
54
+
51
55
var upgrade = require ( './upgrades' ) ;
52
56
53
57
String . prototype . shortenPrefix = function ( ) {
@@ -362,8 +366,9 @@ exports.handler = function (event, context) {
362
366
// add the file to the pending batch
363
367
dynamoDB . updateItem ( item , function ( err , data ) {
364
368
if ( err ) {
369
+ var waitFor = Math . max ( Math . pow ( tryNumber , 2 ) * 10 , 200 ) ;
370
+
365
371
if ( err . code === provisionedThroughputExceeded ) {
366
- var waitFor = Math . max ( Math . pow ( tryNumber , 2 ) * 10 , 200 ) ;
367
372
console . log ( "Provisioned Throughput Exceeded on addition of " + s3info . prefix + " to pending batch " + thisBatchId + ". Trying again in " + waitFor + " ms" ) ;
368
373
setTimeout ( callback , waitFor ) ;
369
374
} else if ( err . code === conditionCheckFailed ) {
@@ -379,6 +384,7 @@ exports.handler = function (event, context) {
379
384
}
380
385
} ,
381
386
TableName : configTable ,
387
+ /* we need a consistent read here to ensure we get the latest batch ID */
382
388
ConsistentRead : true
383
389
} ;
384
390
dynamoDB . getItem ( configReloadRequest , function ( err , data ) {
@@ -392,26 +398,23 @@ exports.handler = function (event, context) {
392
398
callback ( err ) ;
393
399
}
394
400
} else {
395
- /*
396
- * reset the batch ID to the
397
- * current marked batch
398
- */
399
- thisBatchId = data . Item . currentBatch . S ;
400
-
401
- /*
402
- * we've not set proceed to
403
- * true, so async will retry
404
- */
405
- console . log ( "Reload of Configuration Complete after attempting to write to Locked Batch " + thisBatchId + ". Attempt " + configReloads ) ;
406
-
407
- /*
408
- * we can call into the callback
409
- * immediately, as we probably
410
- * just missed the pending batch
411
- * processor's rotate of the
412
- * configuration batch ID
413
- */
414
- callback ( ) ;
401
+ if ( data . Item . currentBatch . S === thisBatchId ) {
402
+ // we've obtained the same batch ID back from the configuration as we have now, meaning it hasn't yet rotated
403
+ console . log ( "Batch " + thisBatchId + " still current after configuration reload attempt " + configReloads + ". Recycling in " + waitFor + " ms." ) ;
404
+
405
+ // because the batch hasn't been reloaded on the configuration, we'll backoff here for a moment to let that happen
406
+ setTimeout ( callback , waitFor ) ;
407
+ } else {
408
+ // we've got an updated batch id, so use this in the next cycle of file add
409
+ thisBatchId = data . Item . currentBatch . S ;
410
+
411
+ console . log ( "Obtained new Batch ID " + thisBatchId + " after configuration reload. Attempt " + configReloads ) ;
412
+
413
+ /*
414
+ callback immediately, as we should now have a valid and open batch to use
415
+ */
416
+ callback ( ) ;
417
+ }
415
418
}
416
419
} ) ;
417
420
} else {
0 commit comments