@@ -6,6 +6,7 @@ package raft
6
6
import (
7
7
"bufio"
8
8
"context"
9
+ "encoding/base64"
9
10
"errors"
10
11
"fmt"
11
12
"io"
@@ -54,13 +55,24 @@ const (
54
55
minInFlightForPipelining = 2
55
56
)
56
57
58
+ type zs struct {}
59
+
60
+ func (zs ) Read ([]byte ) (int , error ) {
61
+ panic ("Read should never be called" )
62
+ }
63
+
64
+ func (zs ) Write ([]byte ) (int , error ) {
65
+ panic ("Write should never be called" )
66
+ }
67
+
57
68
var (
58
69
// ErrTransportShutdown is returned when operations on a transport are
59
70
// invoked after it's been terminated.
60
71
ErrTransportShutdown = errors .New ("transport shutdown" )
61
72
62
73
// ErrPipelineShutdown is returned when the pipeline is closed.
63
74
ErrPipelineShutdown = errors .New ("append pipeline closed" )
75
+ xplod = zs {}
64
76
)
65
77
66
78
// NetworkTransport provides a network based transport that can be
@@ -112,6 +124,12 @@ type NetworkTransport struct {
112
124
msgpackUseNewTimeFormat bool
113
125
}
114
126
127
+ func (n * NetworkTransport ) encoder (to io.Writer ) * codec.Encoder {
128
+ mgsp := & codec.MsgpackHandle {}
129
+ mgsp .TimeNotBuiltin = ! n .msgpackUseNewTimeFormat
130
+ return codec .NewEncoder (to , mgsp )
131
+ }
132
+
115
133
// NetworkTransportConfig encapsulates configuration for the network transport layer.
116
134
type NetworkTransportConfig struct {
117
135
// ServerAddressProvider is used to override the target address when establishing a connection to invoke an RPC
@@ -186,6 +204,7 @@ type StreamLayer interface {
186
204
type netConn struct {
187
205
target ServerAddress
188
206
conn net.Conn
207
+ r * bufio.Reader
189
208
w * bufio.Writer
190
209
dec * codec.Decoder
191
210
enc * codec.Encoder
@@ -403,6 +422,11 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv
403
422
func (n * NetworkTransport ) getConn (target ServerAddress ) (* netConn , error ) {
404
423
// Check for a pooled conn
405
424
if conn := n .getPooledConn (target ); conn != nil {
425
+ // Sanity reset. Discard data that might not have been consumed the last time the connection was used.
426
+ conn .r .Reset (conn .conn )
427
+ conn .w .Reset (conn .conn )
428
+ conn .enc .Reset (conn .w )
429
+ conn .dec .Reset (conn .r )
406
430
return conn , nil
407
431
}
408
432
@@ -411,21 +435,12 @@ func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) {
411
435
if err != nil {
412
436
return nil , err
413
437
}
414
-
415
- // Wrap the conn
416
- netConn := & netConn {
417
- target : target ,
418
- conn : conn ,
419
- dec : codec .NewDecoder (bufio .NewReader (conn ), & codec.MsgpackHandle {}),
420
- w : bufio .NewWriterSize (conn , connSendBufferSize ),
421
- }
422
-
423
- mp := & codec.MsgpackHandle {}
424
- mp .TimeNotBuiltin = ! n .msgpackUseNewTimeFormat
425
- netConn .enc = codec .NewEncoder (netConn .w , mp )
426
-
427
- // Done
428
- return netConn , nil
438
+ rBuf := bufio .NewReader (conn )
439
+ wBuf := bufio .NewWriterSize (conn , connSendBufferSize )
440
+ dec := codec .NewDecoder (rBuf , & codec.MsgpackHandle {})
441
+ enc := n .encoder (wBuf )
442
+ res := & netConn {target : target , conn : conn , dec : dec , enc : enc , r : rBuf , w : wBuf }
443
+ return res , nil
429
444
}
430
445
431
446
// returnConn returns a connection back to the pool.
@@ -437,6 +452,10 @@ func (n *NetworkTransport) returnConn(conn *netConn) {
437
452
conns := n .connPool [key ]
438
453
439
454
if ! n .IsShutdown () && len (conns ) < n .maxPool {
455
+ conn .dec .Reset (xplod )
456
+ conn .enc .Reset (xplod )
457
+ conn .w .Reset (xplod )
458
+ conn .r .Reset (xplod )
440
459
n .connPool [key ] = append (conns , conn )
441
460
} else {
442
461
_ = conn .Release ()
@@ -527,7 +546,7 @@ func (n *NetworkTransport) InstallSnapshot(id ServerID, target ServerAddress, ar
527
546
}
528
547
529
548
// Stream the state
530
- if _ , err = io .Copy (conn .w , data ); err != nil {
549
+ if _ , err = io .CopyN (conn .w , data , args . Size ); err != nil {
531
550
return err
532
551
}
533
552
@@ -606,40 +625,55 @@ func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) {
606
625
r := bufio .NewReaderSize (conn , connReceiveBufferSize )
607
626
w := bufio .NewWriter (conn )
608
627
dec := codec .NewDecoder (r , & codec.MsgpackHandle {})
609
-
610
- mp := & codec.MsgpackHandle {}
611
- mp .TimeNotBuiltin = ! n .msgpackUseNewTimeFormat
612
- enc := codec .NewEncoder (w , mp )
613
-
614
- for {
628
+ enc := n .encoder (w )
629
+ var cmd , prevCmd byte
630
+ var err error
631
+ // Do not re-enter the loop once the conn has been used to install a snapshot.
632
+ // The server side has pretty much always done this.
633
+ for cmd != rpcInstallSnapshot {
634
+ prevCmd = cmd
615
635
select {
616
636
case <- connCtx .Done ():
617
637
n .logger .Debug ("stream layer is closed" )
618
638
return
619
639
default :
620
640
}
621
641
622
- if err : = n .handleCommand (r , dec , enc ); err != nil {
642
+ if cmd , err = n .handleCommand (r , dec , enc ); err != nil {
623
643
if err != io .EOF {
624
- n .logger .Error ("failed to decode incoming command" , "error" , err )
644
+ n .logger .Error ("failed to decode incoming command" , "error" , err , "cmd" , cmd , "prevCmd" , prevCmd )
645
+ }
646
+ if r .Buffered () > 0 {
647
+ unread := r .Buffered ()
648
+ snippet := [100 ]byte {}
649
+ cnt , _ := r .Read (snippet [:])
650
+ dst := make ([]byte , base64 .StdEncoding .EncodedLen (cnt ))
651
+ base64 .StdEncoding .Encode (dst , snippet [:cnt ])
652
+ n .logger .Error ("remaining read buffer" , "sz" , unread , "snippet" , string (dst ), "cmd" , cmd , "prevCmd" , prevCmd )
625
653
}
626
654
return
627
655
}
628
- if err : = w .Flush (); err != nil {
629
- n .logger .Error ("failed to flush response" , "error" , err )
656
+ if err = w .Flush (); err != nil {
657
+ n .logger .Error ("failed to flush response" , "error" , err , "cmd" , cmd , "prevCmd" , prevCmd )
630
658
return
631
659
}
632
660
}
661
+ // If we close conn too soon, the leader may get stuck in an infinite loop
662
+ // attempting to re-install the snapshot we just finished processing. Avert that
663
+ // by waiting a bit for the server-side to close things, and close them ourselves
664
+ // of we wait longer than 5 seconds.
665
+ _ = conn .SetReadDeadline (time .Now ().Add (5 * time .Second ))
666
+ _ , _ = io .Copy (io .Discard , r )
633
667
}
634
668
635
669
// handleCommand is used to decode and dispatch a single command.
636
- func (n * NetworkTransport ) handleCommand (r * bufio.Reader , dec * codec.Decoder , enc * codec.Encoder ) error {
670
+ func (n * NetworkTransport ) handleCommand (r * bufio.Reader , dec * codec.Decoder , enc * codec.Encoder ) ( byte , error ) {
637
671
getTypeStart := time .Now ()
638
672
639
673
// Get the rpc type
640
674
rpcType , err := r .ReadByte ()
641
675
if err != nil {
642
- return err
676
+ return 255 , err
643
677
}
644
678
645
679
// measuring the time to get the first byte separately because the heartbeat conn will hang out here
@@ -655,12 +689,13 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en
655
689
656
690
// Decode the command
657
691
isHeartbeat := false
692
+ wantFlush := false
658
693
var labels []metrics.Label
659
694
switch rpcType {
660
695
case rpcAppendEntries :
661
696
var req AppendEntriesRequest
662
697
if err := dec .Decode (& req ); err != nil {
663
- return err
698
+ return rpcType , err
664
699
}
665
700
rpc .Command = & req
666
701
@@ -684,34 +719,35 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en
684
719
case rpcRequestVote :
685
720
var req RequestVoteRequest
686
721
if err := dec .Decode (& req ); err != nil {
687
- return err
722
+ return rpcType , err
688
723
}
689
724
rpc .Command = & req
690
725
labels = []metrics.Label {{Name : "rpcType" , Value : "RequestVote" }}
691
726
case rpcRequestPreVote :
692
727
var req RequestPreVoteRequest
693
728
if err := dec .Decode (& req ); err != nil {
694
- return err
729
+ return rpcType , err
695
730
}
696
731
rpc .Command = & req
697
732
labels = []metrics.Label {{Name : "rpcType" , Value : "RequestPreVote" }}
698
733
case rpcInstallSnapshot :
699
734
var req InstallSnapshotRequest
700
735
if err := dec .Decode (& req ); err != nil {
701
- return err
736
+ return rpcType , err
702
737
}
703
738
rpc .Command = & req
704
739
rpc .Reader = io .LimitReader (r , req .Size )
740
+ wantFlush = true
705
741
labels = []metrics.Label {{Name : "rpcType" , Value : "InstallSnapshot" }}
706
742
case rpcTimeoutNow :
707
743
var req TimeoutNowRequest
708
744
if err := dec .Decode (& req ); err != nil {
709
- return err
745
+ return rpcType , err
710
746
}
711
747
rpc .Command = & req
712
748
labels = []metrics.Label {{Name : "rpcType" , Value : "TimeoutNow" }}
713
749
default :
714
- return fmt .Errorf ("unknown rpc type %d" , rpcType )
750
+ return rpcType , fmt .Errorf ("unknown rpc type %d" , rpcType )
715
751
}
716
752
717
753
metrics .MeasureSinceWithLabels ([]string {"raft" , "net" , "rpcDecode" }, decodeStart , labels )
@@ -733,7 +769,7 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en
733
769
select {
734
770
case n .consumeCh <- rpc :
735
771
case <- n .shutdownCh :
736
- return ErrTransportShutdown
772
+ return rpcType , ErrTransportShutdown
737
773
}
738
774
739
775
// Wait for response
@@ -744,23 +780,26 @@ RESP:
744
780
select {
745
781
case resp := <- respCh :
746
782
defer metrics .MeasureSinceWithLabels ([]string {"raft" , "net" , "rpcRespond" }, respWaitStart , labels )
783
+ for wantFlush && r .Buffered () > 0 {
784
+ _ , _ = io .CopyN (io .Discard , r , int64 (r .Buffered ()))
785
+ }
747
786
// Send the error first
748
787
respErr := ""
749
788
if resp .Error != nil {
750
789
respErr = resp .Error .Error ()
751
790
}
752
791
if err := enc .Encode (respErr ); err != nil {
753
- return err
792
+ return rpcType , err
754
793
}
755
794
756
795
// Send the response
757
796
if err := enc .Encode (resp .Response ); err != nil {
758
- return err
797
+ return rpcType , err
759
798
}
760
799
case <- n .shutdownCh :
761
- return ErrTransportShutdown
800
+ return rpcType , ErrTransportShutdown
762
801
}
763
- return nil
802
+ return rpcType , nil
764
803
}
765
804
766
805
// decodeResponse is used to decode an RPC response and reports whether
0 commit comments