@@ -79,19 +79,21 @@ func (c *cluster) SetExplorer(repeater repeater.Repeater) {
79
79
}
80
80
81
81
type crudOptionsHolder struct {
82
- locked bool
82
+ withLock bool
83
83
}
84
84
85
85
type crudOption func (h * crudOptionsHolder )
86
86
87
87
func WithoutLock () crudOption {
88
88
return func (h * crudOptionsHolder ) {
89
- h .locked = true
89
+ h .withLock = false
90
90
}
91
91
}
92
92
93
93
func parseOptions (opts ... crudOption ) * crudOptionsHolder {
94
- h := & crudOptionsHolder {}
94
+ h := & crudOptionsHolder {
95
+ withLock : true ,
96
+ }
95
97
for _ , o := range opts {
96
98
o (h )
97
99
}
@@ -112,10 +114,6 @@ type CRUD interface {
112
114
Get (ctx context.Context , opts ... crudOption ) (cc conn.Conn , err error )
113
115
}
114
116
115
- type Pessimizer interface {
116
- Pessimize (ctx context.Context , endpoint endpoint.Endpoint ) error
117
- }
118
-
119
117
type Explorer interface {
120
118
SetExplorer (repeater repeater.Repeater )
121
119
Force ()
@@ -135,7 +133,6 @@ type CRUDExplorerLocker interface {
135
133
type Cluster interface {
136
134
closer.Closer
137
135
CRUD
138
- Pessimizer
139
136
Explorer
140
137
Locker
141
138
conn.PoolGetter
@@ -151,13 +148,57 @@ func New(
151
148
onDone ()
152
149
}()
153
150
154
- return & cluster {
151
+ c := & cluster {
155
152
config : config ,
156
- pool : conn .NewPool (ctx , config ),
157
153
index : make (map [string ]entry.Entry ),
158
154
endpoints : make (map [uint32 ]conn.Conn ),
159
155
balancer : balancer ,
160
156
}
157
+
158
+ c .pool = conn .NewPool (
159
+ ctx ,
160
+ config ,
161
+ func (e endpoint.Endpoint ) {
162
+ c .mu .RLock ()
163
+ defer c .mu .RUnlock ()
164
+
165
+ if c .closed {
166
+ return
167
+ }
168
+
169
+ entry , has := c .index [e .Address ()]
170
+ if ! has {
171
+ return
172
+ }
173
+
174
+ if entry .Handle == nil {
175
+ return
176
+ }
177
+
178
+ if ! c .balancer .Contains (entry .Handle ) {
179
+ return
180
+ }
181
+
182
+ if c .explorer == nil {
183
+ return
184
+ }
185
+
186
+ // count ratio (banned/all)
187
+ online := 0
188
+ for _ , entry = range c .index {
189
+ if entry .Conn != nil && entry .Conn .GetState () == conn .Online {
190
+ online ++
191
+ }
192
+ }
193
+
194
+ // more than half connections banned - re-discover now
195
+ if online * 2 < len (c .index ) {
196
+ c .explorer .Force ()
197
+ }
198
+ },
199
+ )
200
+
201
+ return c
161
202
}
162
203
163
204
func (c * cluster ) Close (ctx context.Context ) (err error ) {
@@ -245,7 +286,7 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudO
245
286
}()
246
287
247
288
options := parseOptions (opts ... )
248
- if ! options .locked {
289
+ if options .withLock {
249
290
c .mu .Lock ()
250
291
defer c .mu .Unlock ()
251
292
}
@@ -288,7 +329,7 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudO
288
329
}()
289
330
290
331
options := parseOptions (opts ... )
291
- if ! options .locked {
332
+ if options .withLock {
292
333
c .mu .Lock ()
293
334
defer c .mu .Unlock ()
294
335
}
@@ -333,20 +374,17 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudO
333
374
}()
334
375
335
376
options := parseOptions (opts ... )
336
- if ! options .locked {
377
+ if options .withLock {
337
378
c .mu .Lock ()
379
+ defer c .mu .Unlock ()
338
380
}
339
381
340
382
if c .closed {
341
- if ! options .locked {
342
- c .mu .Unlock ()
343
- }
344
383
return
345
384
}
346
385
347
386
entry , has := c .index [e .Address ()]
348
387
if ! has {
349
- c .mu .Unlock ()
350
388
panic ("ydb: can't remove not-existing endpoint" )
351
389
}
352
390
@@ -355,10 +393,6 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudO
355
393
delete (c .index , e .Address ())
356
394
delete (c .endpoints , e .NodeID ())
357
395
358
- if ! options .locked {
359
- c .mu .Unlock ()
360
- }
361
-
362
396
if entry .Conn != nil {
363
397
// entry.Conn may be nil when connection is being tracked after unsuccessful dial().
364
398
_ = entry .Conn .Close (ctx )
@@ -367,40 +401,6 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudO
367
401
return entry .Conn
368
402
}
369
403
370
- func (c * cluster ) Pessimize (ctx context.Context , e endpoint.Endpoint ) (err error ) {
371
- c .mu .RLock ()
372
- defer c .mu .RUnlock ()
373
- if c .closed {
374
- return errors .Errorf (0 , "cluster: pessimize failed: %w" , ErrClusterClosed )
375
- }
376
-
377
- entry , has := c .index [e .Address ()]
378
- if ! has {
379
- return errors .Errorf (0 , "cluster: pessimize failed: %w" , ErrUnknownEndpoint )
380
- }
381
- if entry .Handle == nil {
382
- return errors .Errorf (0 , "cluster: pessimize failed: %w" , ErrNilBalancerElement )
383
- }
384
- if ! c .balancer .Contains (entry .Handle ) {
385
- return errors .Errorf (0 , "cluster: pessimize failed: %w" , ErrUnknownBalancerElement )
386
- }
387
- entry .Conn .SetState (conn .Banned )
388
- if c .explorer != nil {
389
- // count ratio (banned/all)
390
- online := 0
391
- for _ , entry = range c .index {
392
- if entry .Conn != nil && entry .Conn .GetState () == conn .Online {
393
- online ++
394
- }
395
- }
396
- // more than half connections banned - re-discover now
397
- if online * 2 < len (c .index ) {
398
- c .explorer .Force ()
399
- }
400
- }
401
- return err
402
- }
403
-
404
404
func compareEndpoints (a , b endpoint.Endpoint ) int {
405
405
return strings .Compare (a .Address (), b .Address ())
406
406
}
0 commit comments