Skip to content

Commit aeba81e

Browse files
authored
Merge pull request #890 from lec-bit/big-scale-bug
make xds request send async
2 parents 411615c + 0af61e9 commit aeba81e

File tree

4 files changed

+56
-58
lines changed

4 files changed

+56
-58
lines changed

pkg/controller/ads/ads_controller.go

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
2424
resource_v3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
25+
"istio.io/istio/pkg/channels"
2526

2627
"kmesh.net/kmesh/pkg/logger"
2728
)
@@ -31,8 +32,14 @@ var (
3132
)
3233

3334
type Controller struct {
34-
Stream service_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesClient
3535
Processor *processor
36+
con *connection
37+
}
38+
39+
type connection struct {
40+
Stream service_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesClient
41+
requestsChan *channels.Unbounded[*service_discovery_v3.DiscoveryRequest]
42+
stopCh chan struct{}
3643
}
3744

3845
func NewController() *Controller {
@@ -42,17 +49,26 @@ func NewController() *Controller {
4249
}
4350

4451
func (c *Controller) AdsStreamCreateAndSend(client service_discovery_v3.AggregatedDiscoveryServiceClient, ctx context.Context) error {
45-
var err error
52+
if c.con != nil {
53+
close(c.con.stopCh)
54+
}
4655

47-
c.Stream, err = client.StreamAggregatedResources(ctx)
56+
stream, err := client.StreamAggregatedResources(ctx)
4857
if err != nil {
4958
return fmt.Errorf("StreamAggregatedResources failed, %s", err)
5059
}
5160

61+
c.con = &connection{
62+
Stream: stream,
63+
requestsChan: channels.NewUnbounded[*service_discovery_v3.DiscoveryRequest](),
64+
stopCh: make(chan struct{}),
65+
}
66+
5267
c.Processor.Reset()
53-
if err := c.Stream.Send(newAdsRequest(resource_v3.ClusterType, nil, "")); err != nil {
68+
if err := stream.Send(newAdsRequest(resource_v3.ClusterType, nil, "")); err != nil {
5469
return fmt.Errorf("send request failed, %s", err)
5570
}
71+
go sendUpstream(c.con)
5672

5773
return nil
5874
}
@@ -62,26 +78,39 @@ func (c *Controller) HandleAdsStream() error {
6278
err error
6379
rsp *service_discovery_v3.DiscoveryResponse
6480
)
65-
66-
if rsp, err = c.Stream.Recv(); err != nil {
81+
if rsp, err = c.con.Stream.Recv(); err != nil {
82+
_ = c.con.Stream.CloseSend()
6783
return fmt.Errorf("stream recv failed, %s", err)
6884
}
6985

7086
c.Processor.processAdsResponse(rsp)
71-
defer func() {
87+
c.con.requestsChan.Put(c.Processor.ack)
88+
if c.Processor.req != nil {
89+
c.con.requestsChan.Put(c.Processor.req)
7290
c.Processor.req = nil
73-
c.Processor.ack = nil
74-
}()
75-
76-
if err = c.Stream.Send(c.Processor.ack); err != nil {
77-
return fmt.Errorf("stream send ack failed, %s", err)
7891
}
7992

80-
if c.Processor.req != nil {
81-
if err = c.Stream.Send(c.Processor.req); err != nil {
82-
return fmt.Errorf("stream send rqt failed, %s", err)
93+
return nil
94+
}
95+
96+
func sendUpstream(con *connection) {
97+
for {
98+
select {
99+
case req := <-con.requestsChan.Get():
100+
con.requestsChan.Load()
101+
if err := con.Stream.Send(req); err != nil {
102+
log.Errorf("send error for type url %s: %v", req.TypeUrl, err)
103+
return
104+
}
105+
case <-con.stopCh:
106+
return
83107
}
84108
}
109+
}
85110

86-
return nil
111+
func (c *Controller) Close() {
112+
if c.con != nil {
113+
close(c.con.stopCh)
114+
_ = c.con.Stream.CloseSend()
115+
}
87116
}

pkg/controller/ads/ads_controller_test.go

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ import (
2525
"github.com/agiledragon/gomonkey/v2"
2626
config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
2727
discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
28+
service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
2829
resource_v3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
2930
"google.golang.org/grpc"
3031
"google.golang.org/protobuf/types/known/anypb"
32+
"istio.io/istio/pkg/channels"
3133

3234
"kmesh.net/kmesh/pkg/controller/xdstest"
3335
)
@@ -43,7 +45,7 @@ func TestAdsStreamAdsStreamCreateAndSend(t *testing.T) {
4345
defer client.Cleanup()
4446

4547
adsStream := Controller{
46-
Stream: client.AdsClient,
48+
con: &connection{Stream: client.AdsClient, stopCh: make(chan struct{})},
4749
Processor: nil,
4850
}
4951

@@ -62,7 +64,7 @@ func TestAdsStreamAdsStreamCreateAndSend(t *testing.T) {
6264
func(_ discoveryv3.AggregatedDiscoveryServiceClient, ctx context.Context, opts ...grpc.CallOption) (discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) {
6365
return client.AdsClient, nil
6466
})
65-
patches2.ApplyMethod(reflect.TypeOf(adsStream.Stream), "Send",
67+
patches2.ApplyMethod(reflect.TypeOf(adsStream.con.Stream), "Send",
6668
func(_ discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesClient, req *discoveryv3.DiscoveryRequest) error {
6769
return errors.New("timeout")
6870
})
@@ -116,7 +118,7 @@ func TestHandleAdsStream(t *testing.T) {
116118
defer fakeClient.Cleanup()
117119

118120
adsStream := NewController()
119-
adsStream.Stream = fakeClient.AdsClient
121+
adsStream.con = &connection{Stream: fakeClient.AdsClient, requestsChan: channels.NewUnbounded[*service_discovery_v3.DiscoveryRequest](), stopCh: make(chan struct{})}
120122

121123
patches1 := gomonkey.NewPatches()
122124
patches2 := gomonkey.NewPatches()
@@ -129,7 +131,7 @@ func TestHandleAdsStream(t *testing.T) {
129131
{
130132
name: "test1: stream Revc failed, should return error",
131133
beforeFunc: func() {
132-
patches1.ApplyMethod(reflect.TypeOf(adsStream.Stream), "Recv",
134+
patches1.ApplyMethod(reflect.TypeOf(adsStream.con.Stream), "Recv",
133135
func(_ discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesClient) (*discoveryv3.DiscoveryResponse, error) {
134136
return nil, errors.New("failed to recv message")
135137
})
@@ -140,9 +142,9 @@ func TestHandleAdsStream(t *testing.T) {
140142
wantErr: true,
141143
},
142144
{
143-
name: "test2: stream Send failed, should return error",
145+
name: "test2: handle success, should return nil",
144146
beforeFunc: func() {
145-
patches1.ApplyMethod(reflect.TypeOf(adsStream.Stream), "Recv",
147+
patches1.ApplyMethod(reflect.TypeOf(adsStream.con.Stream), "Recv",
146148
func(_ discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesClient) (*discoveryv3.DiscoveryResponse, error) {
147149
// create resource of rsq
148150
cluster := &config_cluster_v3.Cluster{
@@ -156,38 +158,6 @@ func TestHandleAdsStream(t *testing.T) {
156158
},
157159
}, nil
158160
})
159-
patches2.ApplyMethod(reflect.TypeOf(adsStream.Stream), "Send",
160-
func(_ discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesClient) error {
161-
return errors.New("failed to send message")
162-
})
163-
},
164-
afterFunc: func() {
165-
patches1.Reset()
166-
patches2.Reset()
167-
},
168-
wantErr: true,
169-
},
170-
{
171-
name: "test3: handle success, should return nil",
172-
beforeFunc: func() {
173-
patches1.ApplyMethod(reflect.TypeOf(adsStream.Stream), "Recv",
174-
func(_ discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesClient) (*discoveryv3.DiscoveryResponse, error) {
175-
// create resource of rsq
176-
cluster := &config_cluster_v3.Cluster{
177-
Name: "ut-cluster",
178-
}
179-
anyCluster, _ := anypb.New(cluster)
180-
return &discoveryv3.DiscoveryResponse{
181-
TypeUrl: resource_v3.ClusterType,
182-
Resources: []*anypb.Any{
183-
anyCluster,
184-
},
185-
}, nil
186-
})
187-
patches2.ApplyMethod(reflect.TypeOf(adsStream.Stream), "Send",
188-
func(_ discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesClient) error {
189-
return nil
190-
})
191161
},
192162
afterFunc: func() {
193163
patches1.Reset()

pkg/controller/client.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,12 @@ func (c *XdsClient) handleUpstream(ctx context.Context) {
124124

125125
if c.mode == constants.AdsMode {
126126
if err = c.AdsController.HandleAdsStream(); err != nil {
127-
_ = c.AdsController.Stream.CloseSend()
128127
_ = c.grpcConn.Close()
129128
reconnect = true
130129
continue
131130
}
132131
} else if c.mode == constants.WorkloadMode {
133132
if err = c.WorkloadController.HandleWorkloadStream(); err != nil {
134-
_ = c.WorkloadController.Stream.CloseSend()
135133
_ = c.grpcConn.Close()
136134
reconnect = true
137135
continue
@@ -164,8 +162,8 @@ func (c *XdsClient) Run(stopCh <-chan struct{}) error {
164162
}
165163

166164
func (c *XdsClient) closeStreamClient() {
167-
if c.AdsController != nil && c.AdsController.Stream != nil {
168-
_ = c.AdsController.Stream.CloseSend()
165+
if c.AdsController != nil {
166+
c.AdsController.Close()
169167
}
170168
if c.WorkloadController != nil && c.WorkloadController.Stream != nil {
171169
_ = c.WorkloadController.Stream.CloseSend()

pkg/controller/workload/workload_controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func (c *Controller) HandleWorkloadStream() error {
110110
)
111111

112112
if rspDelta, err = c.Stream.Recv(); err != nil {
113+
_ = c.Stream.CloseSend()
113114
return fmt.Errorf("stream recv failed, %s", err)
114115
}
115116

0 commit comments

Comments
 (0)