Skip to content

Commit 52687c9

Browse files
authored
抽出readMessage函数,方便对该函数单独benchmark (#15)
1 parent 3842c68 commit 52687c9

File tree

3 files changed

+152
-154
lines changed

3 files changed

+152
-154
lines changed

conn.go

Lines changed: 148 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ type Conn struct {
6565
bp bytespool.BytesPool // 实验某些特性加的字段
6666

6767
delayWrite
68+
readHeadArray [enum.MaxFrameHeaderSize]byte
69+
fragmentFramePayload []byte // 存放分片帧的缓冲区
70+
bufioPayload []byte
71+
fragmentFrameHeader *frame.FrameHeader
6872
}
6973

7074
func setNoDelay(c net.Conn, noDelay bool) error {
@@ -132,10 +136,36 @@ func decode(payload []byte) ([]byte, error) {
132136
return o.Bytes(), nil
133137
}
134138

135-
func (c *Conn) ReadLoop() error {
139+
func (c *Conn) ReadLoop() (err error) {
136140
c.OnOpen(c)
137141

138-
return c.readLoop()
142+
defer func() {
143+
// c.OnClose(c, err)
144+
c.Close()
145+
if c.fr.IsInit() {
146+
defer func() {
147+
c.fr.Release()
148+
c.fr.BufPtr()
149+
}()
150+
}
151+
}()
152+
153+
if c.br != nil {
154+
newSize := int(1024 * c.bufioMultipleTimesPayloadSize)
155+
if c.br.Size() != newSize {
156+
// TODO sync.Pool管理
157+
(*bufio2.Reader2)(unsafe.Pointer(c.br)).ResetBuf(make([]byte, newSize))
158+
}
159+
// bufio 模式才会使用payload
160+
c.bufioPayload = *bytespool.GetBytes(1024 + enum.MaxFrameHeaderSize)
161+
}
162+
163+
for {
164+
err = c.readMessage()
165+
if err != nil {
166+
return err
167+
}
168+
}
139169
}
140170

141171
func (c *Conn) StartReadLoop() {
@@ -144,7 +174,7 @@ func (c *Conn) StartReadLoop() {
144174
}()
145175
}
146176

147-
func (c *Conn) readDataFromNet(headArray *[enum.MaxFrameHeaderSize]byte, payload *[]byte) (f frame.Frame, err error) {
177+
func (c *Conn) readDataFromNet(headArray *[enum.MaxFrameHeaderSize]byte, bufioPayload *[]byte) (f frame.Frame, err error) {
148178
if c.readTimeout > 0 {
149179
err = c.c.SetReadDeadline(time.Now().Add(c.readTimeout))
150180
if err != nil {
@@ -156,7 +186,7 @@ func (c *Conn) readDataFromNet(headArray *[enum.MaxFrameHeaderSize]byte, payload
156186
if c.fr.IsInit() {
157187
f, err = frame.ReadFrameFromWindows(&c.fr, headArray, c.windowsMultipleTimesPayloadSize)
158188
} else {
159-
f, err = frame.ReadFrameFromReader(c.br, headArray, payload)
189+
f, err = frame.ReadFrameFromReader(c.br, headArray, bufioPayload)
160190
}
161191
if err != nil {
162192
c.Callback.OnClose(c, err)
@@ -172,186 +202,154 @@ func (c *Conn) readDataFromNet(headArray *[enum.MaxFrameHeaderSize]byte, payload
172202
}
173203

174204
// 读取websocket frame.Frame的循环
175-
func (c *Conn) readLoop() error {
176-
var f frame.Frame
177-
var fragmentFrameHeader *frame.FrameHeader
178-
179-
defer c.Close()
180-
181-
var err error
182-
var op opcode.Opcode
183-
184-
if c.fr.IsInit() {
185-
defer func() {
186-
c.fr.Release()
187-
c.fr.BufPtr()
188-
}()
205+
func (c *Conn) readMessage() (err error) {
206+
// 从网络读取数据
207+
f, err := c.readDataFromNet(&c.readHeadArray, &c.bufioPayload)
208+
if err != nil {
209+
return err
189210
}
190211

191-
var fragmentFrameBuf []byte
192-
var headArray [enum.MaxFrameHeaderSize]byte
193-
194-
var payload []byte
195-
if c.br != nil {
196-
newSize := int(1024 * c.bufioMultipleTimesPayloadSize)
197-
if c.br.Size() != newSize {
198-
// TODO sync.Pool管理
199-
(*bufio2.Reader2)(unsafe.Pointer(c.br)).ResetBuf(make([]byte, newSize))
200-
}
201-
// bufio 模式才会使用payload
202-
payload = *bytespool.GetBytes(1024 + enum.MaxFrameHeaderSize)
212+
op := f.Opcode
213+
if c.fragmentFrameHeader != nil {
214+
op = c.fragmentFrameHeader.Opcode
203215
}
204216

205-
for {
217+
rsv1 := f.GetRsv1()
218+
// 检查Rsv1 rsv2 Rfd, errsv3
219+
if rsv1 && c.failRsv1(op) || f.GetRsv2() || f.GetRsv3() {
220+
err = fmt.Errorf("%w:Rsv1(%t) Rsv2(%t) rsv2(%t) compression:%t", ErrRsv123, rsv1, f.GetRsv2(), f.GetRsv3(), c.compression)
221+
return c.writeErrAndOnClose(ProtocolError, err)
222+
}
206223

207-
// 从网络读取数据
208-
f, err = c.readDataFromNet(&headArray, &payload)
209-
if err != nil {
210-
return err
211-
}
224+
fin := f.GetFin()
225+
if c.fragmentFrameHeader != nil && !f.Opcode.IsControl() {
226+
if f.Opcode == 0 {
227+
c.fragmentFramePayload = append(c.fragmentFramePayload, f.Payload...)
212228

213-
op = f.Opcode
214-
if fragmentFrameHeader != nil {
215-
op = fragmentFrameHeader.Opcode
216-
}
229+
// 分段的在这返回
230+
if fin {
231+
// 解压缩
232+
if c.fragmentFrameHeader.GetRsv1() && c.decompression {
233+
tempBuf, err := decode(c.fragmentFramePayload)
234+
if err != nil {
235+
return err
236+
}
237+
c.fragmentFramePayload = tempBuf
238+
}
239+
// 这里的check按道理应该放到f.Fin前面, 会更符合rfc的标准, 前提是c.utf8Check修改成流式解析
240+
// TODO c.utf8Check 修改成流式解析
241+
if c.fragmentFrameHeader.Opcode == opcode.Text && !c.utf8Check(c.fragmentFramePayload) {
242+
c.Callback.OnClose(c, ErrTextNotUTF8)
243+
return ErrTextNotUTF8
244+
}
217245

218-
rsv1 := f.GetRsv1()
219-
// 检查Rsv1 rsv2 Rsv3
220-
if rsv1 && c.failRsv1(op) || f.GetRsv2() || f.GetRsv3() {
221-
err = fmt.Errorf("%w:Rsv1(%t) Rsv2(%t) rsv2(%t) compression:%t", ErrRsv123, rsv1, f.GetRsv2(), f.GetRsv3(), c.compression)
222-
return c.writeErrAndOnClose(ProtocolError, err)
246+
c.Callback.OnMessage(c, c.fragmentFrameHeader.Opcode, c.fragmentFramePayload)
247+
c.fragmentFramePayload = c.fragmentFramePayload[0:0]
248+
c.fragmentFrameHeader = nil
249+
}
250+
return nil
223251
}
224252

225-
fin := f.GetFin()
226-
if fragmentFrameHeader != nil && !f.Opcode.IsControl() {
227-
if f.Opcode == 0 {
228-
fragmentFrameBuf = append(fragmentFrameBuf, f.Payload...)
229-
230-
// 分段的在这返回
231-
if fin {
232-
// 解压缩
233-
if fragmentFrameHeader.GetRsv1() && c.decompression {
234-
tempBuf, err := decode(fragmentFrameBuf)
235-
if err != nil {
236-
return err
237-
}
238-
fragmentFrameBuf = tempBuf
239-
}
240-
// 这里的check按道理应该放到f.Fin前面, 会更符合rfc的标准, 前提是c.utf8Check修改成流式解析
241-
// TODO c.utf8Check 修改成流式解析
242-
if fragmentFrameHeader.Opcode == opcode.Text && !c.utf8Check(fragmentFrameBuf) {
243-
c.Callback.OnClose(c, ErrTextNotUTF8)
244-
return ErrTextNotUTF8
245-
}
253+
c.writeErrAndOnClose(ProtocolError, ErrFrameOpcode)
254+
return ErrFrameOpcode
255+
}
246256

247-
c.Callback.OnMessage(c, fragmentFrameHeader.Opcode, fragmentFrameBuf)
248-
fragmentFrameBuf = fragmentFrameBuf[0:0]
249-
fragmentFrameHeader = nil
250-
}
251-
continue
257+
if f.Opcode == opcode.Text || f.Opcode == opcode.Binary {
258+
if !fin {
259+
prevFrame := f.FrameHeader
260+
// 第一次分段
261+
if len(c.fragmentFramePayload) == 0 {
262+
c.fragmentFramePayload = append(c.fragmentFramePayload, f.Payload...)
263+
f.Payload = nil
252264
}
253265

254-
c.writeErrAndOnClose(ProtocolError, ErrFrameOpcode)
255-
return ErrFrameOpcode
266+
// 让fragmentFrame的Payload指向readBuf, readBuf 原引用直接丢弃
267+
c.fragmentFrameHeader = &prevFrame
268+
return
256269
}
257270

258-
if f.Opcode == opcode.Text || f.Opcode == opcode.Binary {
259-
if !fin {
260-
prevFrame := f.FrameHeader
261-
// 第一次分段
262-
if len(fragmentFrameBuf) == 0 {
263-
fragmentFrameBuf = append(fragmentFrameBuf, f.Payload...)
264-
f.Payload = nil
265-
}
266-
267-
// 让fragmentFrame的Payload指向readBuf, readBuf 原引用直接丢弃
268-
fragmentFrameHeader = &prevFrame
269-
continue
271+
if rsv1 && c.decompression {
272+
// 不分段的解压缩
273+
f.Payload, err = decode(f.Payload)
274+
if err != nil {
275+
return err
270276
}
277+
}
271278

272-
if rsv1 && c.decompression {
273-
// 不分段的解压缩
274-
f.Payload, err = decode(f.Payload)
275-
if err != nil {
276-
return err
277-
}
279+
if f.Opcode == opcode.Text {
280+
if !c.utf8Check(f.Payload) {
281+
c.c.Close()
282+
c.Callback.OnClose(c, ErrTextNotUTF8)
283+
return ErrTextNotUTF8
278284
}
285+
}
279286

280-
if f.Opcode == opcode.Text {
281-
if !c.utf8Check(f.Payload) {
282-
c.c.Close()
283-
c.Callback.OnClose(c, ErrTextNotUTF8)
284-
return ErrTextNotUTF8
285-
}
286-
}
287+
c.Callback.OnMessage(c, f.Opcode, f.Payload)
288+
return
289+
}
287290

288-
c.Callback.OnMessage(c, f.Opcode, f.Payload)
289-
continue
291+
if f.Opcode == Close || f.Opcode == Ping || f.Opcode == Pong {
292+
// 对方发的控制消息太大
293+
if f.PayloadLen > maxControlFrameSize {
294+
c.writeErrAndOnClose(ProtocolError, ErrMaxControlFrameSize)
295+
return ErrMaxControlFrameSize
296+
}
297+
// Close, Ping, Pong 不能分片
298+
if !fin {
299+
c.writeErrAndOnClose(ProtocolError, ErrNOTBeFragmented)
300+
return ErrNOTBeFragmented
290301
}
291302

292-
if f.Opcode == Close || f.Opcode == Ping || f.Opcode == Pong {
293-
// 对方发的控制消息太大
294-
if f.PayloadLen > maxControlFrameSize {
295-
c.writeErrAndOnClose(ProtocolError, ErrMaxControlFrameSize)
296-
return ErrMaxControlFrameSize
297-
}
298-
// Close, Ping, Pong 不能分片
299-
if !fin {
300-
c.writeErrAndOnClose(ProtocolError, ErrNOTBeFragmented)
301-
return ErrNOTBeFragmented
303+
if f.Opcode == Close {
304+
if len(f.Payload) == 0 {
305+
return c.writeErrAndOnClose(NormalClosure, ErrClosePayloadTooSmall)
302306
}
303307

304-
if f.Opcode == Close {
305-
if len(f.Payload) == 0 {
306-
return c.writeErrAndOnClose(NormalClosure, ErrClosePayloadTooSmall)
307-
}
308-
309-
if len(f.Payload) < 2 {
310-
return c.writeErrAndOnClose(ProtocolError, ErrClosePayloadTooSmall)
311-
}
312-
313-
if !c.utf8Check(f.Payload[2:]) {
314-
return c.writeErrAndOnClose(ProtocolError, ErrTextNotUTF8)
315-
}
308+
if len(f.Payload) < 2 {
309+
return c.writeErrAndOnClose(ProtocolError, ErrClosePayloadTooSmall)
310+
}
316311

317-
code := binary.BigEndian.Uint16(f.Payload)
318-
if !validCode(code) {
319-
return c.writeErrAndOnClose(ProtocolError, ErrCloseValue)
320-
}
312+
if !c.utf8Check(f.Payload[2:]) {
313+
return c.writeErrAndOnClose(ProtocolError, ErrTextNotUTF8)
314+
}
321315

322-
// 回敬一个close包
323-
if err := c.WriteTimeout(Close, f.Payload, 2*time.Second); err != nil {
324-
return err
325-
}
316+
code := binary.BigEndian.Uint16(f.Payload)
317+
if !validCode(code) {
318+
return c.writeErrAndOnClose(ProtocolError, ErrCloseValue)
319+
}
326320

327-
err = bytesToCloseErrMsg(f.Payload)
328-
c.Callback.OnClose(c, err)
321+
// 回敬一个close包
322+
if err := c.WriteTimeout(Close, f.Payload, 2*time.Second); err != nil {
329323
return err
330324
}
331325

332-
if f.Opcode == Ping {
333-
// 回一个pong包
334-
if c.replyPing {
335-
if err := c.WriteTimeout(Pong, f.Payload, 2*time.Second); err != nil {
336-
c.Callback.OnClose(c, err)
337-
return err
338-
}
339-
c.Callback.OnMessage(c, f.Opcode, f.Payload)
340-
continue
341-
}
342-
}
326+
err = bytesToCloseErrMsg(f.Payload)
327+
c.Callback.OnClose(c, err)
328+
return err
329+
}
343330

344-
if f.Opcode == Pong && c.ignorePong {
345-
continue
331+
if f.Opcode == Ping {
332+
// 回一个pong包
333+
if c.replyPing {
334+
if err := c.WriteTimeout(Pong, f.Payload, 2*time.Second); err != nil {
335+
c.Callback.OnClose(c, err)
336+
return err
337+
}
338+
c.Callback.OnMessage(c, f.Opcode, f.Payload)
339+
return
346340
}
341+
}
347342

348-
c.Callback.OnMessage(c, f.Opcode, nil)
349-
continue
343+
if f.Opcode == Pong && c.ignorePong {
344+
return
350345
}
351-
// 检查Opcode
352-
c.writeErrAndOnClose(ProtocolError, ErrOpcode)
353-
return ErrOpcode
346+
347+
c.Callback.OnMessage(c, f.Opcode, nil)
348+
return
354349
}
350+
// 检查Opcode
351+
c.writeErrAndOnClose(ProtocolError, ErrOpcode)
352+
return ErrOpcode
355353
}
356354

357355
type wrapBuffer struct {

server_handshake.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,6 @@ var (
3232
strWebSocketKey = "Sec-WebSocket-Key"
3333
)
3434

35-
type ConnOption struct {
36-
Config
37-
}
38-
3935
func writeHeaderVal(w io.Writer, val []byte) (err error) {
4036
if _, err = w.Write(val); err != nil {
4137
return

server_options.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ package quickws
1616

1717
type ServerOption func(*ConnOption)
1818

19+
type ConnOption struct {
20+
Config
21+
}
22+
1923
// 1.配置压缩和解压缩
2024
func WithServerDecompressAndCompress() ServerOption {
2125
return func(o *ConnOption) {

0 commit comments

Comments
 (0)