Skip to content

Commit e7757c3

Browse files
author
Robin Gottfried
committed
avoid json pack/unpack overhead
1 parent 9bdee9a commit e7757c3

File tree

10 files changed

+482
-56
lines changed

10 files changed

+482
-56
lines changed

client/client.js

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ const http = require('http');
55
const checksum = require('../lib').checksum;
66
const WebSocket = require('ws');
77
const WsJsonProtocol = require('../lib/ws-json');
8+
const { BufferStruct, BufferStructType } = require('../lib/buffer-struct');
9+
const { Messanger } = require('../server/ws-message');
810

911

1012
class RequestForwarder extends Object {
@@ -44,7 +46,6 @@ class RequestForwarder extends Object {
4446
}
4547
_send({
4648
channel: message.channel,
47-
id: message.id,
4849
event: event_id,
4950
data: data,
5051
})
@@ -72,7 +73,7 @@ class RequestForwarder extends Object {
7273
if (message.data instanceof Object) {
7374
message.data = Buffer.from(message.data);
7475
}
75-
console.log(` :> ${digest(message.data)}]`);
76+
console.log(` :> ${checksum(message.data)}]`);
7677
req.write(message.data);
7778
} catch(err) {
7879
console.log('data is object', message);
@@ -107,14 +108,13 @@ class RequestForwarder extends Object {
107108
}
108109

109110
_send(data) {
110-
this._ws.send(data);
111+
this._ws.send(data.channel, data.event, data.data);
111112
}
112113

113114
on_message(message) {
114-
if (!message.channel || message.channel.indexOf('/req/') != 0) {
115-
return;
116-
} else {
115+
if (message.channel && message.channel.indexOf('/req/') == 0) {
117116
this.handle_request(message);
117+
} else {
118118
}
119119
}
120120
}
@@ -144,15 +144,15 @@ class WebSockProxyClient extends Object {
144144

145145
connect(host_port, {forward_to = 'http://localhost', websocket_path = '/ws'}={}) {
146146
const ws_ = new WebSocket(`ws://${host_port}${websocket_path}/${this.key}`);
147-
const ws = new WsJsonProtocol(ws_);
147+
const ws = new Messanger(ws_);
148148

149149
ws.on('open', function open() {
150150
const request_forwarder = new RequestForwarder(ws, forward_to);
151151
// const watchDog = new WathDog(request_forwarder);
152152
console.log("Client connection openned.");
153153

154-
ws.send({data:"Hallo."});
155-
ws.on("message", request_forwarder.on_message(message).bind(request_forwarder));
154+
ws.send('/', 'test', {data:"Hallo."});
155+
ws.on("message", request_forwarder.on_message.bind(request_forwarder));
156156
ws.on("close", function onClose() {
157157
console.log("Client connection closed.");
158158
});

lib/buffer-struct.js

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
//-- vim: ft=javascript tabstop=2 softtabstop=2 expandtab shiftwidth=2
2+
'use strict';
3+
const { debug } = require('./utils');
4+
console.log('debug is', debug);
5+
6+
class BufferStruct extends Buffer {
7+
8+
constructor(bufferStructType, value) {
9+
super(value);
10+
this._bufferStructType = bufferStructType;
11+
}
12+
13+
toJSON() {
14+
return this._bufferStructType.toJSON(this)
15+
}
16+
}
17+
18+
BufferStruct.forType = function forType(bufferStructType) {
19+
return new BufferStruct(bufferStructType, Buffer.alloc(bufferStructType.size));
20+
}
21+
22+
23+
class BufferStructType {
24+
25+
constructor(definitions) {
26+
this._fields = definitions;
27+
this._fields.forEach(function prepareDefinitions(def, defOrder) {
28+
if (defOrder == definitions.length - 1) def._isLast = true;
29+
30+
switch (def.type) {
31+
case 'int':
32+
if (def.size === undefined) def.size = 4;
33+
def.type = `Int${def.size*8}`;
34+
if (def.size > 1) def.type += 'LE';
35+
break;
36+
case 'uint':
37+
if (def.size === undefined) def.size = 4;
38+
def.type = `UInt${def.size*8}`;
39+
if (def.size > 1) def.type += 'LE';
40+
break;
41+
case 'float':
42+
if (def.size == 8) {
43+
def.type = 'DoubleLE';
44+
} else {
45+
def.type = 'FloatLE';
46+
if (def.size === undefined) def.size = 4;
47+
}
48+
break;
49+
case 'double':
50+
if (def.size === undefined) def.size = 8;
51+
def.type = 'DoubleLE';
52+
break;
53+
case 'buffer':
54+
if (def._isLast && def.size === 0) {
55+
// allow dynamic length
56+
def._dynamicSize = true;
57+
}
58+
break;
59+
case 'str':
60+
break;
61+
default:
62+
throw new Error(`Unkwnon type ${def.type} for field ${def.name}.`);
63+
break;
64+
}
65+
66+
if ((!def.size || isNaN(def.size)) && !def._dynamicSize) {
67+
throw new Error(`Field ${def.name} has invalid size ${def.size}.`);
68+
}
69+
});
70+
}
71+
72+
73+
pack(values) {
74+
const self = this;
75+
let offset = 0;
76+
let data = BufferStruct.forType(this);
77+
this._fields.forEach(function packField(def, n) {
78+
const val = values[n];
79+
let realSize;
80+
if (def._dynamicSize) { // last item
81+
if (def.type != 'buffer') throw new Error("Dynamic size is supported on 'buffer' type.");
82+
if (! (val instanceof Buffer)) throw new Error(`Expected type Buffer for ${def.name} but got '${val}'.`);
83+
if (def.size) throw new Error(`Dynamic must not have a size specified.`);
84+
debug(`-- ${JSON.stringify(val)}`);
85+
const sizeBuffer = Buffer.alloc(4);
86+
sizeBuffer.writeUInt32LE(val.length);
87+
debug(`Pack dynamic ${def.name} of size ${sizeBuffer} ${offset}+${val.length}+${sizeBuffer.length}.`);
88+
data = Buffer.concat([
89+
data.subarray(0, offset), // original message
90+
sizeBuffer, // size indicator (till the end of record)
91+
val, // the buffer
92+
], offset + sizeBuffer.length + val.length);
93+
realSize = val.length + sizeBuffer.length;
94+
} else {
95+
const target = data.subarray(offset, offset + def.size);
96+
// debug(`pack ${offset}, ${def.size}`, val);
97+
realSize = self._packField(def, val, target);
98+
}
99+
offset += realSize;
100+
});
101+
return data.subarray(0, offset);
102+
}
103+
104+
_packField(def, val, target) {
105+
let realSize = def.size;
106+
if (def.type == 'str' || def.type == 'buffer') {
107+
if (typeof val == 'string' || val instanceof String) {
108+
realSize = val.length;
109+
target.writeUInt32LE(realSize);
110+
target.write(val, 4);
111+
} else if (val instanceof Buffer) {
112+
realSize = val.length;
113+
val.copy(target, 4);
114+
} else {
115+
realSize = val.length;
116+
target.write(val.toString(), 4);
117+
}
118+
target.writeUInt32LE(realSize);
119+
realSize += 4;
120+
} else {
121+
// TODO: guess type by size
122+
const packMeth = `write${def.type}`;
123+
if (target[packMeth]) {
124+
target[packMeth](val);
125+
} else {
126+
throw new Error(`Invalid type ${def.type} (Buffer has not method '${packMeth}'.`);
127+
}
128+
}
129+
return realSize;
130+
}
131+
132+
unpack(data) {
133+
const self = this;
134+
let offset = 0;
135+
const values = [];
136+
if (!(data instanceof Buffer)) {
137+
throw new Error(`Data is expected to be Buffer found ${data} instead.`);
138+
}
139+
this._fields.forEach(function unpackField(def, n) {
140+
const val = data.subarray(offset, def.size ? (offset + def.size) : undefined);
141+
// debug(`unpack ${def.name} at ${offset}, size ${def.size}`, val);
142+
let [unpackedValue, realSize] = self._unpackField(def, val);
143+
values.push(unpackedValue);
144+
offset += realSize;
145+
});
146+
return values;
147+
}
148+
149+
_unpackField(def, val) {
150+
let realSize = def.size;
151+
let unpackedValue;
152+
if (def.type == 'str' || def.type == 'buffer') {
153+
realSize = val.readUInt32LE();
154+
debug(`unpack ${def.name} of type ${def.type} size ${realSize}/${def.size}`);
155+
if (realSize) {
156+
if (def.type == 'str') {
157+
unpackedValue = val.toString('utf8', 4, 4 + realSize);
158+
} else {
159+
unpackedValue = val.subarray(4, 4 + realSize);
160+
}
161+
} else { // the string was a buffer
162+
unpackedValue = val.subarray(4);
163+
}
164+
realSize += 4;
165+
} else {
166+
debug(`unpack ${def.name} of type ${def.type} size -/${def.size}`);
167+
const packMeth = `read${def.type}`;
168+
if (val[packMeth]) {
169+
unpackedValue = val[packMeth]();
170+
} else {
171+
throw new Error(`Invalid type ${def.type} (Buffer has not method '${packMeth}'.`);
172+
}
173+
}
174+
return [unpackedValue, realSize];
175+
}
176+
177+
toJSON(data) {
178+
const values = this.unpack(data);
179+
const obj = {};
180+
this._fields.forEach((def, n) => obj[def.name] = values[n]);
181+
return obj;
182+
}
183+
184+
fromJSON(obj) {
185+
const values = [];
186+
this._fields.forEach((def, n) => values.push(obj[def.name]));
187+
return this.pack(values);
188+
}
189+
190+
get size() {
191+
if (!this._size) {
192+
this._size = this._fields.reduce((a,b)=>{
193+
return {size: a.size+b.size};
194+
}
195+
,{size:0}).size;
196+
}
197+
return this._size;
198+
}
199+
}
200+
201+
202+
if (require.main == module) {
203+
const print = console.log.bind(console);
204+
print("Running tests ...");
205+
206+
const struct = new BufferStructType([
207+
{name: 'title', type: 'str', size: 20},
208+
{name: 'offset', type: 'uint'},
209+
{name: 'myFloat', type: 'float'},
210+
{name: 'myDouble', type: 'float'},
211+
{name: 'smallInt', type: 'int', size: 1},
212+
{name: 'data', type: 'buffer', size: 20},
213+
{name: 'payload', type: 'buffer', size: 0},
214+
215+
])
216+
217+
const data = {
218+
title: 'pokus',
219+
data: Buffer.from([10, 13, 0, 45]),
220+
offset: 10,
221+
myFloat: 10.5,
222+
myDouble: 124132412,
223+
smallInt: 30,
224+
payload: Buffer.from('012345678901', 'ascii'),
225+
}
226+
227+
print('>', data);
228+
229+
const buf = struct.fromJSON(data);
230+
print('-----', buf.toString('hex'), '-----');
231+
232+
233+
print('<', struct.toJSON(buf));
234+
}
235+
236+
module.exports = {
237+
BufferStructType: BufferStructType,
238+
BufferStruct: BufferStruct,
239+
}

lib/index.js

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
//-- vim: ft=javascript tabstop=2 softtabstop=2 expandtab shiftwidth=2
22
'use strict';
33

4-
const crypto = require('crypto');
54

6-
function checksum(data, {length=5}={}) {
7-
try {
8-
return crypto.createHash('md5').update(data).digest('hex').slice(0,length).replace(/(.{4})/g, ':$1').slice(1);
9-
} catch(err) {
10-
console.error(err, data);
11-
throw err;
12-
}
13-
}
5+
const { BufferStruct, BufferStructType } = require('./buffer-struct')
6+
const { checksum, debug } = require('./utils');
7+
148

15-
exports.checksum = checksum;
16-
module.exports.WsJsonProtocol = require('./ws-json').WsJsonProtocol;
9+
module.exports = {
10+
11+
WsJsonProtocol: require('./ws-json').WsJsonProtocol,
12+
BufferStructType: BufferStructType,
13+
BufferStruct: BufferStruct,
14+
checksum: checksum,
15+
debug: debug,
16+
// StructWebSocket: require('./ws-channel'),
17+
}

lib/utils.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
const crypto = require('crypto');
2+
const DEBUG = process.env.DEBUG;
3+
4+
function checksum(data, {length=5}={}) {
5+
try {
6+
return crypto.createHash('md5').update(data).digest('hex').slice(0,length).replace(/(.{4})/g, ':$1').slice(1);
7+
} catch(err) {
8+
console.error(err, data);
9+
throw err;
10+
}
11+
}
12+
13+
function debug() {
14+
if (DEBUG) console.log.apply(console, arguments);
15+
}
16+
17+
module.exports = {
18+
checksum: checksum,
19+
debug: debug,
20+
}

lib/ws-channel.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
'use strict';
2+
//-- vim: ft=javascript tabstop=2 softtabstop=2 expandtab shiftwidth=2
3+
4+
const { BufferStruct, BufferStructType } = require('./buffer-struct');
5+
6+
7+
class StructWebSocket {
8+
constructor(format, websocket) {
9+
this._bufferStruct = new BufferStructType(format);
10+
this._ws = websocket;
11+
}
12+
13+
on(eventType, callback) {
14+
const self = this;
15+
if (eventType == 'message') {
16+
callback = (message)=>callback(self._bufferStruct.toJson(message));
17+
}
18+
return this._ws.on(eventType, callback);
19+
}
20+
21+
send(data) {
22+
this._ws.send(this._bufferStruct.fromJson(data);
23+
}
24+
}
25+
26+
exports.StructWebSocket = StructWebSocket;
27+
28+

0 commit comments

Comments
 (0)