Skip to content

Commit 25482c0

Browse files
committed
feat(client): blocking LineSenderPool
1 parent f7f593f commit 25482c0

File tree

3 files changed

+206
-71
lines changed

3 files changed

+206
-71
lines changed

README.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,15 @@ To connect via TCP, set the configuration string to:
8080
**Warning: Experimental feature designed for use with HTTP senders ONLY**
8181

8282
Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism
83-
to cache previously-used `LineSender`s in memory so they can be reused without
84-
having to allocate and instantiate new senders.
83+
to pool previously-used `LineSender`s so they can be reused without having
84+
to allocate and instantiate new senders.
8585

86-
A LineSenderPool is thread-safe and can be used to concurrently Acquire and Release senders
86+
A LineSenderPool is thread-safe and can be used to concurrently obtain senders
8787
across multiple goroutines.
8888

8989
Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire
9090
a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred
91-
execution block to Release the sender at the end of the goroutine.
91+
execution block to Close the sender at the end of the goroutine.
9292

9393
Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics:
9494

@@ -112,7 +112,7 @@ func main() {
112112
}
113113
}()
114114

115-
sender, err := pool.Acquire(ctx)
115+
sender, err := pool.Sender(ctx)
116116
if err != nil {
117117
panic(err)
118118
}
@@ -122,7 +122,8 @@ func main() {
122122
Float64Column("price", 123.45).
123123
AtNow(ctx)
124124

125-
if err := pool.Release(ctx, sender); err != nil {
125+
// Close call returns the sender back to the pool
126+
if err := sender.Close(ctx); err != nil {
126127
panic(err)
127128
}
128129
}

sender_pool.go

Lines changed: 144 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@ import (
2828
"context"
2929
"errors"
3030
"fmt"
31+
"math/big"
3132
"strings"
3233
"sync"
34+
"sync/atomic"
35+
"time"
3336
)
3437

3538
// LineSenderPool wraps a mutex-protected slice of [LineSender]. It allows a goroutine to
@@ -38,12 +41,21 @@ import (
3841
// WARNING: This is an experimental API that is designed to work with HTTP senders ONLY.
3942
type LineSenderPool struct {
4043
maxSenders int
44+
numSenders int // number of used and free senders
4145
conf string
4246

4347
closed bool
4448

45-
senders []LineSender
46-
mu *sync.Mutex
49+
freeSenders []*pooledSender
50+
mu *sync.Mutex
51+
cond sync.Cond // used to wake up free sender waiters
52+
}
53+
54+
type pooledSender struct {
55+
pool *LineSenderPool
56+
wrapped LineSender
57+
dirty bool // set to true if any of the sender calls returned an error
58+
tick uint64 // even values stand for free sender, odd values mean in-use sender
4759
}
4860

4961
// LineSenderPoolOption defines line sender pool config option.
@@ -61,11 +73,12 @@ func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, e
6173
}
6274

6375
pool := &LineSenderPool{
64-
maxSenders: 64,
65-
conf: conf,
66-
senders: []LineSender{},
67-
mu: &sync.Mutex{},
76+
maxSenders: 64,
77+
conf: conf,
78+
freeSenders: make([]*pooledSender, 0, 64),
79+
mu: &sync.Mutex{},
6880
}
81+
pool.cond = *sync.NewCond(pool.mu)
6982

7083
for _, opt := range opts {
7184
opt(pool)
@@ -82,50 +95,71 @@ func WithMaxSenders(count int) LineSenderPoolOption {
8295
}
8396
}
8497

85-
// Acquire obtains a LineSender from the pool. If the pool is empty, a new
98+
// Sender obtains a LineSender from the pool. If the pool is empty, a new
8699
// LineSender will be instantiated using the pool's config string.
87-
func (p *LineSenderPool) Acquire(ctx context.Context) (LineSender, error) {
100+
// If there is already maximum number of senders obtained from the pool,
101+
// this calls will block until one of the senders is returned back to
102+
// the pool by calling sender.Close().
103+
func (p *LineSenderPool) Sender(ctx context.Context) (LineSender, error) {
88104
p.mu.Lock()
89105
defer p.mu.Unlock()
90106

91107
if p.closed {
92-
return nil, fmt.Errorf("cannot Acquire a LineSender from a closed LineSenderPool")
108+
return nil, errors.New("cannot Acquire a LineSender from a closed LineSenderPool")
109+
}
110+
111+
// We may have to wait for a free sender
112+
for len(p.freeSenders) == 0 && p.numSenders == p.maxSenders {
113+
p.cond.Wait()
93114
}
94115

95-
if len(p.senders) > 0 {
116+
if len(p.freeSenders) > 0 {
96117
// Pop sender off the slice and return it
97-
s := p.senders[len(p.senders)-1]
98-
p.senders = p.senders[0 : len(p.senders)-1]
118+
s := p.freeSenders[len(p.freeSenders)-1]
119+
atomic.AddUint64(&s.tick, 1)
120+
p.freeSenders = p.freeSenders[0 : len(p.freeSenders)-1]
99121
return s, nil
100122
}
101123

102-
return LineSenderFromConf(ctx, p.conf)
124+
s, err := LineSenderFromConf(ctx, p.conf)
125+
if err != nil {
126+
return nil, err
127+
}
128+
p.numSenders++
129+
ps := &pooledSender{
130+
pool: p,
131+
wrapped: s,
132+
}
133+
atomic.AddUint64(&ps.tick, 1)
134+
return ps, nil
103135
}
104136

105-
// Release flushes the LineSender and returns it back to the pool. If the pool
106-
// is full, the sender is closed and discarded. In cases where the sender's
107-
// flush fails, it is not added back to the pool.
108-
func (p *LineSenderPool) Release(ctx context.Context, s LineSender) error {
109-
// If there is an error on flush, do not add the sender back to the pool
110-
if err := s.Flush(ctx); err != nil {
111-
return err
137+
func (p *LineSenderPool) free(ctx context.Context, ps *pooledSender) error {
138+
var flushErr error
139+
140+
if ps.dirty {
141+
flushErr = ps.Flush(ctx)
112142
}
113143

114144
p.mu.Lock()
115145
defer p.mu.Unlock()
116146

117-
for i := range p.senders {
118-
if p.senders[i] == s {
119-
return fmt.Errorf("LineSender %p has already been released back to the pool", s)
120-
}
147+
if flushErr != nil {
148+
// Failed to flush, close and call it a day
149+
p.numSenders--
150+
closeErr := ps.wrapped.Close(ctx)
151+
return fmt.Errorf("%s %w", flushErr, closeErr)
121152
}
122153

123-
if p.closed || len(p.senders) >= p.maxSenders {
124-
return s.Close(ctx)
154+
if ps.dirty || p.closed {
155+
// Previous error or closed pool, close and call it a day
156+
p.numSenders--
157+
return ps.wrapped.Close(ctx)
125158
}
126159

127-
p.senders = append(p.senders, s)
128-
160+
p.freeSenders = append(p.freeSenders, ps)
161+
// Notify free sender waiters, if any
162+
p.cond.Broadcast()
129163
return nil
130164
}
131165

@@ -135,23 +169,28 @@ func (p *LineSenderPool) Close(ctx context.Context) error {
135169
p.mu.Lock()
136170
defer p.mu.Unlock()
137171

172+
if p.closed {
173+
// Already closed
174+
return nil
175+
}
138176
p.closed = true
139177

140178
var senderErrors []error
141179

142-
for _, s := range p.senders {
143-
senderErr := s.Close(ctx)
180+
for _, ps := range p.freeSenders {
181+
senderErr := ps.wrapped.Close(ctx)
144182
if senderErr != nil {
145183
senderErrors = append(senderErrors, senderErr)
146-
147184
}
148185
}
186+
p.numSenders -= len(p.freeSenders)
187+
p.freeSenders = []*pooledSender{}
149188

150189
if len(senderErrors) == 0 {
151190
return nil
152191
}
153192

154-
err := fmt.Errorf("error closing one or more LineSenders in the pool")
193+
err := errors.New("error closing one or more LineSenders in the pool")
155194
for _, senderErr := range senderErrors {
156195
err = fmt.Errorf("%s %w", err, senderErr)
157196
}
@@ -170,10 +209,81 @@ func (p *LineSenderPool) IsClosed() bool {
170209
return p.closed
171210
}
172211

173-
// Len returns the numbers of cached LineSenders in the pool.
212+
// Len returns the number of LineSenders in the pool.
174213
func (p *LineSenderPool) Len() int {
175214
p.mu.Lock()
176215
defer p.mu.Unlock()
177216

178-
return len(p.senders)
217+
return p.numSenders
218+
}
219+
220+
func (ps *pooledSender) Table(name string) LineSender {
221+
ps.wrapped.Table(name)
222+
return ps
223+
}
224+
225+
func (ps *pooledSender) Symbol(name, val string) LineSender {
226+
ps.wrapped.Symbol(name, val)
227+
return ps
228+
}
229+
230+
func (ps *pooledSender) Int64Column(name string, val int64) LineSender {
231+
ps.wrapped.Int64Column(name, val)
232+
return ps
233+
}
234+
235+
func (ps *pooledSender) Long256Column(name string, val *big.Int) LineSender {
236+
ps.wrapped.Long256Column(name, val)
237+
return ps
238+
}
239+
240+
func (ps *pooledSender) TimestampColumn(name string, ts time.Time) LineSender {
241+
ps.wrapped.TimestampColumn(name, ts)
242+
return ps
243+
}
244+
245+
func (ps *pooledSender) Float64Column(name string, val float64) LineSender {
246+
ps.wrapped.Float64Column(name, val)
247+
return ps
248+
}
249+
250+
func (ps *pooledSender) StringColumn(name, val string) LineSender {
251+
ps.wrapped.StringColumn(name, val)
252+
return ps
253+
}
254+
255+
func (ps *pooledSender) BoolColumn(name string, val bool) LineSender {
256+
ps.wrapped.BoolColumn(name, val)
257+
return ps
258+
}
259+
260+
func (ps *pooledSender) AtNow(ctx context.Context) error {
261+
err := ps.wrapped.AtNow(ctx)
262+
if err != nil {
263+
ps.dirty = true
264+
}
265+
return err
266+
}
267+
268+
func (ps *pooledSender) At(ctx context.Context, ts time.Time) error {
269+
err := ps.wrapped.At(ctx, ts)
270+
if err != nil {
271+
ps.dirty = true
272+
}
273+
return err
274+
}
275+
276+
func (ps *pooledSender) Flush(ctx context.Context) error {
277+
err := ps.wrapped.Flush(ctx)
278+
if err != nil {
279+
ps.dirty = true
280+
}
281+
return err
282+
}
283+
284+
func (ps *pooledSender) Close(ctx context.Context) error {
285+
if atomic.AddUint64(&ps.tick, 1)&1 == 1 {
286+
return errors.New("double sender close")
287+
}
288+
return ps.pool.free(ctx, ps)
179289
}

0 commit comments

Comments
 (0)