From 1a609a8aedf8c9debda979f4bf49af2d9b0e92d8 Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Fri, 18 Jul 2014 22:22:10 -0600 Subject: [PATCH 01/14] Don't crash when passed a null client id Sometimes a client can get confused, and start sending messages with null client ids. This just avoids crashing when that happens. --- lib/faye-redis-sharded.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/faye-redis-sharded.js b/lib/faye-redis-sharded.js index 463466f..82cf71f 100644 --- a/lib/faye-redis-sharded.js +++ b/lib/faye-redis-sharded.js @@ -143,6 +143,11 @@ Engine.prototype = { }, clientExists:function (clientId, callback, context) { + if (!clientId) { + callback.call(context, false); + return; + } + var redis = this._getShard(clientId).redis; redis.zscore(this._ns + '/clients', clientId, function (error, score) { callback.call(context, score !== null); From 4313d71c3919c752f6f2a4d908d5a37259c8dd1e Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Tue, 29 Jul 2014 16:08:03 -0600 Subject: [PATCH 02/14] bump redis to 0.11.x --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index fa18293..b3a2987 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ }, "main":"./index", "dependencies":{ - "redis":"0.8.x" + "redis":"0.11.x" }, "licenses":[{ "type":"MIT", From e2f6b5f7304fba446c451c7bdc1da278c79ef0fe Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Tue, 29 Jul 2014 16:08:15 -0600 Subject: [PATCH 03/14] prevent a crash when redis servers go away You need to provide callbacks to redis commands or else redis will throw an error (killing your process) when something goes wrong. --- lib/faye-redis-sharded.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/faye-redis-sharded.js b/lib/faye-redis-sharded.js index 82cf71f..2e7988a 100644 --- a/lib/faye-redis-sharded.js +++ b/lib/faye-redis-sharded.js @@ -327,7 +327,11 @@ Engine.prototype = { }); multi.del(key); - multi.exec(); + multi.exec(function(err) { + if (err) { + self._server.error('redis error from exec: ?', err.message); + } + }); }, gc:function () { From f2cc87dfd76b69e6efb80c0c25268fe71d0397ce Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Fri, 1 Aug 2014 21:08:57 -0600 Subject: [PATCH 04/14] Add runner for Faye engine tests --- .gitmodules | 3 ++ .travis.yml | 13 ++++++++ Makefile | 5 +++ README.md | 7 +++++ lib/faye-redis-sharded.js | 3 ++ package.json | 49 ++++++++++++++++++----------- spec/faye_redis_sharded_spec.js | 56 +++++++++++++++++++++++++++++++++ spec/runner.js | 7 +++++ vendor/faye | 1 + 9 files changed, 126 insertions(+), 18 deletions(-) create mode 100644 .gitmodules create mode 100644 .travis.yml create mode 100644 Makefile create mode 100644 spec/faye_redis_sharded_spec.js create mode 100644 spec/runner.js create mode 160000 vendor/faye diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..b59fa91 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "vendor/faye"] + path = vendor/faye + url = git@github.com:zwily/faye.git diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..8093f13 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,13 @@ +language: node_js + +node_js: + - "0.6" + - "0.8" + - "0.10" + - "0.11" + +services: + - redis-server + +before_script: + - make diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..c7944b7 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +prepare: + git submodule update --init --recursive + cd vendor/faye && npm install + cd vendor/faye && ./node_modules/.bin/wake + npm install diff --git a/README.md b/README.md index ea52715..d139afa 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,13 @@ one server. 4. Ability to provide a custom shard manager so users can add their own implementations. 5. Ability to perform resharding of keys when hosts change +## Running Tests + +```bash +$ make +$ npm test +``` + ## License (The MIT License) diff --git a/lib/faye-redis-sharded.js b/lib/faye-redis-sharded.js index 2e7988a..5845bdd 100644 --- a/lib/faye-redis-sharded.js +++ b/lib/faye-redis-sharded.js @@ -125,6 +125,9 @@ Engine.prototype = { } } + this._server.unbind('connection:open'); + this._server.unbind('connection:close'); + clearInterval(this._gc); this._shardManagers.forEach(function (shardManager) { diff --git a/package.json b/package.json index b3a2987..fb036b1 100644 --- a/package.json +++ b/package.json @@ -1,22 +1,35 @@ { - "name":"faye-redis-sharded", - "description":"Redis backend engine for Faye with support for sharding", - "author":"Myspace", - "keywords":["faye", "faye-redis", "pubsub", "bayeux"], - "version":"0.2.4", - "engines":{ - "node":">=0.4.0" + "name": "faye-redis-sharded", + "description": "Redis backend engine for Faye with support for sharding", + "author": "Myspace", + "keywords": [ + "faye", + "faye-redis", + "pubsub", + "bayeux" + ], + "version": "0.2.4", + "engines": { + "node": ">=0.4.0" }, - "main":"./index", - "dependencies":{ - "redis":"0.11.x" + "main": "./index", + "dependencies": { + "redis": "0.11.x" }, - "licenses":[{ - "type":"MIT", - "url":"http://www.opensource.org/licenses/mit-license.php" - }], - "repositories":[{ - "type":"git", - "url":"git@github.com:myspace/faye-redis-sharded-node.git" - }] + "licenses": [ + { + "type": "MIT", + "url": "http://www.opensource.org/licenses/mit-license.php" + } + ], + "repositories": [ + { + "type": "git", + "url": "git@github.com:myspace/faye-redis-sharded-node.git" + } + ], + "devDependencies": { + "jstest": "^1.0.5" + }, + "scripts": {"test": "node spec/runner.js"} } diff --git a/spec/faye_redis_sharded_spec.js b/spec/faye_redis_sharded_spec.js new file mode 100644 index 0000000..8960c93 --- /dev/null +++ b/spec/faye_redis_sharded_spec.js @@ -0,0 +1,56 @@ +var engine = require('../index') + +var REDIS_HOST = 'localhost', + REDIS_PORT = 6379, + NUM_SHARDS = 10; + +JS.Test.describe("Redis Sharded engine", function() { with(this) { + before(function() { + var shards = []; + for (var i = 1; i <= NUM_SHARDS; i++) { + shards.push({ + shardName: 'redis' + i, + host: REDIS_HOST, + port: REDIS_PORT, + database: i + }); + } + + this.engineOpts = { + type: engine, + shards: shards + }; + }) + + after(function(resume) { with(this) { + engine.disconnect() + var redis = require('redis').createClient(REDIS_PORT, REDIS_HOST, { no_ready_check: true }) + + var toClear = []; + for (var i = 1; i <= NUM_SHARDS; i++) { + toClear.push(i); + } + + var clear = function(remaining) { + var db = remaining.pop(); + redis.select(db, function() { + redis.flushdb(function() { + if (remaining.length > 0) { + clear(remaining); + } + else { + redis.end(); + resume(); + } + }); + }); + }; + clear(toClear); + }}); + + itShouldBehaveLike("faye engine") + + describe("distribution", function() { with(this) { + itShouldBehaveLike("distributed engine") + }}) +}}) diff --git a/spec/runner.js b/spec/runner.js new file mode 100644 index 0000000..de642a2 --- /dev/null +++ b/spec/runner.js @@ -0,0 +1,7 @@ +JS = require('jstest'); +Faye = require('../vendor/faye/build/node/faye-node'); + +require('../vendor/faye/spec/javascript/engine_spec'); +require('./faye_redis_sharded_spec') + +JS.Test.autorun({ test: null}) diff --git a/vendor/faye b/vendor/faye new file mode 160000 index 0000000..6fd706e --- /dev/null +++ b/vendor/faye @@ -0,0 +1 @@ +Subproject commit 6fd706e7674a9b31deb4b1e304524d720182e55d From 6a707bece3c654c13080dd2490b93fb5dc0ec7b4 Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Fri, 1 Aug 2014 21:14:32 -0600 Subject: [PATCH 05/14] Fix faye submodule location --- .gitmodules | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitmodules b/.gitmodules index b59fa91..4bf8c84 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "vendor/faye"] path = vendor/faye - url = git@github.com:zwily/faye.git + url = https://github.com/zwily/faye.git From bc27fedd5aa25b5bf2dce7cfda6423c986f5ddf6 Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Fri, 1 Aug 2014 21:16:52 -0600 Subject: [PATCH 06/14] Don't pin jstest to a specific version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index fb036b1..4b38a9a 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,7 @@ } ], "devDependencies": { - "jstest": "^1.0.5" + "jstest": "" }, "scripts": {"test": "node spec/runner.js"} } From b02caf41b5f5eae5edc6037b685f88ecb8306d19 Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Fri, 1 Aug 2014 21:23:50 -0600 Subject: [PATCH 07/14] Remove some older node versions from Travis tests --- .travis.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8093f13..8ec778f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,6 @@ language: node_js node_js: - - "0.6" - - "0.8" - "0.10" - "0.11" From 581f6c3b423eb607960d78aa21f3e0c8c9318255 Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Fri, 1 Aug 2014 21:26:53 -0600 Subject: [PATCH 08/14] Add Travis build icon --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d139afa..f4ecdb2 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# faye-redis-sharded +# faye-redis-sharded [![Build Status](https://travis-ci.org/zwily/faye-redis-sharded-node.svg?branch=master)](https://travis-ci.org/zwily/faye-redis-sharded-node) This plugin provides a Redis-based backend for the [Faye](http://faye.jcoglan.com) messaging server. It allows a single Faye service to be distributed across many From 6365b817d233772e4157eb44ce298f909d9fa2f4 Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Fri, 1 Aug 2014 22:22:51 -0600 Subject: [PATCH 09/14] Make sure clients only get one copy of each message. When a client is subscribed to channels like /foo/* and /foo/bar, and a message is published to /foo/bar, the client should only receive *one* message. Previously, we were sending the message for every shard each subscribed channel lived in. (So if /foo/* was in shard1 and /foo/bar was in shard2, the subscriber would get two copies.) This change collects all the client ids from all shards and only sends one message to each client. --- lib/faye-redis-sharded.js | 68 ++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 26 deletions(-) diff --git a/lib/faye-redis-sharded.js b/lib/faye-redis-sharded.js index 5845bdd..6c86a7d 100644 --- a/lib/faye-redis-sharded.js +++ b/lib/faye-redis-sharded.js @@ -260,12 +260,53 @@ Engine.prototype = { shardMap[shardName].channels.push(channel); }); + // We need to query all shards to get the clients subscribed to the given channels, + // and then dedupe those clientIds because each client should only get exactly + // *one* copy of the message. + + var allClients = {}; + var shardsToCheck = Object.keys(shardMap).length; + + function getClientsForChannels(message, channels, channelShard, done) { + var keys = channels.map(function (c) { + return self._ns + '/channels' + c; + }); + + // get all clients subscribed to the channels on that shard + channelShard.redis.sunion.apply(channelShard.redis, keys.concat(done)); + } + + function checkDone() { + shardsToCheck -= 1; + if (shardsToCheck == 0) { + var jsonMessage = JSON.stringify(message); + + Object.keys(allClients).forEach(function (clientId) { + var shard = self._getShard(clientId, shardManager), + redis = shard.redis; + self._server.debug('Queueing for client ?: ?', clientId, message); + redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage, function (err, result) { + redis.publish(self._ns + '/' + clientId + '/notify', clientId); + self._server.debug('Published for client ? - ? - to server ?', clientId, message, shard.shardName); + }); + }); + } + } + Object.keys(shardMap).forEach(function (shardName) { var map = shardMap[shardName], shard = map.shard, channels = map.channels; - self._publish(message, channels, shard, shardManager); + getClientsForChannels(message, channels, shard, function(err, clients) { + if (err) return; + + clients.forEach(function(clientId) { + allClients[clientId] = true; + }); + + setTimeout(checkDone, 0); + }); }); } @@ -282,31 +323,6 @@ Engine.prototype = { this._server.trigger('publish', message.clientId, message.channel, message.data); }, - _publish:function (message, channels, channelShard, shardManager) { - var self = this, - jsonMessage = JSON.stringify(message), - keys = channels.map(function (c) { - return self._ns + '/channels' + c; - }); - - var notify = function (error, clients) { - if (error) return; - - clients.forEach(function (clientId) { - var shard = self._getShard(clientId, shardManager), - redis = shard.redis; - self._server.debug('Queueing for client ?: ?', clientId, message); - redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage, function (err, result) { - redis.publish(self._ns + '/' + clientId + '/notify', clientId); - self._server.debug('Published for client ? - ? - to server ?', clientId, message, shard.shardName); - }); - }); - }; - - keys.push(notify); - channelShard.redis.sunion.apply(channelShard.redis, keys); // get all clients subscribed to the channels on that shard, then call notify on them - }, - emptyQueue:function (clientId) { if (!this._server.hasConnection(clientId)) { this._server.debug('Does not have connection for: ?', clientId); From 83fd18da072760ce89e0b3c74c9908cb65978022 Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Fri, 1 Aug 2014 22:53:23 -0600 Subject: [PATCH 10/14] Trigger "close" in every engine when clients are disconnected due to timeout --- lib/faye-redis-sharded.js | 41 ++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/lib/faye-redis-sharded.js b/lib/faye-redis-sharded.js index 6c86a7d..e69b349 100644 --- a/lib/faye-redis-sharded.js +++ b/lib/faye-redis-sharded.js @@ -30,22 +30,31 @@ var Engine = function (server, options) { var shardsList = []; shards.forEach(function (shard) { var client, - subscriber; + subscriber, + shardName = shard.shardName || (shard.host + ':' + shard.port), + closeChannel = self._ns + '/notifications/' + shardName + '/close'; client = connectRedis(shard, redisOptions, onRedisError); if (isPrimary) { subscriber = connectRedis(shard, redisOptions, onRedisError); subscriber.on('message', function (topic, message) { - self._server.debug('Got message for ?', message); - self.emptyQueue(message); + if (topic === closeChannel) { + self._server.debug('Got close for ?', message); + self._server.trigger('close', message); + } else { + self._server.debug('Got message for ?', message); + self.emptyQueue(message); + } }); + + subscriber.subscribe(closeChannel); } - var shardName = shard.shardName || (shard.host + ':' + shard.port); var newShard = { - redis:client, - subscriber:subscriber, - shardName:shardName + redis: client, + subscriber: subscriber, + shardName: shardName, + closeChannel: closeChannel }; shardsList.push({ shardName:shardName, shard:newShard}); @@ -192,18 +201,22 @@ Engine.prototype = { _afterSubscriptionsRemoved:function (clientId, callback, context) { var self = this, - redis = this._getShard(clientId).redis; - redis.del(this._ns + '/clients/' + clientId + '/messages', function (err) { + shard = this._getShard(clientId), + multi = shard.redis.multi(); + + multi.del(self._ns + '/clients/' + clientId + '/messages'); + multi.zrem(self._ns + '/clients', clientId); + multi.publish(shard.closeChannel, clientId); + + multi.exec(function(err, results) { if (err) { if (callback) callback.call(context); return; } - redis.zrem(self._ns + '/clients', clientId, function () { - self._server.debug('Destroyed client ?', clientId); - self._server.trigger('disconnect', clientId); - if (callback) callback.call(context); - }); + self._server.debug('Destroyed client ?', clientId); + self._server.trigger('disconnect', clientId); + if (callback) callback.call(context); }); }, From e9a79dccbff73a4d9f1771e065371e9844c6eb25 Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Sat, 2 Aug 2014 07:38:52 -0600 Subject: [PATCH 11/14] Update the readme. (hosts isn't a valid option). --- README.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f4ecdb2..9adcf7a 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,16 @@ var bayeux = new faye.NodeAdapter({ timeout: 25, engine: { type: redis, - hosts: ['redis-server-1:6397','redis-server-1:6380', 'redis-server-2:6379'] + shards: [{ + host: 'redis-server-1', + port: 6397 + }, { + host: 'redis-server-1', + port: 6380 + }, { + host: 'redis-server-2', + port: 6379 + }] // more options } }); From 4890e71d6e38f426b6fcdaf28cc99f7669721af4 Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Sat, 2 Aug 2014 08:24:23 -0600 Subject: [PATCH 12/14] Convert some hand-rolled async code to use async library Pulled in "async" to help with several cases where we perform several operations in parallel, and need a callback at the end. --- lib/faye-redis-sharded.js | 149 ++++++++++++++++---------------------- package.json | 6 +- 2 files changed, 66 insertions(+), 89 deletions(-) diff --git a/lib/faye-redis-sharded.js b/lib/faye-redis-sharded.js index e69b349..abe3479 100644 --- a/lib/faye-redis-sharded.js +++ b/lib/faye-redis-sharded.js @@ -1,5 +1,7 @@ var redis = require('redis'), - ShardManager = require('./sharding/shard-manager'); + async = require('async'), + _ = require('lodash'), + ShardManager = require('./sharding/shard-manager'); var Engine = function (server, options) { var self = this; @@ -168,9 +170,9 @@ Engine.prototype = { destroyClient:function (clientId, callback, context) { var self = this, - shard = this._getShard(clientId), - redis = shard.redis, - subscriber = shard.subscriber; + shard = this._getShard(clientId), + redis = shard.redis, + subscriber = shard.subscriber; subscriber.unsubscribe(self._ns + '/' + clientId + '/notify'); redis.smembers(this._ns + '/clients/' + clientId + '/channels', function (err, channels) { @@ -179,44 +181,35 @@ Engine.prototype = { return; } - var n = channels.length, i = 0; - if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context); - - var unsubscribeError = null; - channels.forEach(function (channel) { - self.unsubscribe(clientId, channel, function (err) { - unsubscribeError = unsubscribeError || err; - i += 1; - if (i === n) { - if (unsubscribeError) { - if (callback) callback.call(context); - } else { - self._afterSubscriptionsRemoved(clientId, callback, context); - } - } - }); - }); - }); - }, + // unsubscribe from all channels... + async.parallel(channels.map(function(channel) { + return function(done) { + self.unsubscribe(clientId, channel, done); + }; + }), function(err) { + // ... and then clear out the client, and trigger a close event. + if (err) { + if (callback) callback.call(context); + return; + } - _afterSubscriptionsRemoved:function (clientId, callback, context) { - var self = this, - shard = this._getShard(clientId), - multi = shard.redis.multi(); + var multi = redis.multi(); - multi.del(self._ns + '/clients/' + clientId + '/messages'); - multi.zrem(self._ns + '/clients', clientId); - multi.publish(shard.closeChannel, clientId); + multi.del(self._ns + '/clients/' + clientId + '/messages'); + multi.zrem(self._ns + '/clients', clientId); + multi.publish(shard.closeChannel, clientId); - multi.exec(function(err, results) { - if (err) { - if (callback) callback.call(context); - return; - } + multi.exec(function(err, results) { + if (err) { + if (callback) callback.call(context); + return; + } - self._server.debug('Destroyed client ?', clientId); - self._server.trigger('disconnect', clientId); - if (callback) callback.call(context); + self._server.debug('Destroyed client ?', clientId); + self._server.trigger('disconnect', clientId); + if (callback) callback.call(context); + }); + }); }); }, @@ -277,60 +270,44 @@ Engine.prototype = { // and then dedupe those clientIds because each client should only get exactly // *one* copy of the message. - var allClients = {}; - var shardsToCheck = Object.keys(shardMap).length; - function getClientsForChannels(message, channels, channelShard, done) { var keys = channels.map(function (c) { return self._ns + '/channels' + c; }); - // get all clients subscribed to the channels on that shard - channelShard.redis.sunion.apply(channelShard.redis, keys.concat(done)); + channelShard.redis.sunion.apply(channelShard.redis, keys.concat(done)); } - function checkDone() { - shardsToCheck -= 1; - if (shardsToCheck == 0) { - var jsonMessage = JSON.stringify(message); - - Object.keys(allClients).forEach(function (clientId) { - var shard = self._getShard(clientId, shardManager), - redis = shard.redis; - self._server.debug('Queueing for client ?: ?', clientId, message); - redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage, function (err, result) { - redis.publish(self._ns + '/' + clientId + '/notify', clientId); - self._server.debug('Published for client ? - ? - to server ?', clientId, message, shard.shardName); - }); + async.parallel(_.map(shardMap, function(shardInfo) { + // build up a list of clientIds to deliver the message to across shards... + return function(done) { + getClientsForChannels(message, shardInfo.channels, shardInfo.shard, done); + }; + }), function(err, results) { + if (err) { + self._server.error('Error getting list of clients to publish to: ?', err.message); + return; + }; + + var jsonMessage = JSON.stringify(message); + + // ... then uniquify it and deliver. + _.uniq(_.flatten(results)).forEach(function(clientId) { + var shard = self._getShard(clientId, shardManager), + redis = shard.redis; + self._server.debug('Queueing for client ?: ?', clientId, message); + redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage, function (err, result) { + redis.publish(self._ns + '/' + clientId + '/notify', clientId); + self._server.debug('Published for client ? - ? - to server ?', clientId, message, shard.shardName); }); - } - } - - Object.keys(shardMap).forEach(function (shardName) { - var map = shardMap[shardName], - shard = map.shard, - channels = map.channels; - - getClientsForChannels(message, channels, shard, function(err, clients) { - if (err) return; - - clients.forEach(function(clientId) { - allClients[clientId] = true; - }); - - setTimeout(checkDone, 0); }); }); } // publish to all shard managers - function broadcast(message, channels) { - self._shardManagers.forEach(function (shardManager) { - publish(message, channels, shardManager); - }) - } - - broadcast(message, channels); + self._shardManagers.forEach(function (shardManager) { + publish(message, channels, shardManager); + }) this._server.debug('Publishing message ?', message); this._server.trigger('publish', message.clientId, message.channel, message.data); @@ -379,15 +356,11 @@ Engine.prototype = { shard.redis.zrangebyscore(this._ns + '/clients', 0, cutoff, function (error, clients) { if (error) return releaseLock(); - var i = 0, n = clients.length; - if (i === n) return releaseLock(); - - clients.forEach(function (clientId) { - this.destroyClient(clientId, function () { - i += 1; - if (i === n) releaseLock(); - }, this); - }, self); + async.parallel(clients.map(function(clientId) { + return function(done) { + self.destroyClient(clientId, done); + }; + }), releaseLock); }); }, self); }); diff --git a/package.json b/package.json index 4b38a9a..0378eff 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,8 @@ }, "main": "./index", "dependencies": { + "async": "^0.9.0", + "lodash": "^2.4.1", "redis": "0.11.x" }, "licenses": [ @@ -31,5 +33,7 @@ "devDependencies": { "jstest": "" }, - "scripts": {"test": "node spec/runner.js"} + "scripts": { + "test": "node spec/runner.js" + } } From 896ce7c0c8a78270d4accae351fed3abbba0ef7c Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Sat, 2 Aug 2014 12:32:35 -0600 Subject: [PATCH 13/14] Make sure newly created clients don't get immediately gc'd --- lib/faye-redis-sharded.js | 1 + vendor/faye | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/faye-redis-sharded.js b/lib/faye-redis-sharded.js index abe3479..043cad9 100644 --- a/lib/faye-redis-sharded.js +++ b/lib/faye-redis-sharded.js @@ -151,6 +151,7 @@ Engine.prototype = { this._getShard(clientId).redis.zadd(this._ns + '/clients', 0, clientId, function (error, added) { if (added === 0) return self.createClient(callback, context); self._server.debug('Created new client ?', clientId); + self.ping(clientId); self._server.trigger('handshake', clientId); callback.call(context, clientId); }); diff --git a/vendor/faye b/vendor/faye index 6fd706e..8d6c9c0 160000 --- a/vendor/faye +++ b/vendor/faye @@ -1 +1 @@ -Subproject commit 6fd706e7674a9b31deb4b1e304524d720182e55d +Subproject commit 8d6c9c0f07dab8e57fd635db084f4991923bd9db From 4c0f9b708e015b263fc5db2bbbb2e15eced071f4 Mon Sep 17 00:00:00 2001 From: Zach Wily Date: Sat, 2 Aug 2014 12:48:24 -0600 Subject: [PATCH 14/14] Enfore clientId timeout in clientExists. Ported from faye-redis-node, see: https://github.com/jpignata/faye-redis-node/commit/d0bc94086315ddbffbcf68018df4e0019eb3c632 --- lib/faye-redis-sharded.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/faye-redis-sharded.js b/lib/faye-redis-sharded.js index 043cad9..775ae0f 100644 --- a/lib/faye-redis-sharded.js +++ b/lib/faye-redis-sharded.js @@ -163,9 +163,11 @@ Engine.prototype = { return; } + var cutoff = new Date().getTime() - (1000 * 1.6 * this._server.timeout); + var redis = this._getShard(clientId).redis; redis.zscore(this._ns + '/clients', clientId, function (error, score) { - callback.call(context, score !== null); + callback.call(context, parseInt(score, 10) > cutoff); }); },