diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..cdc7c2c --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,25 @@ +name: Tests + +on: [push, pull_request] + +jobs: + test: + runs-on: ubuntu-latest + env: + NODE_NO_WARNINGS: 1 + services: + redis: + image: tradle/redis + ports: + - "6379:6379" + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v1 + with: + node-version: 14.17.3 + - name: Installing dependencies + run: npm i + - name: Linting code + run: npm run lint + - name: Running unit tests + run: npm run test diff --git a/README.md b/README.md index cc8fe0b..5e57055 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # serverless-iot-local -AWS Iot lifecycle and regular topic subscription events +AWS Iot lifecycle and regular topic subscription events. ## Prerequisites * serverless@1.x @@ -18,15 +18,15 @@ plugins: ``` ## Usage -1. Start redis: +1. Start redis: `redis-server` 2. If you're using [serverless-offline](https://github.com/dherault/serverless-offline), you can run: `sls offline start` - Otherwise run: - + Otherwise run: + `sls iot start` CLI options are optional: @@ -34,7 +34,7 @@ CLI options are optional: ``` --port -p Port to listen on. Default: 1883 --httpPort -h Port for WebSocket connections. Default: 1884 ---noStart -n Prevent Iot broker (Mosca MQTT brorker) from being started (if you already have one) +--noStart -n Prevent Iot broker (Aedes MQTT broker) from being started (if you already have one) --skipCacheValidation -c Tells the plugin to skip require cache invalidation. A script reloading tool like Nodemon might then be needed (same as serverless-offline) ``` diff --git a/__tests__/basic.test.js b/__tests__/basic.test.js new file mode 100644 index 0000000..f255b06 --- /dev/null +++ b/__tests__/basic.test.js @@ -0,0 +1,88 @@ +if (process.env.SLS_DEBUG === undefined) { + process.env.SLS_DEBUG = '*' +} + +const test = require('tape') +const ServerlessIoTLocal = require('../') +const { randomBytes } = require('crypto') +const awsIot = require('aws-iot-device-sdk') +const promisify = require('pify') +const debug = require('debug')('serverless-iot-local-test') + +const serverless = { + cli: { + log: debug + }, + service: { + custom: {}, + service: 'x', + provider: { + environment: '', + stage: '', + runtime: '', + region: '', + timeout: 0 + }, + functions: {}, + getFunction (key) {} + }, + + getProvider (_name /* aws */) { + return { + naming: { + getStackName () { + return '' + } + } + } + }, + config: { + servicePath: 'abcd' + }, + pluginManager: { + plugins: [] + } +} + +function createClient () { + const client = awsIot.device({ + protocol: 'ws', + port: 1884, + host: 'localhost' + }) + return promisify(client) +} + +test('Basic server test', async t => { + const inst = new ServerlessIoTLocal(serverless, { + redis: { + host: process.env.REDIS_HOST || process.env.TRADLE_LOCAL_IP, + port: process.env.REDIS_PORT + } + }) + inst.startHandler() + t.pass('waiting 300ms') + await new Promise(resolve => setTimeout(resolve, 300)) + t.pass('creating clients') + const client1 = createClient() + const client2 = createClient() + t.pass('subscribing client') + await client2.subscribe('test', { qos: 1 }) + t.pass('init receiving client') + const receive = new Promise(resolve => { + client2.handleMessage = message => resolve(message.payload.toString()) + }) + t.pass('publishing message') + const payload = randomBytes(6).toString('hex') + const [, data] = await Promise.all([ + client1.publish('test', payload, { qos: 1 }).then(() => t.pass('data sent')), + receive + ]) + t.equals(data, payload, `expected test data "${payload}" received`) + + await Promise.all([ + client2.end(), + client1.end() + ]) + inst.endHandler() +}) diff --git a/__tests__/eval.test.js b/__tests__/eval.test.js index 4923f8f..9d0dcf2 100644 --- a/__tests__/eval.test.js +++ b/__tests__/eval.test.js @@ -15,6 +15,6 @@ test('evalInContext - evals function in context', (t) => { }) test('throws error if variable does not exist', (t) => { - t.throws((() => evalInContext('notHere', {}))) + t.throws(() => evalInContext('notHere', {})) t.end() }) diff --git a/__tests__/sql.test.js b/__tests__/sql.test.js index 7648328..1fc8e1b 100644 --- a/__tests__/sql.test.js +++ b/__tests__/sql.test.js @@ -33,16 +33,16 @@ test('parseSelect - parses multiple SELECT properties correctly', (t) => { const subject = "SELECT name, age, maleOrFemale AS gender FROM 'topic'" const results = parseSelect(subject) t.deepEqual(results.select, [ - { field: 'name', alias: undefined}, + { field: 'name', alias: undefined }, { field: 'age', alias: undefined }, - { field: 'maleOrFemale', alias: 'gender'} + { field: 'maleOrFemale', alias: 'gender' } ]) t.end() }) test('applySelect - Simple select with buffered string handled correctly', (t) => { const select = [{ field: '*', alias: undefined }] - const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8') + const payload = Buffer.from(JSON.stringify({ name: 'Bob' }), 'utf8') const context = {} const event = applySelect({ select, payload, context }) t.deepEqual(event, { name: 'Bob' }) @@ -63,7 +63,7 @@ test('applySelect - Aliased wildcard with non-JSON handled correctly', (t) => { const payload = 'Bob' const context = {} const event = applySelect({ select, payload, context }) - t.deepEqual(event, { 'name': 'Bob'}) + t.deepEqual(event, { name: 'Bob' }) t.end() }) @@ -72,8 +72,8 @@ test('applySelect - Unaliased wildcard plus function results in flattened output { field: '*', alias: undefined }, { field: 'clientid()', alias: undefined } ] - const clientIdFunc = sinon.stub().returns(undefined); - const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8') + const clientIdFunc = sinon.stub().returns(undefined) + const payload = Buffer.from(JSON.stringify({ name: 'Bob' }), 'utf8') const context = { clientid: clientIdFunc } const event = applySelect({ select, payload, context }) t.ok(clientIdFunc.calledOnce) @@ -86,8 +86,8 @@ test('applySelect - Aliased wildcard plus function results in nested output', (t { field: '*', alias: 'message' }, { field: 'clientid()', alias: undefined } ] - const clientIdFunc = sinon.stub().returns(undefined); - const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8') + const clientIdFunc = sinon.stub().returns(undefined) + const payload = Buffer.from(JSON.stringify({ name: 'Bob' }), 'utf8') const context = { clientid: clientIdFunc } const event = applySelect({ select, payload, context }) t.ok(clientIdFunc.calledOnce) @@ -101,10 +101,10 @@ test('applySelect - Function results are appeneded to output', (t) => { { field: 'clientid()', alias: 'theClientId' } ] const clientIdFunc = sinon.stub().returns('12345') - const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8') + const payload = Buffer.from(JSON.stringify({ name: 'Bob' }), 'utf8') const context = { clientid: clientIdFunc } const event = applySelect({ select, payload, context }) t.ok(clientIdFunc.calledOnce) - t.deepEqual(event, { message: { name: 'Bob' }, 'theClientId': '12345' }) + t.deepEqual(event, { message: { name: 'Bob' }, theClientId: '12345' }) t.end() }) diff --git a/broker.js b/broker.js index 919ee3d..cef28f9 100644 --- a/broker.js +++ b/broker.js @@ -1,9 +1,6 @@ -const mosca = require('mosca') - -// fired when the mqtt server is ready -function setup() { - console.log('Mosca server is up and running') -} +const Aedes = require('aedes') +const { createServer } = require('aedes-server-factory') +const aedesPersistenceRedis = require('aedes-persistence-redis') function createAWSLifecycleEvent ({ type, clientId, topics }) { // http://docs.aws.amazon.com/iot/latest/developerguide/life-cycle-events.html#subscribe-unsubscribe-events @@ -24,54 +21,57 @@ function createAWSLifecycleEvent ({ type, clientId, topics }) { /** * https://github.com/aws/aws-sdk-js/blob/master/clients/iot.d.ts#L349 - * + * * @param {Object} opts Module options - * @param {Object} moscaOpts Mosca options + * @param {Object} aedesOpts Aedes options */ -function createBroker (ascoltatore, moscaOpts) { - const moscaSettings = { - // port: 1883, - backend: ascoltatore, - persistence: { - factory: mosca.persistence.Redis - } - } +function createMQTTBroker ({ host, port, httpPort, redis }, debug) { + const persistence = aedesPersistenceRedis(redis) + const aedes = new Aedes({ persistence }) + aedes.on('ready', () => debug('Aedes server is up and running')) + aedes.on('client', client => publishClient('connected', client.id)) + aedes.on('clientDisconnect', client => publishClient('disconnected', client.id)) + aedes.on('subscribe', (subscriptions, client) => publishSubscription('subscribed', client.id, subscriptions)) + aedes.on('unsubscribe', (subscriptions, client) => publishSubscription('unsubscribed', client.id, subscriptions)) - moscaOpts = Object.assign({}, moscaSettings, moscaOpts) - const server = new mosca.Server(moscaOpts) - server.on('ready', setup) - - // fired when a message is received - server.on('published', function (packet, client) { - const presence = packet.topic.match(/^\$SYS\/.*\/(new|disconnect)\/clients$/) - if (presence) { - const clientId = packet.payload - const type = presence[1] === 'new' ? 'connected' : 'disconnected' - server.publish({ - topic: `$aws/events/presence/${type}/${clientId}`, - payload: JSON.stringify(createAWSLifecycleEvent({ - type, - clientId - })) - }) - } + const tcp = createServer(aedes) + const http = createServer(aedes, { + ws: true + }) - const subscription = packet.topic.match(/^\$SYS\/.*\/new\/(subscribes|unsubscribes)$/) - if (subscription) { - const type = subscription[1] === 'subscribes' ? 'subscribed' : 'unsubscribed' - const { clientId, topic } = JSON.parse(packet.payload) - server.publish({ - topic: `$aws/events/subscriptions/${type}/${clientId}`, - payload: JSON.stringify(createAWSLifecycleEvent({ - type, - clientId, - topics: [topic] - })) - }) - } + debug(`Listening to Aedes tcp at ${host}:${port}`) + tcp.listen(port, () => { + debug(`Aedes tcp is up and running at ${host}:${port}`) }) + debug(`Listening to Aedes http at ${host}:${httpPort}`) + http.listen(httpPort, () => { + debug(`Aedes http is up and running at ${host}:${httpPort}`) + }) + + return { aedes, tcp, http, persistence } - return server + function publishClient (type, clientId) { + debug(`Publishing client ${type}/${clientId}`) + aedes.publish({ + topic: `$aws/events/presence/${type}/${clientId}`, + payload: JSON.stringify(createAWSLifecycleEvent({ + type, + clientId + })) + }) + } + + function publishSubscription (type, clientId, subscriptions) { + debug(`Publishing subscription ${type}/${clientId}`) + aedes.publish({ + topic: `$aws/events/subscriptions/${type}/${clientId}`, + payload: JSON.stringify(createAWSLifecycleEvent({ + type, + clientId, + topics: subscriptions + })) + }) + } } -module.exports = createBroker +module.exports = createMQTTBroker diff --git a/eval.js b/eval.js index fd8371b..d6311c7 100644 --- a/eval.js +++ b/eval.js @@ -1,6 +1,7 @@ // TODO: trim(), ltrim(), etc const evalInContext = (js, context) => { + /* eslint-disable */ const { clientid, topic, principal } = context try { return eval(js) @@ -9,14 +10,7 @@ const evalInContext = (js, context) => { console.log(`failed to evaluate: ${js}`) throw err } -} - -const encode = (data, encoding) => { - if (encoding !== 'base64') { - throw new Error('AWS Iot SQL encode() function only supports base64 as an encoding') - } - - return data.toString(encoding) + /* eslint-enable */ } module.exports = evalInContext diff --git a/index.js b/index.js index 1a9c0ce..dd6a788 100644 --- a/index.js +++ b/index.js @@ -5,15 +5,12 @@ const mqttMatch = require('mqtt-match') const realAWS = require('aws-sdk') const AWS = require('aws-sdk-mock') AWS.setSDK(path.resolve('node_modules/aws-sdk')) -const extend = require('xtend') const IP = require('ip') -const redis = require('redis') const SQL = require('./sql') -const evalInContext = require('./eval') const createMQTTBroker = require('./broker') // TODO: send PR to serverless-offline to export this -const functionHelper = require('serverless-offline/src/functionHelper') -const createLambdaContext = require('serverless-offline/src/createLambdaContext') +const functionHelper = require('@tradle/serverless-offline/src/functionHelper') +const createLambdaContext = require('@tradle/serverless-offline/src/createLambdaContext') const VERBOSE = typeof process.env.SLS_DEBUG !== 'undefined' const defaultOpts = { host: 'localhost', @@ -21,20 +18,18 @@ const defaultOpts = { port: 1883, httpPort: 1884, noStart: false, - skipCacheInvalidation: false -} - -const ascoltatoreOpts = { - type: 'redis', - redis, - host: 'localhost', - port: 6379, - db: 12, - return_buffers: true // to handle binary payloads + skipCacheInvalidation: false, + redis: { + port: 6379, // Redis port + host: 'localhost', // Redis host + family: 4, // 4 (IPv4) or 6 (IPv6) + db: 12, + maxSessionDelivery: 100 // maximum offline messages deliverable on client CONNECT, default is 1000 + } } class ServerlessIotLocal { - constructor(serverless, options) { + constructor (serverless, options) { this.serverless = serverless this.log = serverless.cli.log.bind(serverless.cli) this.service = serverless.service @@ -65,12 +60,12 @@ class ServerlessIotLocal { }, noStart: { shortcut: 'n', - usage: 'Do not start local MQTT broker (in case it is already running)', + usage: 'Do not start local MQTT broker (in case it is already running)' }, skipCacheInvalidation: { usage: 'Tells the plugin to skip require cache invalidation. A script reloading tool like Nodemon might then be needed', - shortcut: 'c', - }, + shortcut: 'c' + } } } } @@ -81,17 +76,17 @@ class ServerlessIotLocal { 'iot:start:startHandler': this.startHandler.bind(this), 'before:offline:start:init': this.startHandler.bind(this), 'before:offline:start': this.startHandler.bind(this), - 'before:offline:start:end': this.endHandler.bind(this), + // 'before:offline:start:end': this.endHandler.bind(this) } } - debug() { + debug () { if (VERBOSE) { this.log.apply(this, arguments) } } - startHandler() { + startHandler () { this.originalEnvironment = _.extend({ IS_OFFLINE: true }, process.env) const custom = this.service.custom || {} @@ -112,32 +107,22 @@ class ServerlessIotLocal { this._createMQTTClient() } - endHandler() { + endHandler () { this.log('Stopping Iot broker') - this.mqttBroker.close() + this.mqttBroker.tcp.close(() => { + this.mqttBroker.http.close(() => { + this.mqttBroker.aedes.close(() => { + this.mqttBroker.persistence.destroy() + }) + }) + }) + this._client.end() } - _createMQTTBroker() { - const { host, port, httpPort } = this.options + _createMQTTBroker () { + this.mqttBroker = createMQTTBroker(this.options, (...args) => this.debug(...args)) - const mosca = { - host, - port, - http: { - host, - port: httpPort, - bundle: true - } - } - - // For now we'll only support redis backend. - const redisConfigOpts = this.options.redis; - - const ascoltatore = _.merge({}, ascoltatoreOpts, redisConfigOpts) - - this.mqttBroker = createMQTTBroker(ascoltatore, mosca) - - const endpointAddress = `${IP.address()}:${httpPort}` + const endpointAddress = `${isLocalHost(this.options.host) ? IP.address() : this.options.host}:${this.options.httpPort}` // prime AWS IotData import // this is necessary for below mock to work @@ -149,7 +134,7 @@ class ServerlessIotLocal { AWS.mock('IotData', 'publish', (params, callback) => { const { topic, payload } = params - this.mqttBroker.publish({ topic, payload }, callback) + this.mqttBroker.aedes.publish({ topic, payload }, callback) }) AWS.mock('Iot', 'describeEndpoint', (params, callback) => { @@ -158,23 +143,21 @@ class ServerlessIotLocal { (callback || params)(null, { endpointAddress }) }) }) - - this.log(`Iot broker listening on ports: ${port} (mqtt) and ${httpPort} (http)`) } - _getServerlessOfflinePort() { + _getServerlessOfflinePort () { // hackeroni! const offline = this.serverless.pluginManager.plugins.find( plugin => plugin.commands && plugin.commands.offline ) if (offline) { - return offline.options.port + return offline.options.httpPort || offline.options.port } } - _createMQTTClient() { - const { port, httpPort, location } = this.options + _createMQTTClient () { + const { host, httpPort, location } = this.options const topicsToFunctionsMap = {} const { runtime } = this.service.provider const stackName = this.provider.naming.getStackName() @@ -209,7 +192,7 @@ class ServerlessIotLocal { // assumes SELECT ... topic() as topic const parsed = SQL.parseSelect({ sql, - stackName, + stackName }) const topicMatcher = parsed.topic @@ -227,14 +210,17 @@ class ServerlessIotLocal { }) }) - const client = mqtt.connect(`ws://localhost:${httpPort}/mqqt`) + const url = `ws://${host}:${httpPort}/mqqt` + const client = mqtt.connect(url) + this.log(`connecting to local Iot broker! at ${url}`) + this._client = client client.on('error', console.error) let connectMonitor const startMonitor = () => { clearInterval(connectMonitor) connectMonitor = setInterval(() => { - this.log(`still haven't connected to local Iot broker!`) + this.log(`still haven't connected to local Iot broker! ${url}`) }, 5000).unref() } @@ -243,11 +229,14 @@ class ServerlessIotLocal { client.on('connect', () => { clearInterval(connectMonitor) this.log('connected to local Iot broker') - for (let topicMatcher in topicsToFunctionsMap) { + for (const topicMatcher in topicsToFunctionsMap) { client.subscribe(topicMatcher) } }) + client.on('end', () => { + clearInterval(connectMonitor) + }) client.on('disconnect', startMonitor) client.on('message', (topic, message) => { @@ -265,7 +254,7 @@ class ServerlessIotLocal { const apiGWPort = this._getServerlessOfflinePort() matches.forEach(topicMatcher => { - let functions = topicsToFunctionsMap[topicMatcher] + const functions = topicsToFunctionsMap[topicMatcher] functions.forEach(fnInfo => { const { fn, name, options, select } = fnInfo const requestId = Math.random().toString().slice(2) @@ -304,7 +293,7 @@ class ServerlessIotLocal { }) } - _getFunction(key) { + _getFunction (key) { const fun = this.service.getFunction(key) if (!fun.timeout) { fun.timeout = this.service.provider.timeout @@ -314,4 +303,8 @@ class ServerlessIotLocal { } } +function isLocalHost (host) { + return host === '0.0.0.0' || host === '127.0.0.1' || host === 'localhost' +} + module.exports = ServerlessIotLocal diff --git a/package.json b/package.json index 32f9106..c4ceea2 100644 --- a/package.json +++ b/package.json @@ -1,31 +1,39 @@ { "name": "serverless-iot-local", - "version": "2.1.2", + "version": "2.1.3", "description": "local iot events for the Serverless Framework", "main": "index.js", "repository": "https://github.com/tradle/serverless-iot-local", "author": "mvayngrib", "license": "MIT", "scripts": { + "lint": "standard", "test": "tape '__tests__/**/*'", "coverage": "istanbul cover tape tape '__tests__/**/*'" }, "dependencies": { + "aedes": "^0.46.1", + "aedes-persistence-redis": "^8.0.1", + "aedes-server-factory": "^0.2.1", "aws-sdk-mock": "^1.7.0", "ip": "^1.1.5", "lodash": "^4.17.4", - "mosca": "^2.6.0", "mqtt": "^2.13.1", - "mqtt-match": "^1.0.3", - "redis": "^2.8.0" + "mqtt-match": "^1.0.3" }, "peerDependencies": { - "aws-sdk": "*", - "serverless-offline": "*" + "@tradle/serverless-offline": "^3.16.0", + "aws-sdk": "*" }, "devDependencies": { + "@tradle/serverless-offline": "^3.16.0", + "aws-iot-device-sdk": "github:mvayngrib/aws-iot-device-sdk-js", + "aws-sdk": "^2.631.0", + "debug": "^3.2.7", "istanbul": "^0.4.5", + "pify": "^5.0.0", "sinon": "^5.0.3", - "tape": "^4.9.0" + "standard": "^16.0.3", + "tape": "^5.3.1" } } diff --git a/sql.js b/sql.js index d2780ec..aac8e83 100644 --- a/sql.js +++ b/sql.js @@ -3,7 +3,11 @@ const BASE64_PLACEHOLDER = '*b64' const SQL_REGEX = /^SELECT (.*)\s+FROM\s+'([^']+)'\s*(?:WHERE\s(.*))?$/i const SELECT_PART_REGEX = /^(.*?)(?: AS (.*))?$/i -const parseSelect = ({ sql, stackName }) => { +const parseSelect = (options) => { + if (typeof options === 'string') { + options = { sql: options } + } + let { sql, stackName } = options // if (/\([^)]/.test(sql)) { // throw new Error(`AWS Iot SQL functions in this sql are not yet supported: ${sql}`) // } @@ -38,18 +42,18 @@ const parseSelectPart = part => { } } -const brace = new Buffer('{')[0] -const bracket = new Buffer('[')[0] -const doubleQuote = new Buffer('"')[0] +const brace = Buffer.from('{')[0] +const bracket = Buffer.from('[')[0] +const doubleQuote = Buffer.from('"')[0] // to avoid stopping here when Stop on Caught Exceptions is on const maybeParseJSON = val => { switch (val[0]) { - case brace: - case bracket: - case doubleQuote: - try { - return JSON.parse(val) - } catch (err) {} + case brace: + case bracket: + case doubleQuote: + try { + return JSON.parse(val) + } catch (err) {} } return val @@ -63,7 +67,7 @@ const applySelect = ({ select, payload, context }) => { } const payloadReplacement = Buffer.isBuffer(payload) - ? `new Buffer('${payload.toString('base64')}', 'base64')` + ? `Buffer.from('${payload.toString('base64')}', 'base64')` : payload for (const part of select) { @@ -79,7 +83,7 @@ const applySelect = ({ select, payload, context }) => { * SELECT *, clientid() from 'topic' * { fieldOne: 'value', ...} */ - if(alias) { + if (alias) { event[key] = json } else { Object.assign(event, json) @@ -96,6 +100,6 @@ const applySelect = ({ select, payload, context }) => { module.exports = { parseSelect, - applySelect, + applySelect // parseWhere }