@@ -16,6 +16,7 @@ use tonic::Request;
16
16
use super :: timestamp:: TimestampOracle ;
17
17
use crate :: internal_err;
18
18
use crate :: proto:: pdpb;
19
+ use crate :: Config ;
19
20
use crate :: Result ;
20
21
use crate :: SecurityManager ;
21
22
use crate :: Timestamp ;
@@ -103,13 +104,9 @@ impl Connection {
103
104
Connection { security_mgr }
104
105
}
105
106
106
- pub async fn connect_cluster (
107
- & self ,
108
- endpoints : & [ String ] ,
109
- timeout : Duration ,
110
- ) -> Result < Cluster > {
111
- let members = self . validate_endpoints ( endpoints, timeout) . await ?;
112
- let ( client, members) = self . try_connect_leader ( & members, timeout) . await ?;
107
+ pub async fn connect_cluster ( & self , endpoints : & [ String ] , config : & Config ) -> Result < Cluster > {
108
+ let members = self . validate_endpoints ( endpoints, config) . await ?;
109
+ let ( client, members) = self . try_connect_leader ( & members, config) . await ?;
113
110
let id = members. header . as_ref ( ) . unwrap ( ) . cluster_id ;
114
111
let tso = TimestampOracle :: new ( id, & client) ?;
115
112
let cluster = Cluster {
@@ -122,10 +119,10 @@ impl Connection {
122
119
}
123
120
124
121
// Re-establish connection with PD leader in asynchronous fashion.
125
- pub async fn reconnect ( & self , cluster : & mut Cluster , timeout : Duration ) -> Result < ( ) > {
122
+ pub async fn reconnect ( & self , cluster : & mut Cluster , config : & Config ) -> Result < ( ) > {
126
123
warn ! ( "updating pd client" ) ;
127
124
let start = Instant :: now ( ) ;
128
- let ( client, members) = self . try_connect_leader ( & cluster. members , timeout ) . await ?;
125
+ let ( client, members) = self . try_connect_leader ( & cluster. members , config ) . await ?;
129
126
let tso = TimestampOracle :: new ( cluster. id , & client) ?;
130
127
* cluster = Cluster {
131
128
id : cluster. id ,
@@ -141,7 +138,7 @@ impl Connection {
141
138
async fn validate_endpoints (
142
139
& self ,
143
140
endpoints : & [ String ] ,
144
- timeout : Duration ,
141
+ config : & Config ,
145
142
) -> Result < pdpb:: GetMembersResponse > {
146
143
let mut endpoints_set = HashSet :: with_capacity ( endpoints. len ( ) ) ;
147
144
@@ -152,7 +149,7 @@ impl Connection {
152
149
return Err ( internal_err ! ( "duplicated PD endpoint {}" , ep) ) ;
153
150
}
154
151
155
- let ( _, resp) = match self . connect ( ep, timeout ) . await {
152
+ let ( _, resp) = match self . connect ( ep, config ) . await {
156
153
Ok ( resp) => resp,
157
154
// Ignore failed PD node.
158
155
Err ( e) => {
@@ -193,11 +190,11 @@ impl Connection {
193
190
async fn connect (
194
191
& self ,
195
192
addr : & str ,
196
- _timeout : Duration ,
193
+ config : & Config ,
197
194
) -> Result < ( pdpb:: pd_client:: PdClient < Channel > , pdpb:: GetMembersResponse ) > {
198
195
let mut client = self
199
196
. security_mgr
200
- . connect ( addr, pdpb:: pd_client:: PdClient :: < Channel > :: new)
197
+ . connect ( addr, pdpb:: pd_client:: PdClient :: < Channel > :: new, config )
201
198
. await ?;
202
199
let resp: pdpb:: GetMembersResponse = client
203
200
. get_members ( pdpb:: GetMembersRequest :: default ( ) )
@@ -210,9 +207,9 @@ impl Connection {
210
207
& self ,
211
208
addr : & str ,
212
209
cluster_id : u64 ,
213
- timeout : Duration ,
210
+ config : & Config ,
214
211
) -> Result < ( pdpb:: pd_client:: PdClient < Channel > , pdpb:: GetMembersResponse ) > {
215
- let ( client, r) = self . connect ( addr, timeout ) . await ?;
212
+ let ( client, r) = self . connect ( addr, config ) . await ?;
216
213
Connection :: validate_cluster_id ( addr, & r, cluster_id) ?;
217
214
Ok ( ( client, r) )
218
215
}
@@ -238,7 +235,7 @@ impl Connection {
238
235
async fn try_connect_leader (
239
236
& self ,
240
237
previous : & pdpb:: GetMembersResponse ,
241
- timeout : Duration ,
238
+ config : & Config ,
242
239
) -> Result < ( pdpb:: pd_client:: PdClient < Channel > , pdpb:: GetMembersResponse ) > {
243
240
let previous_leader = previous. leader . as_ref ( ) . unwrap ( ) ;
244
241
let members = & previous. members ;
@@ -252,7 +249,7 @@ impl Connection {
252
249
. chain ( Some ( previous_leader) )
253
250
{
254
251
for ep in & m. client_urls {
255
- match self . try_connect ( ep. as_str ( ) , cluster_id, timeout ) . await {
252
+ match self . try_connect ( ep. as_str ( ) , cluster_id, config ) . await {
256
253
Ok ( ( _, r) ) => {
257
254
resp = Some ( r) ;
258
255
break ' outer;
@@ -269,7 +266,7 @@ impl Connection {
269
266
if let Some ( resp) = resp {
270
267
let leader = resp. leader . as_ref ( ) . unwrap ( ) ;
271
268
for ep in & leader. client_urls {
272
- let r = self . try_connect ( ep. as_str ( ) , cluster_id, timeout ) . await ;
269
+ let r = self . try_connect ( ep. as_str ( ) , cluster_id, config ) . await ;
273
270
if r. is_ok ( ) {
274
271
return r;
275
272
}
0 commit comments