Skip to content

Commit 64609f0

Browse files
Merge pull request #1 from golang-cz/update
update
2 parents 74e72fb + 3790ff7 commit 64609f0

File tree

2 files changed

+68
-63
lines changed

2 files changed

+68
-63
lines changed

locker.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,6 @@ var (
1414
ErrFailedToCheckLockExistence = errors.New("looper - failed to check lock existence")
1515
)
1616

17-
type lockerKind int
18-
19-
const (
20-
lockerNop lockerKind = iota
21-
lockerRedis
22-
)
23-
2417
// Lock if an error is returned by lock, the job will not be scheduled.
2518
type locker interface {
2619
lock(ctx context.Context, key string, timeout time.Duration) (lock, error)

looper.go

Lines changed: 68 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ func SetPanicHandler(handler PanicHandlerFunc) {
2626
// Looper
2727
type Looper struct {
2828
running bool
29+
jobs []*Job
2930
startupTime time.Duration
30-
locker locker
3131
hooks hooks
3232
mu sync.RWMutex
33-
jobs []*Job
33+
locker locker
3434
}
3535

3636
type (
@@ -46,18 +46,13 @@ type hooks struct {
4646
}
4747

4848
type Config struct {
49-
// Locker for jobs
50-
//
51-
// Options:
52-
// PostgresLocker(ctx context.Context, db *sql.DB, table string)
53-
// RedisLocker(ctx context.Context, rc redis.UniversalClient)
54-
Locker locker
55-
5649
// Startup time ensuring a consistent delay between registered jobs on start of looper.
5750
//
5851
// StartupTime = 1 second; 5 registered jobs; Jobs would be initiated
5952
// with 200ms delay
6053
StartupTime time.Duration
54+
55+
Locker locker
6156
}
6257

6358
type JobFn func(ctx context.Context) error
@@ -226,7 +221,7 @@ func (l *Looper) StartJobByName(jobName string) error {
226221
found = true
227222
if j.Active && !j.Started {
228223
j.Started = true
229-
go j.start()
224+
go j.startLoop()
230225
}
231226
}
232227

@@ -263,7 +258,7 @@ func (l *Looper) startJobs() {
263258
j.mu.Lock()
264259
if j.Active && !j.Started {
265260
j.Started = true
266-
go j.start()
261+
go j.startLoop()
267262
time.Sleep(delay)
268263
}
269264

@@ -299,18 +294,14 @@ func (l *Looper) Stop() {
299294
l.mu.Unlock()
300295
}
301296

302-
func (j *Job) start() {
297+
func (j *Job) startLoop() {
303298
defer func() {
304299
j.mu.Lock()
305300
j.Started = false
306301
j.contextCancel = nil
307302
j.mu.Unlock()
308303
}()
309304

310-
var errLock error
311-
var err error
312-
ctxLock := context.Background()
313-
314305
for {
315306
j.mu.RLock()
316307
if !j.Active || !j.Started {
@@ -319,65 +310,86 @@ func (j *Job) start() {
319310
}
320311
j.mu.RUnlock()
321312

322-
ctx, cancel := context.WithTimeout(context.Background(), j.Timeout)
323-
324-
j.mu.Lock()
325-
j.contextCancel = cancel
326-
j.Running = true
327-
328-
var lo lock
329-
330-
if j.WithLocker {
331-
lo, errLock = j.locker.lock(ctxLock, j.Name, j.Timeout)
332-
if errors.Is(errLock, ErrFailedToObtainLock) {
333-
time.Sleep(time.Duration(time.Second))
334-
j.Running = false
335-
cancel()
336-
j.mu.Unlock()
337-
continue
338-
}
313+
start := time.Now()
339314

340-
if errLock != nil {
341-
err = errLock
342-
}
315+
j.BeforeJob(j.Name)
316+
err := j.start()
317+
if err != nil {
318+
j.AfterJobError(j.Name, time.Since(start), err)
319+
time.Sleep(j.WaitAfterError)
320+
} else {
321+
j.AfterJob(j.Name, time.Since(start))
322+
time.Sleep(j.WaitAfterSuccess)
343323
}
324+
}
325+
}
344326

345-
j.BeforeJob(j.Name)
327+
func (j *Job) start() error {
328+
defer func() {
329+
j.mu.Lock()
330+
j.Running = false
346331
j.mu.Unlock()
332+
}()
347333

348-
start := time.Now()
349-
if err == nil {
350-
err = j.Run(ctx)
351-
}
334+
j.mu.Lock()
335+
j.Running = true
336+
j.mu.Unlock()
352337

353-
if j.WithLocker && errLock == nil {
354-
errLock = lo.unlock(ctxLock)
338+
lo, err := j.lock()
339+
if err != nil {
340+
if errors.Is(err, ErrFailedToObtainLock) {
341+
time.Sleep(time.Second)
342+
return nil
355343
}
356344

357-
if err != nil || errLock != nil {
358-
if err != nil {
359-
j.AfterJobError(j.Name, time.Since(start), err)
360-
} else {
361-
j.AfterJobError(j.Name, time.Since(start), errLock)
362-
}
345+
return err
346+
}
363347

364-
time.Sleep(j.WaitAfterError)
365-
} else {
366-
j.AfterJob(j.Name, time.Since(start))
367-
time.Sleep(j.WaitAfterSuccess)
348+
ctx, cancel := context.WithTimeout(context.Background(), j.Timeout)
349+
defer cancel()
350+
351+
j.contextCancel = cancel
352+
353+
err = j.run(ctx)
354+
if err != nil {
355+
errLock := j.unlock(lo)
356+
if errLock != nil {
357+
return errors.Join(err, errLock)
368358
}
369359

370-
cancel()
360+
return err
361+
}
362+
363+
err = j.unlock(lo)
364+
if err != nil {
365+
return err
371366
}
367+
368+
return nil
369+
}
370+
371+
func (j *Job) lock() (lo lock, err error) {
372+
if j.WithLocker {
373+
lo, err = j.locker.lock(context.Background(), j.Name, j.Timeout)
374+
}
375+
376+
return lo, err
372377
}
373378

374-
func (j *Job) Run(ctx context.Context) (err error) {
379+
func (j *Job) unlock(lo lock) (err error) {
380+
if j.WithLocker {
381+
return lo.unlock(context.Background())
382+
}
383+
384+
return nil
385+
}
386+
387+
func (j *Job) run(ctx context.Context) (err error) {
375388
defer func() {
376389
j.mu.Lock()
377390
defer j.mu.Unlock()
378391

379392
j.LastRun = time.Now()
380-
j.Running = false
381393

382394
r := recover()
383395
if r != nil {

0 commit comments

Comments
 (0)