Skip to content

Commit 167143e

Browse files
author
Kane
committed
remember last error and keep internal buffer of last N errors. fixed unreliable timeout test
1 parent 2074adb commit 167143e

File tree

2 files changed

+121
-29
lines changed

2 files changed

+121
-29
lines changed

circuitbreaker.go

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
package circuit
3232

3333
import (
34+
"container/ring"
3435
"context"
3536
"errors"
3637
"sync"
@@ -75,12 +76,14 @@ const (
7576
var (
7677
defaultInitialBackOffInterval = 500 * time.Millisecond
7778
defaultBackoffMaxElapsedTime = 0 * time.Second
79+
defaultErrorHistoryDepth = 10
7880
)
7981

8082
// Error codes returned by Call
8183
var (
82-
ErrBreakerOpen = errors.New("breaker open")
83-
ErrBreakerTimeout = errors.New("breaker time out")
84+
ErrBreakerOpen = errors.New("breaker open")
85+
ErrBreakerTimeout = errors.New("breaker time out")
86+
ErrBreakerNoErrorRecorded = errors.New("no error in breaker history")
8487
)
8588

8689
// TripFunc is a function called by a Breaker's Fail() function and determines whether
@@ -115,15 +118,19 @@ type Breaker struct {
115118
eventReceivers []chan BreakerEvent
116119
listeners []chan ListenerEvent
117120
backoffLock sync.Mutex
121+
122+
//ring buffer for last N errors
123+
errorsBuffer *ring.Ring
118124
}
119125

120126
// Options holds breaker configuration options.
121127
type Options struct {
122-
BackOff backoff.BackOff
123-
Clock clock.Clock
124-
ShouldTrip TripFunc
125-
WindowTime time.Duration
126-
WindowBuckets int
128+
BackOff backoff.BackOff
129+
Clock clock.Clock
130+
ShouldTrip TripFunc
131+
WindowTime time.Duration
132+
WindowBuckets int
133+
ErrorHistoryDepth int
127134
}
128135

129136
// NewBreakerWithOptions creates a base breaker with a specified backoff, clock and TripFunc
@@ -153,12 +160,17 @@ func NewBreakerWithOptions(options *Options) *Breaker {
153160
options.WindowBuckets = DefaultWindowBuckets
154161
}
155162

163+
if options.ErrorHistoryDepth <= 0 {
164+
options.ErrorHistoryDepth = defaultErrorHistoryDepth
165+
}
166+
156167
return &Breaker{
157-
BackOff: options.BackOff,
158-
Clock: options.Clock,
159-
ShouldTrip: options.ShouldTrip,
160-
nextBackOff: options.BackOff.NextBackOff(),
161-
counts: newWindow(options.WindowTime, options.WindowBuckets),
168+
BackOff: options.BackOff,
169+
Clock: options.Clock,
170+
ShouldTrip: options.ShouldTrip,
171+
nextBackOff: options.BackOff.NextBackOff(),
172+
counts: newWindow(options.WindowTime, options.WindowBuckets),
173+
errorsBuffer: ring.New(options.ErrorHistoryDepth),
162174
}
163175
}
164176

@@ -293,6 +305,35 @@ func (cb *Breaker) Fail() {
293305
}
294306
}
295307

308+
// FailWithError is the same as Fail, but keeps history of errors in internal ring buffer
309+
func (cb *Breaker) FailWithError(err error) {
310+
cb.errorsBuffer = cb.errorsBuffer.Next()
311+
cb.errorsBuffer.Value = err
312+
cb.Fail()
313+
}
314+
315+
// LastError returns last error from internal buffer
316+
func (cb *Breaker) LastError() error {
317+
if cb.errorsBuffer.Value == nil {
318+
return ErrBreakerNoErrorRecorded
319+
}
320+
return cb.errorsBuffer.Value.(error)
321+
}
322+
323+
// Errors returns all errors from internal buffer
324+
func (cb *Breaker) Errors() (errors []error) {
325+
// reserve capacity to move last error to the end of slice without realloc
326+
errors = make([]error, 0, cb.errorsBuffer.Len()+1)
327+
cb.errorsBuffer.Do(func(x interface{}) {
328+
if x != nil {
329+
errors = append(errors, x.(error))
330+
}
331+
})
332+
// move last error to the end
333+
errors = append(errors[1:], errors[0])
334+
return errors
335+
}
336+
296337
// Success is used to indicate a success condition the Breaker should record. If
297338
// the success was triggered by a retry attempt, the breaker will be Reset().
298339
func (cb *Breaker) Success() {
@@ -302,7 +343,9 @@ func (cb *Breaker) Success() {
302343
cb.backoffLock.Unlock()
303344

304345
state := cb.state()
305-
if state == halfopen {
346+
// if state was halfopen and it's successful request this state will be `open`.
347+
// due to cb.halfOpens is 1 at this point (request grouping)
348+
if state == halfopen || state == open {
306349
cb.Reset()
307350
}
308351
atomic.StoreInt64(&cb.consecFailures, 0)
@@ -362,7 +405,7 @@ func (cb *Breaker) CallContext(ctx context.Context, circuit func() error, timeou
362405

363406
if err != nil {
364407
if ctx.Err() != context.Canceled {
365-
cb.Fail()
408+
cb.FailWithError(err)
366409
}
367410
return err
368411
}

circuitbreaker_test.go

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package circuit
33
import (
44
"context"
55
"fmt"
6-
"sync/atomic"
6+
"runtime"
77
"testing"
88
"time"
99

@@ -253,6 +253,52 @@ func TestThresholdBreakerCalling(t *testing.T) {
253253
}
254254
}
255255

256+
func TestThresholdBreakerErrorHistory(t *testing.T) {
257+
cb := NewThresholdBreaker(2)
258+
err := fmt.Errorf("error 1")
259+
cb.FailWithError(err)
260+
if cb.LastError() != err {
261+
t.Fatal("expected last error to be `error 1`")
262+
}
263+
264+
cb = NewThresholdBreaker(1)
265+
if cb.LastError() != nil {
266+
t.Fatalf("expected last error to be `nil`, got %s", cb.LastError())
267+
}
268+
269+
err = cb.Call(func() error {
270+
return fmt.Errorf("circuit error")
271+
}, 0)
272+
if err == nil {
273+
t.Fatal("expected threshold breaker to error")
274+
}
275+
if !cb.Tripped() {
276+
t.Fatal("expected threshold breaker to be open")
277+
}
278+
if cb.LastError().Error() != "circuit error" {
279+
t.Fatalf("expected last error to be `circut error`, got %s", cb.LastError())
280+
}
281+
282+
cb.Success()
283+
cb.Call(func() error {
284+
return fmt.Errorf("circuit error 1")
285+
}, 0)
286+
if cb.LastError().Error() != "circuit error 1" {
287+
t.Fatalf("expected last error to be `circut error 1`, got %s", cb.LastError())
288+
}
289+
290+
errs := cb.Errors()
291+
if len(errs) != 2 {
292+
t.Fatalf("expected `%d` errors, got %d", 2, len(errs))
293+
}
294+
if errs[0].Error() != "circuit error" {
295+
t.Fatalf("expected `%s` error, got %s", "circuit error", errs[0].Error())
296+
}
297+
if errs[1].Error() != "circuit error 1" {
298+
t.Fatalf("expected `%s` error, got %s", "circuit error 1", errs[0].Error())
299+
}
300+
}
301+
256302
func TestThresholdBreakerCallingContext(t *testing.T) {
257303
circuit := func() error {
258304
return fmt.Errorf("error")
@@ -323,37 +369,40 @@ func TestThresholdBreakerResets(t *testing.T) {
323369
}
324370

325371
func TestTimeoutBreaker(t *testing.T) {
326-
wait := make(chan struct{})
327-
328372
c := clock.NewMock()
329-
called := int32(0)
330373

331374
circuit := func() error {
332-
wait <- struct{}{}
333-
atomic.AddInt32(&called, 1)
334-
<-wait
375+
time.Sleep(100000000 * time.Millisecond)
335376
return nil
336377
}
337378

338379
cb := NewThresholdBreaker(1)
339380
cb.Clock = c
340381

341382
errc := make(chan error)
342-
go func() { errc <- cb.Call(circuit, time.Millisecond) }()
343-
383+
wait := make(chan struct{})
384+
go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }()
344385
<-wait
345-
c.Add(time.Millisecond * 3)
346-
wait <- struct{}{}
386+
// yield and advance the clock
387+
runtime.Gosched()
388+
c.Add(time.Millisecond * 1000)
347389

348390
err := <-errc
349-
if err == nil {
350-
t.Fatal("expected timeout breaker to return an error")
391+
if err != ErrBreakerTimeout {
392+
t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerTimeout, err)
351393
}
352394

353-
go cb.Call(circuit, time.Millisecond)
395+
cb.Clock = clock.NewMock()
396+
go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }()
354397
<-wait
398+
// yield and advance the clock
399+
runtime.Gosched()
355400
c.Add(time.Millisecond * 3)
356-
wait <- struct{}{}
401+
402+
err = <-errc
403+
if err != ErrBreakerOpen {
404+
t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerOpen, err)
405+
}
357406

358407
if !cb.Tripped() {
359408
t.Fatal("expected timeout breaker to be open")

0 commit comments

Comments
 (0)