Skip to content

Commit 02ee88d

Browse files
author
chunpu
authored
Merge pull request #7 from meschbach/tcp
TCP Syslog Server
2 parents 4cfe83e + b8dda09 commit 02ee88d

File tree

7 files changed

+365
-33
lines changed

7 files changed

+365
-33
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
node_modules
2+
*.sw[op]

.travis.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
11
language: node_js
2+
node_js:
3+
- "6"
4+
- "7"
5+
- "8"

index.js

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
// vim: set ft=javascript tabstop=4 softtabstop=4 shiftwidth=4 autoindent:
12
var dgram = require('dgram')
23
var debug = require('debug')('syslogd')
34

@@ -11,6 +12,7 @@ function Syslogd(fn, opt) {
1112
}
1213
this.opt = opt || {}
1314
this.handler = fn
15+
1416
this.server = dgram.createSocket('udp4')
1517
}
1618

@@ -96,3 +98,176 @@ function parser(msg, rinfo) {
9698
}
9799

98100
exports.parser = parser
101+
102+
/*
103+
* SOCK_STREAM service
104+
*/
105+
const net = require('net')
106+
107+
function SimpleStreamService( messageReceived, options ) {
108+
return new StreamService( messageReceived, options );
109+
}
110+
111+
function StreamService(fn, opt) {
112+
this.opt = opt || {}
113+
this.handler = fn
114+
115+
this.server = net.createServer( ( connection ) => {
116+
debug('New connection from ' + connection.remoteAddress + ":" + connection.remotePort )
117+
var state = new ConnectionState( this, connection );
118+
connection.on('data', ( buffer ) => { state.more_data( buffer ) } )
119+
connection.on('end', () => { state.closed() } )
120+
})
121+
return this;
122+
}
123+
124+
StreamService.prototype.listen = function( port, callback ){
125+
var server = this.server
126+
callback = callback || noop
127+
this.port = port || 514 // default is 514
128+
debug('Binding to ' + this.port)
129+
var me = this
130+
server
131+
.on('error', function(err) {
132+
debug('binding error: %o', err)
133+
callback(err)
134+
})
135+
.on('listening', function() {
136+
debug('tcp binding ok')
137+
me.port = server.address().port
138+
callback(null, me)
139+
})
140+
.listen( port, this.opt.address )
141+
142+
return this
143+
}
144+
145+
class ConnectionState {
146+
constructor( service, connection ){
147+
this.service = service
148+
this.info = {
149+
address: connection.remoteAddress,
150+
family: connection.family,
151+
port: connection.remotePort
152+
}
153+
this.frameParser = new FrameParser( ( frame ) => {
154+
this.dispatch_message( frame )
155+
})
156+
}
157+
158+
more_data( buffer ) {
159+
this.frameParser.feed( buffer )
160+
}
161+
162+
dispatch_message( frame ) {
163+
let clientInfo = {
164+
address: this.info.address,
165+
family: this.info.family,
166+
family: this.info.remotePort,
167+
size: frame.length
168+
}
169+
let message = parser( frame, clientInfo )
170+
this.service.handler( message )
171+
}
172+
173+
closed(){
174+
this.frameParser.done()
175+
}
176+
}
177+
178+
let FRAME_TYPE_UNKNOWN = 0;
179+
let FRAME_TYPE_NEWLINE = 1;
180+
let FRAME_TYPE_OCTET = 2;
181+
182+
class FrameParser {
183+
constructor( callback ){
184+
this.buffer = Buffer.from( "" )
185+
this.callback = callback;
186+
this.frame_state = FRAME_TYPE_UNKNOWN ;
187+
}
188+
189+
feed( data ){
190+
this.buffer = Buffer.concat( [ this.buffer, data ] )
191+
this.check_framing()
192+
}
193+
194+
done() {
195+
if( this.buffer.length > 0 ){
196+
this.callback( this.buffer.toString() )
197+
}
198+
this.buffer = Buffer.from( "" )
199+
}
200+
201+
check_framing(){
202+
if( this.frame_state == FRAME_TYPE_UNKNOWN ) {
203+
this.decide_on_frame_type();
204+
} else if( this.frame_state == FRAME_TYPE_NEWLINE ) {
205+
this.check_newline_framing();
206+
} else if( this.frame_state == FRAME_TYPE_OCTET ) {
207+
this.check_octet_frame()
208+
} else {
209+
throw "Invalid farme state";
210+
}
211+
}
212+
213+
decide_on_frame_type() {
214+
// do nothing if buffer is too short
215+
if( this.buffer.length < 8 ) {
216+
return
217+
}
218+
// shrink our check buffer
219+
let check = this.buffer.slice( 0, 8 )
220+
// Do we have spaces?
221+
let space = check.indexOf( " " )
222+
if( space == -1 ){
223+
this.frame_state = FRAME_TYPE_NEWLINE
224+
return this.check_framing()
225+
}
226+
227+
// Check output if we can convert it to a number
228+
let size = parseInt( check.slice( 0, space ), 10 )
229+
if( isNaN( size ) || size < 2 ) {
230+
this.frame_state = FRAME_TYPE_NEWLINE
231+
return this.check_framing()
232+
}
233+
234+
// Octet framing
235+
this.octets = size
236+
this.frame_state = FRAME_TYPE_OCTET
237+
this.buffer = this.buffer.slice( space + 1 )
238+
return this.check_framing()
239+
}
240+
241+
check_newline_framing() {
242+
let indexOfNewLine = this.buffer.indexOf( "\n" )
243+
if( indexOfNewLine == -1 ) { return; }
244+
245+
let frame = this.buffer.slice( 0, indexOfNewLine )
246+
this.buffer = this.buffer.slice( indexOfNewLine + 1 )
247+
248+
this._emit_and_reset( frame )
249+
}
250+
251+
check_octet_frame() {
252+
let size = this.octets
253+
if( !size ) { throw "Not currently in octet strategy" }
254+
255+
if( this.buffer.length < size ) { return }
256+
257+
let frame = this.buffer.slice( 0, size )
258+
this.buffer = this.buffer.slice( size )
259+
260+
this._emit_and_reset( frame )
261+
}
262+
263+
_emit_and_reset( frame ){
264+
this.callback( frame.toString() )
265+
266+
this.frame_state = FRAME_TYPE_UNKNOWN
267+
this.check_framing()
268+
}
269+
}
270+
271+
exports.StreamService = SimpleStreamService
272+
exports.FrameParser = FrameParser
273+

package.json

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"test": "test"
88
},
99
"scripts": {
10-
"test": "sudo $(which node) test",
10+
"test": "mocha",
1111
"performance": "node performance"
1212
},
1313
"keywords": [
@@ -20,13 +20,16 @@
2020
"dependencies": {
2121
"debug": "^2.1.0"
2222
},
23-
"devDependencies": {},
23+
"devDependencies": {
24+
"mocha" : "3.4.2"
25+
},
2426
"repository": {
2527
"type": "git",
2628
"url": "https://github.com/chunpu/syslogd.git"
2729
},
2830
"bugs": {
2931
"url": "https://github.com/chunpu/syslogd/issues"
3032
},
31-
"homepage": "https://github.com/chunpu/syslogd"
33+
"homepage": "https://github.com/chunpu/syslogd",
34+
"engines" : { "node" : ">=6.0.0" }
3235
}

test/frame.js

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
let mocha = require( "mocha" )
2+
let assert = require('assert')
3+
let syslogd = require('../')
4+
let FrameParser = syslogd.FrameParser
5+
6+
describe( "StreamFrameParser", () => {
7+
var frames
8+
var parser
9+
10+
beforeEach( () => {
11+
frames = []
12+
parser = new FrameParser( ( frame ) => {
13+
frames.push( frame )
14+
//console.log( "Frame", frames.length, " '"+ frame + "'" )
15+
} )
16+
})
17+
18+
describe( "when closed before receiving data", () => {
19+
it( "doesn't emit frames", () => { assert.equal( frames.length, 0 ) } )
20+
})
21+
22+
describe( "given a complete new line frame", () => {
23+
beforeEach( () => {
24+
parser.feed( Buffer.from( "nontransparent newline\n" ) )
25+
})
26+
27+
it( "adds a frame", () => { assert.equal( frames.length, 1) })
28+
it( "correctly copies the frame", () => { assert.equal( frames[0], "nontransparent newline" ) })
29+
30+
describe( "when the stream is complete", () => {
31+
it( "doesn't emit a new frame", () => { assert.equal( frames.length, 1 ) } )
32+
})
33+
})
34+
35+
describe( "when given a partial new line frame", () => {
36+
beforeEach( () => {
37+
parser.feed( Buffer.from( "not done" ) )
38+
})
39+
40+
it( "does not emit the frame yet", () => { assert.equal( frames.length, 0 ) })
41+
42+
describe( "when given the completed part", () => {
43+
beforeEach( () => {
44+
parser.feed( Buffer.from( " yet\n" ) )
45+
})
46+
47+
it( "completes the frame", () => { assert.equal( frames.length, 1) } )
48+
it( "ensure frame content correct", () => { assert.equal( frames[0], "not done yet" ) } )
49+
})
50+
51+
describe( "when finished with additional frames", () => {
52+
beforeEach( () => { parser.feed( Buffer.from( "here\nwith another\nframe" ) ) })
53+
54+
it( "it only completed two frames", () => { assert.equal( frames.length, 2) } )
55+
it( "frame 1 is complete", () => { assert.equal( frames[0], "not donehere" ) } )
56+
it( "frame 2 is correct", () => { assert.equal( frames[1], "with another" ) } )
57+
58+
describe( "when the stream is done", () => {
59+
beforeEach( () => { parser.done() } )
60+
it( "yeilds another frame", () => { assert.equal( frames.length, 3 ) } )
61+
it( "yeilds leftover content", () => { assert.equal( frames[2], "frame" ) } )
62+
})
63+
})
64+
})
65+
66+
describe( "when given multiple new line frames", () => {
67+
beforeEach( () => {
68+
parser.feed( Buffer.from( "multiple frames\nwithin a single\nbuffer" ) )
69+
} )
70+
71+
it( "emits all frames", () => { assert.equal( frames.length, 2 ) })
72+
it( "emits the frames in correct order", () => {
73+
assert.equal( frames[0], "multiple frames" )
74+
assert.equal( frames[1], "within a single" )
75+
})
76+
})
77+
78+
describe( "when given an octet counted frame", () => {
79+
const message = "<12>1 2017-05-26T14:05:00.000Z host proc 42 - - - Some message"
80+
beforeEach( () => {
81+
let length = message.length
82+
parser.feed( Buffer.from( length + " " + message ) )
83+
})
84+
85+
it( "emits a frame", () => { assert.equal( frames.length, 1 ) } )
86+
it( "emits only the framed contents", () => { assert.equal( frames[0], message ) } )
87+
})
88+
89+
describe( "when given mutliple octet frames", () => {
90+
const message1 = "<12>1 2017-05-26T14:05:00.000Z host proc 42 - - - Some message"
91+
const message2 = "<18>1 2017-05-26T14:31:00.123Z host proc 42 - - - Secon messages"
92+
beforeEach( () => {
93+
parser.feed( Buffer.from( message1.length + " " + message1 + message2.length + " " + message2 ) )
94+
})
95+
96+
it( "emits a frame", () => { assert.equal( frames.length, 2 ) } )
97+
it( "emits the first message", () => { assert.equal( frames[0], message1 ) } )
98+
it( "emits the second message", () => { assert.equal( frames[1], message2 ) } )
99+
})
100+
})
101+

test/index.js

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,39 @@
11
var dgram = require('dgram')
22
var assert = require('assert')
3+
let mocha = require( 'mocha' )
34
var Syslogd = require('../')
45

5-
var time = 'Dec 15 10:58:44'
6-
var testMsg = '<183>' + time + ' hostname tag: info'
6+
describe( "given a syslogd service", () => {
7+
it( "recieves and processes messages", (done) => {
8+
var time = 'Dec 15 10:58:44'
9+
var testMsg = '<183>' + time + ' hostname tag: info'
10+
const port = 10514
711

8-
Syslogd(function(info) {
9-
//console.log(info)
10-
info.port = null // port is random
11-
var shouldRet = {
12-
facility: 7
13-
, severity: 22
14-
, tag: 'tag'
15-
, time: new Date(time + ' ' + new Date().getFullYear())
16-
, hostname: 'hostname'
17-
, address: '127.0.0.1'
18-
, family: 'IPv4'
19-
, port: null
20-
, size: 39
21-
, msg: 'info'
22-
}
23-
assert.deepEqual(shouldRet, info)
24-
console.log('test pass!')
25-
process.exit(0)
26-
}).listen(514, function(err) { // sudo
27-
console.log('listen', err)
28-
assert(!err)
29-
var client = dgram.createSocket('udp4')
30-
var buffer = new Buffer(testMsg)
31-
client.send(buffer, 0, buffer.length, 514, 'localhost', function(err, bytes) {
32-
console.log('send', err, bytes)
33-
})
12+
Syslogd(function(info) {
13+
//console.log(info)
14+
info.port = null // port is random
15+
var shouldRet = {
16+
facility: 7
17+
, severity: 22
18+
, tag: 'tag'
19+
, time: new Date(time + ' ' + new Date().getFullYear())
20+
, hostname: 'hostname'
21+
, address: '127.0.0.1'
22+
, family: 'IPv4'
23+
, port: null
24+
, size: 39
25+
, msg: 'info'
26+
}
27+
assert.deepEqual(shouldRet, info)
28+
done()
29+
}).listen( port, function(err) { // sudo
30+
//console.log('listen', err)
31+
assert(!err)
32+
var client = dgram.createSocket('udp4')
33+
var buffer = new Buffer(testMsg)
34+
client.send(buffer, 0, buffer.length, port, 'localhost', function(err, bytes) {
35+
//console.log('send', err, bytes)
36+
})
37+
})
38+
})
3439
})
35-
36-

0 commit comments

Comments
 (0)