1
- package looper
1
+ package lockerpostgresql
2
2
3
3
import (
4
4
"context"
5
5
"database/sql"
6
6
"fmt"
7
7
"time"
8
+
9
+ "github.com/golang-cz/looper"
8
10
)
9
11
10
12
const defaultTableName = "looper_lock"
11
13
12
14
// PostgresLocker provides an implementation of the Locker interface using
13
15
// a PostgreSQL table for storage.
14
- func PostgresLocker (ctx context.Context , db * sql.DB , tableName string ) (locker , error ) {
16
+ func PostgresLocker (ctx context.Context , db * sql.DB , tableName string ) (looper. Locker , error ) {
15
17
if err := db .PingContext (ctx ); err != nil {
16
- return nil , fmt .Errorf ("%w: %v" , ErrFailedToConnectToLocker , err )
18
+ return nil , fmt .Errorf ("%w: %v" , looper . ErrFailedToConnectToLocker , err )
17
19
}
18
20
19
21
if tableName == "" {
@@ -34,7 +36,7 @@ func PostgresLocker(ctx context.Context, db *sql.DB, tableName string) (locker,
34
36
}
35
37
36
38
// Locker
37
- var _ locker = (* postgresLocker )(nil )
39
+ var _ looper. Locker = (* postgresLocker )(nil )
38
40
39
41
type postgresLocker struct {
40
42
db * sql.DB
@@ -55,7 +57,7 @@ func createLockTable(ctx context.Context, db *sql.DB, table string) error {
55
57
),
56
58
).Scan (& tableExists )
57
59
if err != nil {
58
- return fmt .Errorf ("%w: %v" , ErrFailedToCheckLockExistence , err )
60
+ return fmt .Errorf ("%w: %v" , looper . ErrFailedToCheckLockExistence , err )
59
61
}
60
62
61
63
if ! tableExists {
@@ -69,59 +71,50 @@ func createLockTable(ctx context.Context, db *sql.DB, table string) error {
69
71
table ,
70
72
))
71
73
if err != nil {
72
- return fmt .Errorf ("%w: %v" , ErrFailedToCreateLockTable , err )
74
+ return fmt .Errorf ("%w: %v" , looper . ErrFailedToCreateLockTable , err )
73
75
}
74
76
}
75
77
76
78
return nil
77
79
}
78
80
79
- func (p * postgresLocker ) lock (
80
- ctx context.Context ,
81
- key string ,
82
- timeout time.Duration ,
83
- ) (lock , error ) {
81
+ func (p * postgresLocker ) Lock (ctx context.Context , key string , timeout time.Duration ) (looper.Lock , error ) {
84
82
// Create a row in the lock table to acquire the lock
85
- _ , err := p .db .ExecContext (
86
- ctx ,
87
- fmt .Sprintf (`
83
+ q := fmt .Sprintf (`
88
84
INSERT INTO %s (job_name)
89
85
VALUES ('%s');` ,
90
- p .table ,
91
- key ,
92
- ))
93
- if err != nil {
94
- var createdAt time.Time
95
- err := p .db .QueryRowContext (
96
- ctx ,
97
- fmt .Sprintf (`
86
+ p .table ,
87
+ key ,
88
+ )
89
+ if _ , err := p .db .ExecContext (ctx , q ); err != nil {
90
+ q := fmt .Sprintf (`
98
91
SELECT created_at
99
92
FROM %s
100
93
WHERE job_name = '%s';` ,
101
- p .table ,
102
- key ,
103
- )).Scan (& createdAt )
104
- if err != nil {
105
- return nil , ErrFailedToCheckLockExistence
94
+ p .table ,
95
+ key ,
96
+ )
97
+
98
+ var createdAt time.Time
99
+ if err := p .db .QueryRowContext (ctx , q ).Scan (& createdAt ); err != nil {
100
+ return nil , looper .ErrFailedToCheckLockExistence
106
101
}
107
102
108
103
if createdAt .Before (time .Now ().Add (- timeout )) {
109
- _ , err := p .db .ExecContext (
110
- ctx ,
111
- fmt .Sprintf (`
104
+ q := fmt .Sprintf (`
112
105
DELETE FROM %s
113
106
WHERE job_name = '%s';` ,
114
- p .table ,
115
- key ,
116
- ) )
117
- if err != nil {
118
- return nil , ErrFailedToReleaseLock
107
+ p .table ,
108
+ key ,
109
+ )
110
+ if _ , err := p . db . ExecContext ( ctx , q ); err != nil {
111
+ return nil , looper . ErrFailedToReleaseLock
119
112
}
120
113
121
- return p .lock (ctx , key , timeout )
114
+ return p .Lock (ctx , key , timeout )
122
115
}
123
116
124
- return nil , ErrFailedToObtainLock
117
+ return nil , looper . ErrFailedToObtainLock
125
118
}
126
119
127
120
pl := & postgresLock {
@@ -134,15 +127,15 @@ func (p *postgresLocker) lock(
134
127
}
135
128
136
129
// Lock
137
- var _ lock = (* postgresLock )(nil )
130
+ var _ looper. Lock = (* postgresLock )(nil )
138
131
139
132
type postgresLock struct {
140
133
db * sql.DB
141
134
table string
142
135
key string
143
136
}
144
137
145
- func (p * postgresLock ) unlock (ctx context.Context ) error {
138
+ func (p * postgresLock ) Unlock (ctx context.Context ) error {
146
139
// Release the lock by deleting the row
147
140
_ , err := p .db .ExecContext (
148
141
ctx ,
@@ -153,7 +146,7 @@ func (p *postgresLock) unlock(ctx context.Context) error {
153
146
p .key ,
154
147
))
155
148
if err != nil {
156
- return ErrFailedToReleaseLock
149
+ return looper . ErrFailedToReleaseLock
157
150
}
158
151
159
152
return nil
0 commit comments