Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: Tests

on: [push, pull_request]

jobs:
test:
runs-on: ubuntu-latest
env:
NODE_NO_WARNINGS: 1
services:
redis:
image: tradle/redis
ports:
- "6379:6379"
steps:
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: 14.17.3
- name: Installing dependencies
run: npm i
- name: Linting code
run: npm run lint
- name: Running unit tests
run: npm run test
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# serverless-iot-local

AWS Iot lifecycle and regular topic subscription events
AWS Iot lifecycle and regular topic subscription events.

## Prerequisites
* [email protected]
Expand All @@ -18,23 +18,23 @@ plugins:
```

## Usage
1. Start redis:
1. Start redis:
`redis-server`

2. If you're using [serverless-offline](https://github.com/dherault/serverless-offline), you can run:

`sls offline start`

Otherwise run:
Otherwise run:

`sls iot start`

CLI options are optional:

```
--port -p Port to listen on. Default: 1883
--httpPort -h Port for WebSocket connections. Default: 1884
--noStart -n Prevent Iot broker (Mosca MQTT brorker) from being started (if you already have one)
--noStart -n Prevent Iot broker (Aedes MQTT broker) from being started (if you already have one)
--skipCacheValidation -c Tells the plugin to skip require cache invalidation. A script reloading tool like Nodemon might then be needed (same as serverless-offline)
```

Expand Down
88 changes: 88 additions & 0 deletions __tests__/basic.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
if (process.env.SLS_DEBUG === undefined) {
process.env.SLS_DEBUG = '*'
}

const test = require('tape')
const ServerlessIoTLocal = require('../')
const { randomBytes } = require('crypto')
const awsIot = require('aws-iot-device-sdk')
const promisify = require('pify')
const debug = require('debug')('serverless-iot-local-test')

const serverless = {
cli: {
log: debug
},
service: {
custom: {},
service: 'x',
provider: {
environment: '',
stage: '',
runtime: '',
region: '',
timeout: 0
},
functions: {},
getFunction (key) {}
},

getProvider (_name /* aws */) {
return {
naming: {
getStackName () {
return ''
}
}
}
},
config: {
servicePath: 'abcd'
},
pluginManager: {
plugins: []
}
}

function createClient () {
const client = awsIot.device({
protocol: 'ws',
port: 1884,
host: 'localhost'
})
return promisify(client)
}

test('Basic server test', async t => {
const inst = new ServerlessIoTLocal(serverless, {
redis: {
host: process.env.REDIS_HOST || process.env.TRADLE_LOCAL_IP,
port: process.env.REDIS_PORT
}
})
inst.startHandler()
t.pass('waiting 300ms')
await new Promise(resolve => setTimeout(resolve, 300))
t.pass('creating clients')
const client1 = createClient()
const client2 = createClient()
t.pass('subscribing client')
await client2.subscribe('test', { qos: 1 })
t.pass('init receiving client')
const receive = new Promise(resolve => {
client2.handleMessage = message => resolve(message.payload.toString())
})
t.pass('publishing message')
const payload = randomBytes(6).toString('hex')
const [, data] = await Promise.all([
client1.publish('test', payload, { qos: 1 }).then(() => t.pass('data sent')),
receive
])
t.equals(data, payload, `expected test data "${payload}" received`)

await Promise.all([
client2.end(),
client1.end()
])
inst.endHandler()
})
2 changes: 1 addition & 1 deletion __tests__/eval.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ test('evalInContext - evals function in context', (t) => {
})

test('throws error if variable does not exist', (t) => {
t.throws((() => evalInContext('notHere', {})))
t.throws(() => evalInContext('notHere', {}))
t.end()
})
20 changes: 10 additions & 10 deletions __tests__/sql.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ test('parseSelect - parses multiple SELECT properties correctly', (t) => {
const subject = "SELECT name, age, maleOrFemale AS gender FROM 'topic'"
const results = parseSelect(subject)
t.deepEqual(results.select, [
{ field: 'name', alias: undefined},
{ field: 'name', alias: undefined },
{ field: 'age', alias: undefined },
{ field: 'maleOrFemale', alias: 'gender'}
{ field: 'maleOrFemale', alias: 'gender' }
])
t.end()
})

test('applySelect - Simple select with buffered string handled correctly', (t) => {
const select = [{ field: '*', alias: undefined }]
const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8')
const payload = Buffer.from(JSON.stringify({ name: 'Bob' }), 'utf8')
const context = {}
const event = applySelect({ select, payload, context })
t.deepEqual(event, { name: 'Bob' })
Expand All @@ -63,7 +63,7 @@ test('applySelect - Aliased wildcard with non-JSON handled correctly', (t) => {
const payload = 'Bob'
const context = {}
const event = applySelect({ select, payload, context })
t.deepEqual(event, { 'name': 'Bob'})
t.deepEqual(event, { name: 'Bob' })
t.end()
})

Expand All @@ -72,8 +72,8 @@ test('applySelect - Unaliased wildcard plus function results in flattened output
{ field: '*', alias: undefined },
{ field: 'clientid()', alias: undefined }
]
const clientIdFunc = sinon.stub().returns(undefined);
const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8')
const clientIdFunc = sinon.stub().returns(undefined)
const payload = Buffer.from(JSON.stringify({ name: 'Bob' }), 'utf8')
const context = { clientid: clientIdFunc }
const event = applySelect({ select, payload, context })
t.ok(clientIdFunc.calledOnce)
Expand All @@ -86,8 +86,8 @@ test('applySelect - Aliased wildcard plus function results in nested output', (t
{ field: '*', alias: 'message' },
{ field: 'clientid()', alias: undefined }
]
const clientIdFunc = sinon.stub().returns(undefined);
const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8')
const clientIdFunc = sinon.stub().returns(undefined)
const payload = Buffer.from(JSON.stringify({ name: 'Bob' }), 'utf8')
const context = { clientid: clientIdFunc }
const event = applySelect({ select, payload, context })
t.ok(clientIdFunc.calledOnce)
Expand All @@ -101,10 +101,10 @@ test('applySelect - Function results are appeneded to output', (t) => {
{ field: 'clientid()', alias: 'theClientId' }
]
const clientIdFunc = sinon.stub().returns('12345')
const payload = Buffer.from(JSON.stringify({name: 'Bob'}), 'utf8')
const payload = Buffer.from(JSON.stringify({ name: 'Bob' }), 'utf8')
const context = { clientid: clientIdFunc }
const event = applySelect({ select, payload, context })
t.ok(clientIdFunc.calledOnce)
t.deepEqual(event, { message: { name: 'Bob' }, 'theClientId': '12345' })
t.deepEqual(event, { message: { name: 'Bob' }, theClientId: '12345' })
t.end()
})
98 changes: 49 additions & 49 deletions broker.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
const mosca = require('mosca')

// fired when the mqtt server is ready
function setup() {
console.log('Mosca server is up and running')
}
const Aedes = require('aedes')
const { createServer } = require('aedes-server-factory')
const aedesPersistenceRedis = require('aedes-persistence-redis')

function createAWSLifecycleEvent ({ type, clientId, topics }) {
// http://docs.aws.amazon.com/iot/latest/developerguide/life-cycle-events.html#subscribe-unsubscribe-events
Expand All @@ -24,54 +21,57 @@ function createAWSLifecycleEvent ({ type, clientId, topics }) {

/**
* https://github.com/aws/aws-sdk-js/blob/master/clients/iot.d.ts#L349
*
*
* @param {Object} opts Module options
* @param {Object} moscaOpts Mosca options
* @param {Object} aedesOpts Aedes options
*/
function createBroker (ascoltatore, moscaOpts) {
const moscaSettings = {
// port: 1883,
backend: ascoltatore,
persistence: {
factory: mosca.persistence.Redis
}
}
function createMQTTBroker ({ host, port, httpPort, redis }, debug) {
const persistence = aedesPersistenceRedis(redis)
const aedes = new Aedes({ persistence })
aedes.on('ready', () => debug('Aedes server is up and running'))
aedes.on('client', client => publishClient('connected', client.id))
aedes.on('clientDisconnect', client => publishClient('disconnected', client.id))
aedes.on('subscribe', (subscriptions, client) => publishSubscription('subscribed', client.id, subscriptions))
aedes.on('unsubscribe', (subscriptions, client) => publishSubscription('unsubscribed', client.id, subscriptions))

moscaOpts = Object.assign({}, moscaSettings, moscaOpts)
const server = new mosca.Server(moscaOpts)
server.on('ready', setup)

// fired when a message is received
server.on('published', function (packet, client) {
const presence = packet.topic.match(/^\$SYS\/.*\/(new|disconnect)\/clients$/)
if (presence) {
const clientId = packet.payload
const type = presence[1] === 'new' ? 'connected' : 'disconnected'
server.publish({
topic: `$aws/events/presence/${type}/${clientId}`,
payload: JSON.stringify(createAWSLifecycleEvent({
type,
clientId
}))
})
}
const tcp = createServer(aedes)
const http = createServer(aedes, {
ws: true
})

const subscription = packet.topic.match(/^\$SYS\/.*\/new\/(subscribes|unsubscribes)$/)
if (subscription) {
const type = subscription[1] === 'subscribes' ? 'subscribed' : 'unsubscribed'
const { clientId, topic } = JSON.parse(packet.payload)
server.publish({
topic: `$aws/events/subscriptions/${type}/${clientId}`,
payload: JSON.stringify(createAWSLifecycleEvent({
type,
clientId,
topics: [topic]
}))
})
}
debug(`Listening to Aedes tcp at ${host}:${port}`)
tcp.listen(port, () => {
debug(`Aedes tcp is up and running at ${host}:${port}`)
})
debug(`Listening to Aedes http at ${host}:${httpPort}`)
http.listen(httpPort, () => {
debug(`Aedes http is up and running at ${host}:${httpPort}`)
})

return { aedes, tcp, http, persistence }

return server
function publishClient (type, clientId) {
debug(`Publishing client ${type}/${clientId}`)
aedes.publish({
topic: `$aws/events/presence/${type}/${clientId}`,
payload: JSON.stringify(createAWSLifecycleEvent({
type,
clientId
}))
})
}

function publishSubscription (type, clientId, subscriptions) {
debug(`Publishing subscription ${type}/${clientId}`)
aedes.publish({
topic: `$aws/events/subscriptions/${type}/${clientId}`,
payload: JSON.stringify(createAWSLifecycleEvent({
type,
clientId,
topics: subscriptions
}))
})
}
}

module.exports = createBroker
module.exports = createMQTTBroker
10 changes: 2 additions & 8 deletions eval.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// TODO: trim(), ltrim(), etc

const evalInContext = (js, context) => {
/* eslint-disable */
const { clientid, topic, principal } = context
try {
return eval(js)
Expand All @@ -9,14 +10,7 @@ const evalInContext = (js, context) => {
console.log(`failed to evaluate: ${js}`)
throw err
}
}

const encode = (data, encoding) => {
if (encoding !== 'base64') {
throw new Error('AWS Iot SQL encode() function only supports base64 as an encoding')
}

return data.toString(encoding)
/* eslint-enable */
}

module.exports = evalInContext
Loading
Loading