From 042528dfa280854f2bf6e1550f1182522a082d01 Mon Sep 17 00:00:00 2001 From: Armagan Amcalar Date: Mon, 16 Nov 2020 11:36:08 +0100 Subject: [PATCH 1/4] Give travis more heap for builds --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 3cd46da..52fbff7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,6 @@ language: node_js +env: + - NODE_OPTIONS=–-max_old_space_size=4096 matrix: include: - node_js: '10' From cea67d1a9c1f7d3ab457a3aa56ec073d7fa9ce15 Mon Sep 17 00:00:00 2001 From: OtotheA Date: Sun, 5 Feb 2023 14:33:55 -0600 Subject: [PATCH 2/4] add logic to track messages and gracefully close --- examples/requester.js | 14 +++++++ examples/responder.js | 14 +++++++ src/components/component.js | 24 +++++++++++- src/components/requester.js | 15 ++++++- src/components/responder.js | 13 ++++++- test/component-lifecycle.js | 78 +++++++++++++++++++++++++++++++++++++ 6 files changed, 154 insertions(+), 4 deletions(-) diff --git a/examples/requester.js b/examples/requester.js index b73c700..71eed83 100644 --- a/examples/requester.js +++ b/examples/requester.js @@ -32,3 +32,17 @@ function makeRequest() { makeRequest(); setInterval(makeRequest, 5000); + +// Gracefully close responder after it completes any pending messages +process.once('SIGINT', () => { + console.log('closing, press ctrl+c again to force exit'); + randomRequest.close(() => { + console.log('exiting'); + process.exit(); + }); + + process.once('SIGINT', () => { + console.log('forced exit'); + process.exit(); + }); +}); diff --git a/examples/responder.js b/examples/responder.js index ded6b1c..11ff31f 100644 --- a/examples/responder.js +++ b/examples/responder.js @@ -27,3 +27,17 @@ randomResponder.on('promised request', function(req) { // reject(answer); }); }); + +// Gracefully close responder after it completes any pending messages +process.once('SIGINT', () => { + console.log('closing, press ctrl+c again to force exit'); + randomResponder.close(() => { + console.log('exiting'); + process.exit(); + }); + + process.once('SIGINT', () => { + console.log('forced exit'); + process.exit(); + }); +}); diff --git a/src/components/component.js b/src/components/component.js index 0d0b52e..872701a 100644 --- a/src/components/component.js +++ b/src/components/component.js @@ -48,8 +48,28 @@ module.exports = class Component extends EventEmitter { onRemoved() { }; - close() { - this.sock && this.sock.close(); + close(cb) { this.discovery && this.discovery.stop(); + + if (typeof cb === 'function') { + const interval = setInterval(() => { + if (!this.messageIds || this.messageIds.length === 0) { + if (this.sock) { + if (this.sock.server) { + this.sock.close(cb); + } else { + this.sock.close(); + cb(); + } + } else { + cb(); + } + clearInterval(interval); + } + }, 100); + return; + } else { + this.sock && this.sock.close(); + } } }; diff --git a/src/components/requester.js b/src/components/requester.js index 63ae540..35e30e5 100644 --- a/src/components/requester.js +++ b/src/components/requester.js @@ -3,6 +3,7 @@ const Monitorable = require('./monitorable'); const Component = require('./component'); const axon = require('@dashersw/axon'); const debug = require('debug')('axon:req'); +const uuid = require('uuid'); const SUBSET_IDENTIFIER = '__subset'; @@ -11,6 +12,8 @@ module.exports = class Requester extends Monitorable(Configurable(Component)) { constructor(advertisement, discoveryOptions) { super(advertisement, discoveryOptions); + this.messageIds = []; + this.sock = new axon.types[this.type](); this.sock.set('retry timeout', 0); this.timeout = advertisement.timeout || process.env.COTE_REQUEST_TIMEOUT; @@ -98,10 +101,20 @@ module.exports = class Requester extends Monitorable(Configurable(Component)) { const hasCallback = typeof args[args.length - 1] == 'function'; const timeout = args[0].__timeout || this.timeout; - if (hasCallback) return sendOverSocket(this.sock, timeout, ...args); + const cb = hasCallback && args.pop(); + const messageId = uuid.v4(); + this.messageIds.push(messageId); + const cbWithCounter = (...cbArgs) => { + cb && cb(...cbArgs); + const index = this.messageIds.indexOf(messageId); + index >= 0 && this.messageIds.splice(index, 1); + }; + + if (hasCallback) return sendOverSocket(this.sock, timeout, ...args, cbWithCounter); return new Promise((resolve, reject) => { sendOverSocket(this.sock, timeout, ...args, (err, res) => { + cbWithCounter(); if (err) return reject(err); resolve(res); }); diff --git a/src/components/responder.js b/src/components/responder.js index cc20330..5218a3a 100644 --- a/src/components/responder.js +++ b/src/components/responder.js @@ -2,6 +2,7 @@ const axon = require('@dashersw/axon'); const portfinder = require('portfinder'); const Configurable = require('./configurable'); const Component = require('./component'); +const uuid = require('uuid'); // eslint-disable-next-line const colors = require('colors'); @@ -10,6 +11,8 @@ module.exports = class Responder extends Configurable(Component) { constructor(advertisement, discoveryOptions) { super(advertisement, discoveryOptions); + this.messageIds = []; + this.sock = new axon.types[this.type](); this.sock.on('bind', () => this.startDiscovery()); @@ -20,7 +23,15 @@ module.exports = class Responder extends Configurable(Component) { this.discovery.log([this.advertisement.name, '>', `No listeners found for event: ${req.type}`.yellow]); } - this.emit(req.type, req, cb); + const messageId = uuid.v4(); + this.messageIds.push(messageId); + const cbWithCounter = (...cbArgs) => { + cb(...cbArgs); + const index = this.messageIds.indexOf(messageId); + index >= 0 && this.messageIds.splice(index, 1); + }; + + this.emit(req.type, req, cbWithCounter); }); const onPort = (err, port) => { diff --git a/test/component-lifecycle.js b/test/component-lifecycle.js index 3a24d2b..d68b251 100644 --- a/test/component-lifecycle.js +++ b/test/component-lifecycle.js @@ -7,6 +7,27 @@ const { Requester, Responder } = require('../')({ environment }); LogSuppress.init(console); +const setup = (t) => { + const key = r.generate(); + + const requester = new Requester({ name: `${t.title}: ${key} requester`, key }); + const responder = new Responder({ name: `${t.title}: ${key} responder`, key }); + + let i = 1; + responder.on('ping', (req, cb) => { + i++; + setTimeout(() => { + cb(null, 'pong'); + }, 2000 * i); + }); + + for (let j = 0; j < 3; j++) { + requester.send({ type: 'ping' }); + } + + return { requester, responder }; +}; + test('Instantiate a requester', (t) => { const key = r.generate(); const requester = new Requester({ name: `${t.title}: requester`, key }); @@ -29,3 +50,60 @@ test.cb('Discover and close a requester', (t) => { t.end(); }); }); + +test.cb(`Component should call close callback if no sock`, (t) => { + const key = r.generate(); + const requester = new Requester({ name: `${t.title}: ${key} requester`, key }); + + setTimeout(() => { + requester.sock = null; + requester.close(() => { + t.is(requester.sock, null); + t.end(); + }); + }, 1000); +}); + +test.cb(`Requester should close socket immediately if no callback`, (t) => { + const { requester } = setup(t); + + setTimeout(() => { + requester.close(); + t.is(requester.messageIds.length, 3); + t.end(); + }, 1000); +}); + +test.cb(`Requester should wait for all messages to complete before close callback`, (t) => { + const { requester } = setup(t); + + setTimeout(() => { + requester.close(() => { + t.is(requester.messageIds.length, 0); + t.end(); + }); + t.is(requester.messageIds.length, 3); + }, 1000); +}); + +test.cb(`Responder should close socket immediately if no callback`, (t) => { + const { responder } = setup(t); + + setTimeout(() => { + responder.close(); + t.is(responder.messageIds.length, 3); + t.end(); + }, 1000); +}); + +test.cb(`Responder should wait for all messages to complete before close callback`, (t) => { + const { responder } = setup(t); + + setTimeout(() => { + responder.close(() => { + t.is(responder.messageIds.length, 0); + t.end(); + }); + t.is(responder.messageIds.length, 3); + }, 1000); +}); From 3a121acc061876daaa26ac7e3f59651be17a2955 Mon Sep 17 00:00:00 2001 From: OtotheA Date: Mon, 6 Feb 2023 22:08:09 -0600 Subject: [PATCH 3/4] remove socket from requester when responder closing --- src/components/requester.js | 19 ++++++++++++++++++- src/components/responder.js | 12 ++++++++++++ test/component-lifecycle.js | 25 +++++++++++++------------ 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/src/components/requester.js b/src/components/requester.js index 35e30e5..81f685c 100644 --- a/src/components/requester.js +++ b/src/components/requester.js @@ -94,7 +94,15 @@ module.exports = class Requester extends Monitorable(Configurable(Component)) { if (alreadyConnected) return; - this.sock.connect(obj.advertisement.port, address); + this.sock.connect(obj.advertisement.port, address, sock => { + // Add the closing callback for the responder + const key = `closing__${address}:${sock.localPort}`; + this.sock.callbacks[key] = () => { + // Remove the socket so it no longer sends messages to it + const sockIndex = this.sock.socks.findIndex(sock => `closing__${sock._sockname.address}:${sock.localPort}` === key); + sockIndex >= 0 && this.sock.socks.splice(sockIndex, 1); + }; + }); } send(...args) { @@ -121,6 +129,15 @@ module.exports = class Requester extends Monitorable(Configurable(Component)) { }); } + close(cb) { + if (cb) { + // Disable send so that it will stop queueing messages + this.send = () => {}; + } + + super.close(cb); + } + get type() { return 'req'; } diff --git a/src/components/responder.js b/src/components/responder.js index 5218a3a..d546528 100644 --- a/src/components/responder.js +++ b/src/components/responder.js @@ -67,6 +67,18 @@ module.exports = class Responder extends Configurable(Component) { }); } + close(cb) { + if (cb) { + // Send closing event to all requesters so they will stop sending messages to it + for (const sock of this.sock.socks) { + const key = `closing__${sock._peername.address}:${sock.remotePort}`; + sock.writable && sock.write(this.sock.pack([null, key])); + } + } + + super.close(cb); + } + get type() { return 'rep'; } diff --git a/test/component-lifecycle.js b/test/component-lifecycle.js index d68b251..72f877d 100644 --- a/test/component-lifecycle.js +++ b/test/component-lifecycle.js @@ -18,12 +18,13 @@ const setup = (t) => { i++; setTimeout(() => { cb(null, 'pong'); - }, 2000 * i); + }, 500 * i); }); - for (let j = 0; j < 3; j++) { + requester.send({ type: 'ping' }); + setInterval(() => { requester.send({ type: 'ping' }); - } + }, 500); return { requester, responder }; }; @@ -61,7 +62,7 @@ test.cb(`Component should call close callback if no sock`, (t) => { t.is(requester.sock, null); t.end(); }); - }, 1000); + }, 750); }); test.cb(`Requester should close socket immediately if no callback`, (t) => { @@ -69,9 +70,9 @@ test.cb(`Requester should close socket immediately if no callback`, (t) => { setTimeout(() => { requester.close(); - t.is(requester.messageIds.length, 3); + t.is(requester.messageIds.length, 2); t.end(); - }, 1000); + }, 750); }); test.cb(`Requester should wait for all messages to complete before close callback`, (t) => { @@ -82,8 +83,8 @@ test.cb(`Requester should wait for all messages to complete before close callbac t.is(requester.messageIds.length, 0); t.end(); }); - t.is(requester.messageIds.length, 3); - }, 1000); + t.is(requester.messageIds.length, 2); + }, 750); }); test.cb(`Responder should close socket immediately if no callback`, (t) => { @@ -91,9 +92,9 @@ test.cb(`Responder should close socket immediately if no callback`, (t) => { setTimeout(() => { responder.close(); - t.is(responder.messageIds.length, 3); + t.is(responder.messageIds.length, 2); t.end(); - }, 1000); + }, 750); }); test.cb(`Responder should wait for all messages to complete before close callback`, (t) => { @@ -104,6 +105,6 @@ test.cb(`Responder should wait for all messages to complete before close callbac t.is(responder.messageIds.length, 0); t.end(); }); - t.is(responder.messageIds.length, 3); - }, 1000); + t.is(responder.messageIds.length, 2); + }, 750); }); From 0a98393ef262b7c2afa8554b664a1d9dd65ba665 Mon Sep 17 00:00:00 2001 From: OtotheA Date: Tue, 7 Feb 2023 09:56:49 -0600 Subject: [PATCH 4/4] fix --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 52fbff7..28afe76 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: node_js env: - - NODE_OPTIONS=–-max_old_space_size=4096 + - NODE_OPTIONS=--max_old_space_size=4096 matrix: include: - node_js: '10'