Skip to content

Commit 0a58572

Browse files
authored
Merge pull request #147 from ydb-platform/repeater-trace
refactoring of repeater package + add trace.Driver.OnRepeaterWakeUp e…
2 parents a1b9255 + 70c3b41 commit 0a58572

File tree

9 files changed

+174
-176
lines changed

9 files changed

+174
-176
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 3.11.13
2+
* Added `trace.Driver.OnRepeaterWakeUp` event
3+
* Refactored package `repeater`
4+
15
## 3.11.12
26
* Added `trace.ClusterInsertDoneInfo.Inserted` boolean flag for notify about success of insert endpoint into balancer
37
* Added `trace.ClusterRemoveDoneInfo.Removed` boolean flag for notify about success of remove endpoint from balancer

internal/db/database.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ func New(
8181
ctx,
8282
db.cluster.GetConn(endpoint.New(c.Endpoint(), endpoint.WithLocalDC(true))),
8383
db.cluster,
84+
db.config.Trace(),
8485
opts...,
8586
)
8687
if err != nil {

internal/discovery/discovery.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ func New(
2424
ctx context.Context,
2525
cc conn.Conn,
2626
crudExplorer cluster.CRUDExplorerLocker,
27+
driverTrace trace.Driver,
2728
opts ...config.Option,
2829
) (_ discovery.Client, err error) {
2930
c := &client{
@@ -57,9 +58,8 @@ func New(
5758
}
5859

5960
crudExplorer.SetExplorer(
60-
repeater.NewRepeater(
61+
repeater.New(
6162
deadline.ContextWithoutDeadline(ctx),
62-
c.config.Interval(),
6363
func(ctx context.Context) (err error) {
6464
next, err = c.Discover(ctx)
6565
if err != nil {
@@ -107,6 +107,9 @@ func New(
107107

108108
return nil
109109
},
110+
repeater.WithInterval(c.config.Interval()),
111+
repeater.WithName("discovery"),
112+
repeater.WithTrace(driverTrace),
110113
),
111114
)
112115

internal/meta/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package meta
22

33
const (
4-
Version = "ydb-go-sdk/3.11.12"
4+
Version = "ydb-go-sdk/3.11.13"
55
)

internal/repeater/repeat.go

Lines changed: 72 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@ package repeater
22

33
import (
44
"context"
5-
"sync"
65
"time"
76

8-
"github.com/ydb-platform/ydb-go-sdk/v3/testutil/timeutil"
7+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
98
)
109

1110
type Repeater interface {
@@ -19,50 +18,73 @@ type repeater struct {
1918
// Interval must be greater than zero; if not, Repeater will panic.
2019
interval time.Duration
2120

21+
name string
22+
trace trace.Driver
23+
2224
// Task is a function that must be executed periodically.
2325
task func(context.Context) error
2426

25-
timer timeutil.Timer
26-
stopOnce sync.Once
27-
stop chan struct{}
28-
done chan struct{}
29-
ctx context.Context
30-
cancel context.CancelFunc
31-
force chan struct{}
27+
stop chan struct{}
28+
done chan struct{}
29+
force chan struct{}
30+
}
31+
32+
type option func(r *repeater)
33+
34+
func WithName(name string) option {
35+
return func(r *repeater) {
36+
r.name = name
37+
}
38+
}
39+
40+
func WithTrace(trace trace.Driver) option {
41+
return func(r *repeater) {
42+
r.trace = trace
43+
}
44+
}
45+
46+
func WithInterval(interval time.Duration) option {
47+
return func(r *repeater) {
48+
r.interval = interval
49+
}
3250
}
3351

34-
// NewRepeater creates and begins to execute task periodically.
35-
func NewRepeater(
52+
type event string
53+
54+
const (
55+
eventTick = event("tick")
56+
eventForce = event("force")
57+
)
58+
59+
// New creates and begins to execute task periodically.
60+
func New(
3661
ctx context.Context,
37-
interval time.Duration,
3862
task func(ctx context.Context) (err error),
63+
opts ...option,
3964
) Repeater {
40-
if interval <= 0 {
41-
return nil
42-
}
43-
ctx, cancel := context.WithCancel(ctx)
4465
r := &repeater{
45-
interval: interval,
46-
task: task,
47-
timer: timeutil.NewTimer(interval),
48-
stopOnce: sync.Once{},
49-
stop: make(chan struct{}),
50-
done: make(chan struct{}),
51-
ctx: ctx,
52-
cancel: cancel,
53-
force: make(chan struct{}),
66+
task: task,
67+
stop: make(chan struct{}),
68+
done: make(chan struct{}),
69+
force: make(chan struct{}),
70+
}
71+
72+
for _, o := range opts {
73+
o(r)
74+
}
75+
76+
if r.interval <= 0 {
77+
return nil
5478
}
55-
go r.worker()
79+
80+
go r.worker(ctx, r.interval)
81+
5682
return r
5783
}
5884

5985
// Stop stops to execute its task.
6086
func (r *repeater) Stop() {
61-
r.stopOnce.Do(func() {
62-
close(r.stop)
63-
r.cancel()
64-
<-r.done
65-
})
87+
close(r.stop)
6688
}
6789

6890
func (r *repeater) Force() {
@@ -72,30 +94,32 @@ func (r *repeater) Force() {
7294
}
7395
}
7496

75-
func (r *repeater) singleTask() {
76-
if err := r.task(r.ctx); err != nil {
77-
r.timer.Reset(time.Second)
78-
} else {
79-
r.timer.Reset(r.interval)
80-
}
97+
func (r *repeater) wakeUp(ctx context.Context, e event) {
98+
var cancel context.CancelFunc
99+
ctx, cancel = context.WithCancel(ctx)
100+
defer cancel()
101+
102+
trace.DriverOnRepeaterWakeUp(
103+
r.trace,
104+
&ctx,
105+
r.name,
106+
string(e),
107+
)(r.task(ctx))
81108
}
82109

83-
func (r *repeater) worker() {
84-
defer func() {
85-
close(r.done)
86-
}()
87-
r.singleTask()
110+
func (r *repeater) worker(ctx context.Context, interval time.Duration) {
111+
defer close(r.done)
112+
88113
for {
89114
select {
115+
case <-ctx.Done():
116+
return
90117
case <-r.stop:
91118
return
92-
case <-r.timer.C():
93-
r.singleTask()
119+
case <-time.After(interval):
120+
r.wakeUp(ctx, eventTick)
94121
case <-r.force:
95-
if !r.timer.Stop() {
96-
<-r.timer.C()
97-
}
98-
r.timer.Reset(r.interval)
122+
r.wakeUp(ctx, eventForce)
99123
}
100124
}
101125
}

internal/repeater/test/repeat_test.go

Lines changed: 0 additions & 125 deletions
This file was deleted.

log/driver.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,31 @@ func Driver(log Logger, details trace.Details) (t trace.Driver) {
300300
}
301301
}
302302
}
303+
t.OnRepeaterWakeUp = func(info trace.RepeaterTickStartInfo) func(trace.RepeaterTickDoneInfo) {
304+
name := info.Name
305+
event := info.Event
306+
log.Tracef(`repeater wake up {name:"%s",event:"%s"}`,
307+
name,
308+
event,
309+
)
310+
start := time.Now()
311+
return func(info trace.RepeaterTickDoneInfo) {
312+
if info.Error == nil {
313+
log.Tracef(`repeater wake up done {name:"%s",event:"%s",latency:"%v"}`,
314+
name,
315+
event,
316+
time.Since(start),
317+
)
318+
} else {
319+
log.Errorf(`repeater wake up fail {name:"%s",event:"%s",latency:"%v",error:"%v"}`,
320+
name,
321+
event,
322+
time.Since(start),
323+
info.Error,
324+
)
325+
}
326+
}
327+
}
303328
}
304329
if details&trace.DriverClusterEvents != 0 {
305330
// nolint:govet

0 commit comments

Comments
 (0)