Skip to content

Commit 8060252

Browse files
authored
Merge pull request #138 from ydb-platform/endpoint-info
## 3.11.8
2 parents 950ce16 + 3d095fa commit 8060252

File tree

20 files changed

+370
-240
lines changed

20 files changed

+370
-240
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
## 3.11.8
2+
* Added `trace.EndpointInfo.LastUpdated()` timestamp
3+
* Refactored `endpoint.Endpoint` (split to struct `endopint` and interface `Endpoint`)
4+
* Returned safe-thread copy of `endpoint.Endpoint` to trace callbacks
5+
* Added `endpoint.Endpoint.Touch()` func for refresh endpoint info
6+
* Added `conn.conn.onClose` slice for call optional funcs on close step
7+
* Added removing `conn.Conn` from `conn.Pool` on `conn.Conn.Close()` call
8+
* Checked cluster close/empty on keeper goroutine
9+
* Fixed `internal.errors.New` wrapping depth
10+
111
## 3.11.7
212
* Removed internal alias-type `errors.IssuesIterator`
313

internal/cluster/cluster.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/cluster/entry"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
18-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint/info"
1918
"github.com/ydb-platform/ydb-go-sdk/v3/internal/errors"
2019
"github.com/ydb-platform/ydb-go-sdk/v3/internal/repeater"
2120
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -37,8 +36,10 @@ var (
3736

3837
// ErrNilBalancerElement returned when requested on a nil Balancer element.
3938
ErrNilBalancerElement = errors.New("nil balancer element")
39+
4040
// ErrUnknownBalancerElement returned when requested on a unknown Balancer element.
4141
ErrUnknownBalancerElement = errors.New("unknown balancer element")
42+
4243
// ErrUnknownTypeOfBalancerElement returned when requested on a unknown types of Balancer element.
4344
ErrUnknownTypeOfBalancerElement = errors.New("unknown types of balancer element")
4445
)
@@ -210,7 +211,7 @@ func (c *cluster) Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, er
210211
if err != nil {
211212
onDone(nil, err)
212213
} else {
213-
onDone(cc.Endpoint(), nil)
214+
onDone(cc.Endpoint().Copy(), nil)
214215
}
215216
}()
216217

@@ -235,7 +236,7 @@ func (c *cluster) Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, er
235236

236237
// Insert inserts new connection into the cluster.
237238
func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudOption) (cc conn.Conn) {
238-
onDone := trace.DriverOnClusterInsert(c.config.Trace(), &ctx, e)
239+
onDone := trace.DriverOnClusterInsert(c.config.Trace(), &ctx, e.Copy())
239240
defer func() {
240241
if cc != nil {
241242
onDone(cc.GetState())
@@ -261,16 +262,14 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudO
261262
panic("ydb: can't insert already existing endpoint")
262263
}
263264

264-
var wait chan struct{}
265-
defer func() {
266-
if wait != nil {
267-
close(wait)
268-
}
269-
}()
265+
cc.Endpoint().Touch()
270266

271267
entry := entry.Entry{Conn: cc}
268+
272269
entry.InsertInto(c.balancer)
270+
273271
c.index[e.Address()] = entry
272+
274273
if e.NodeID() > 0 {
275274
c.endpoints[e.NodeID()] = cc
276275
}
@@ -280,7 +279,7 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudO
280279

281280
// Update updates existing connection's runtime stats such that load factor and others.
282281
func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudOption) (cc conn.Conn) {
283-
onDone := trace.DriverOnClusterUpdate(c.config.Trace(), &ctx, e)
282+
onDone := trace.DriverOnClusterUpdate(c.config.Trace(), &ctx, e.Copy())
284283
defer func() {
285284
if cc != nil {
286285
onDone(cc.GetState())
@@ -307,6 +306,8 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudO
307306
panic("ydb: cluster entry with nil conn")
308307
}
309308

309+
entry.Conn.Endpoint().Touch()
310+
310311
delete(c.endpoints, e.NodeID())
311312
c.index[e.Address()] = entry
312313

@@ -316,15 +317,15 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudO
316317

317318
if entry.Handle != nil {
318319
// entry.Handle may be nil when connection is being tracked.
319-
c.balancer.Update(entry.Handle, info.Info{})
320+
c.balancer.Update(entry.Handle, e.Info())
320321
}
321322

322323
return entry.Conn
323324
}
324325

325326
// Remove removes and closes previously inserted connection.
326327
func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudOption) (cc conn.Conn) {
327-
onDone := trace.DriverOnClusterRemove(c.config.Trace(), &ctx, e)
328+
onDone := trace.DriverOnClusterRemove(c.config.Trace(), &ctx, e.Copy())
328329
defer func() {
329330
if cc != nil {
330331
onDone(cc.GetState())

internal/conn/conn.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type conn struct {
4545
state State
4646
locks int32
4747
ttl timeutil.Timer
48+
onClose []func(Conn)
4849
}
4950

5051
func (c *conn) IsState(states ...State) bool {
@@ -79,7 +80,7 @@ func (c *conn) Endpoint() endpoint.Endpoint {
7980
if c != nil {
8081
return c.endpoint
8182
}
82-
return endpoint.Endpoint{}
83+
return nil
8384
}
8485

8586
func (c *conn) SetState(ctx context.Context, s State) State {
@@ -92,7 +93,7 @@ func (c *conn) setState(ctx context.Context, s State) State {
9293
onDone := trace.DriverOnConnStateChange(
9394
trace.ContextDriver(ctx).Compose(c.config.Trace()),
9495
&ctx,
95-
c.endpoint,
96+
c.endpoint.Copy(),
9697
c.state,
9798
)
9899
c.state = s
@@ -123,7 +124,7 @@ func (c *conn) take(ctx context.Context) (cc *grpc.ClientConn, err error) {
123124
onDone := trace.DriverOnConnTake(
124125
trace.ContextDriver(ctx).Compose(c.config.Trace()),
125126
&ctx,
126-
c.endpoint,
127+
c.endpoint.Copy(),
127128
)
128129
defer func() {
129130
onDone(int(atomic.LoadInt32(&c.locks)), err)
@@ -160,7 +161,7 @@ func (c *conn) release(ctx context.Context) {
160161
onDone := trace.DriverOnConnRelease(
161162
trace.ContextDriver(ctx).Compose(c.config.Trace()),
162163
&ctx,
163-
c.endpoint,
164+
c.endpoint.Copy(),
164165
)
165166
atomic.AddInt32(&c.locks, -1)
166167
onDone(int(atomic.LoadInt32(&c.locks)))
@@ -200,6 +201,9 @@ func (c *conn) Close(ctx context.Context) (err error) {
200201
c.closed = true
201202
err = c.close(ctx)
202203
c.setState(ctx, Destroyed)
204+
for _, f := range c.onClose {
205+
f(c)
206+
}
203207
return err
204208
}
205209

@@ -210,7 +214,7 @@ func (c *conn) pessimize(ctx context.Context, err error) {
210214
trace.DriverOnPessimizeNode(
211215
trace.ContextDriver(ctx).Compose(c.config.Trace()),
212216
&ctx,
213-
c.endpoint,
217+
c.endpoint.Copy(),
214218
c.GetState(),
215219
err,
216220
)(c.SetState(ctx, Banned))
@@ -249,7 +253,7 @@ func (c *conn) Invoke(
249253
onDone := trace.DriverOnConnInvoke(
250254
trace.ContextDriver(ctx).Compose(c.config.Trace()),
251255
&ctx,
252-
c.endpoint,
256+
c.endpoint.Copy(),
253257
trace.Method(method),
254258
)
255259
defer func() {
@@ -316,7 +320,7 @@ func (c *conn) NewStream(
316320
streamRecv := trace.DriverOnConnNewStream(
317321
trace.ContextDriver(ctx).Compose(c.config.Trace()),
318322
&ctx,
319-
c.endpoint,
323+
c.endpoint.Copy(),
320324
trace.Method(method),
321325
)
322326
defer func() {
@@ -347,13 +351,24 @@ func (c *conn) NewStream(
347351
}, nil
348352
}
349353

350-
func New(endpoint endpoint.Endpoint, config Config) Conn {
354+
type option func(c *conn)
355+
356+
func withOnClose(onClose func(Conn)) option {
357+
return func(c *conn) {
358+
c.onClose = append(c.onClose, onClose)
359+
}
360+
}
361+
362+
func New(endpoint endpoint.Endpoint, config Config, opts ...option) Conn {
351363
c := &conn{
352364
state: Created,
353365
endpoint: endpoint,
354366
config: config,
355367
done: make(chan struct{}),
356368
}
369+
for _, o := range opts {
370+
o(c)
371+
}
357372
if ttl := config.ConnectionTTL(); ttl > 0 {
358373
c.ttl = timeutil.NewTimer(ttl)
359374
}

internal/conn/pool.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ func (p *pool) GetConn(e endpoint.Endpoint) Conn {
6161
if cc, ok := p.conns[e.Address()]; ok {
6262
return cc
6363
}
64-
cc := New(e, p.config)
64+
cc := New(e, p.config, withOnClose(func(c Conn) {
65+
p.mtx.Lock()
66+
defer p.mtx.Unlock()
67+
delete(p.conns, c.Endpoint().Address())
68+
}))
6569
p.conns[e.Address()] = cc
6670
return cc
6771
}
@@ -70,14 +74,17 @@ func (p *pool) Close(ctx context.Context) error {
7074
close(p.done)
7175

7276
p.mtx.Lock()
73-
defer p.mtx.Unlock()
77+
conns := make([]Conn, 0, len(p.conns))
78+
for _, c := range p.conns {
79+
conns = append(conns, c)
80+
}
81+
p.mtx.Unlock()
7482

7583
var issues []error
76-
for a, c := range p.conns {
84+
for _, c := range conns {
7785
if err := c.Close(ctx); err != nil {
7886
issues = append(issues, err)
7987
}
80-
delete(p.conns, a)
8188
}
8289

8390
if len(issues) > 0 {

internal/discovery/discovery.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,9 @@ func (d *client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, e
123123

124124
var location string
125125
defer func() {
126-
nodes := make([]string, 0)
126+
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
127127
for _, e := range endpoints {
128-
nodes = append(nodes, e.Address())
128+
nodes = append(nodes, e.Copy())
129129
}
130130
onDone(location, nodes, err)
131131
}()

0 commit comments

Comments
 (0)