Skip to content

Commit ebbb540

Browse files
committed
Extract important queries to postgres functions
The motivating use case for extracting these functions is that it gives more control to the developer using this package on how to deploy this to their infrastructure. For instance, some people may be interested in a different table structure for `payloads`, like having it partitioned. With this change, developers are free to change the implementation of the postgres functions as they see fit as long as they implement an equivalent logic and return the same types.
1 parent 49f4cba commit ebbb540

File tree

6 files changed

+92
-65
lines changed

6 files changed

+92
-65
lines changed

hasql-queue.cabal

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
cabal-version: 1.12
22

3-
-- This file has been generated from package.yaml by hpack version 0.31.2.
3+
-- This file has been generated from package.yaml by hpack version 0.34.4.
44
--
55
-- see: https://github.com/sol/hpack
66
--
7-
-- hash: 956ae93525f9dafcc0c9c8149cd2bbc8cfcfe4e63310adec92ce40f995e4cbf4
7+
-- hash: 30a78bb71c0fb6470ad0d6b6788b23f19801ab253d1c65e008a48e329e01b914
88

99
name: hasql-queue
10-
version: 1.2.0.1
10+
version: 1.3.0.0
1111
synopsis: A PostgreSQL backed queue
1212
description: A PostgreSQL backed queue. Please see README.md
1313
category: Web
@@ -18,7 +18,8 @@ maintainer: [email protected]
1818
copyright: 2020 Jonathan Fischoff
1919
license: BSD3
2020
license-file: LICENSE
21-
tested-with: GHC ==8.8.1
21+
tested-with:
22+
GHC ==8.8.1
2223
build-type: Simple
2324
extra-source-files:
2425
README.md
@@ -42,7 +43,16 @@ library
4243
Paths_hasql_queue
4344
hs-source-dirs:
4445
src
45-
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
46+
default-extensions:
47+
OverloadedStrings
48+
LambdaCase
49+
RecordWildCards
50+
TupleSections
51+
GeneralizedNewtypeDeriving
52+
QuasiQuotes
53+
ScopedTypeVariables
54+
TypeApplications
55+
AllowAmbiguousTypes
4656
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls
4757
build-depends:
4858
aeson
@@ -67,7 +77,16 @@ executable benchmark
6777
Paths_hasql_queue
6878
hs-source-dirs:
6979
benchmarks
70-
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
80+
default-extensions:
81+
OverloadedStrings
82+
LambdaCase
83+
RecordWildCards
84+
TupleSections
85+
GeneralizedNewtypeDeriving
86+
QuasiQuotes
87+
ScopedTypeVariables
88+
TypeApplications
89+
AllowAmbiguousTypes
7190
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N
7291
build-depends:
7392
aeson
@@ -98,7 +117,16 @@ executable hasql-queue-tmp-db
98117
Paths_hasql_queue
99118
hs-source-dirs:
100119
hasql-queue-tmp-db
101-
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
120+
default-extensions:
121+
OverloadedStrings
122+
LambdaCase
123+
RecordWildCards
124+
TupleSections
125+
GeneralizedNewtypeDeriving
126+
QuasiQuotes
127+
ScopedTypeVariables
128+
TypeApplications
129+
AllowAmbiguousTypes
102130
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N -g2
103131
build-depends:
104132
aeson
@@ -137,7 +165,16 @@ test-suite unit-tests
137165
Paths_hasql_queue
138166
hs-source-dirs:
139167
test
140-
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
168+
default-extensions:
169+
OverloadedStrings
170+
LambdaCase
171+
RecordWildCards
172+
TupleSections
173+
GeneralizedNewtypeDeriving
174+
QuasiQuotes
175+
ScopedTypeVariables
176+
TypeApplications
177+
AllowAmbiguousTypes
141178
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N
142179
build-depends:
143180
aeson

package.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: hasql-queue
2-
version: '1.2.0.2'
2+
version: '1.3.0.0'
33
synopsis: A PostgreSQL backed queue
44
description: A PostgreSQL backed queue. Please see README.md
55
category: Web

src/Hasql/Queue/High/ExactlyOnce.hs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,30 +58,13 @@ dequeue valueDecoder count
5858
| count <= 0 = pure []
5959
| otherwise = do
6060
let multipleQuery = [here|
61-
DELETE FROM payloads
62-
WHERE id in
63-
( SELECT p1.id
64-
FROM payloads AS p1
65-
WHERE p1.state='enqueued'
66-
ORDER BY p1.modified_at ASC
67-
FOR UPDATE SKIP LOCKED
68-
LIMIT $1
69-
)
70-
RETURNING value
61+
SELECT value FROM dequeue_payload($1)
7162
|]
63+
7264
multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4
7365

7466
singleQuery = [here|
75-
DELETE FROM payloads
76-
WHERE id =
77-
( SELECT p1.id
78-
FROM payloads AS p1
79-
WHERE p1.state='enqueued'
80-
ORDER BY p1.modified_at ASC
81-
FOR UPDATE SKIP LOCKED
82-
LIMIT 1
83-
)
84-
RETURNING value
67+
SELECT value FROM dequeue_payload(1)
8568
|]
8669

8770
singleEncoder = mempty

src/Hasql/Queue/Internal.hs

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,6 @@ newtype PayloadId = PayloadId { unPayloadId :: Int64 }
6262
data Payload a = Payload
6363
{ pId :: PayloadId
6464
, pState :: State
65-
-- TODO do I need this?
66-
, pAttempts :: Int
67-
, pModifiedAt :: Int
68-
-- TODO rename. I don't need this either.
6965
, pValue :: a
7066
} deriving (Show, Eq)
7167

@@ -75,8 +71,6 @@ payloadDecoder thePayloadDecoder
7571
= Payload
7672
<$> payloadIdRow
7773
<*> D.column (D.nonNullable stateDecoder)
78-
<*> D.column (D.nonNullable $ fromIntegral <$> D.int4)
79-
<*> D.column (D.nonNullable $ fromIntegral <$> D.int4)
8074
<*> D.column (D.nonNullable thePayloadDecoder)
8175

8276
payloadIdEncoder :: E.Value PayloadId
@@ -92,9 +86,7 @@ payloadIdRow = D.column (D.nonNullable payloadIdDecoder)
9286
enqueuePayload :: E.Value a -> [a] -> Session [PayloadId]
9387
enqueuePayload theEncoder values = do
9488
let theQuery = [here|
95-
INSERT INTO payloads (attempts, value)
96-
SELECT 0, * FROM unnest($1)
97-
RETURNING id
89+
SELECT id FROM enqueue_payload($1)
9890
|]
9991
encoder = E.param $ E.nonNullable $ E.foldableArray $ E.nonNullable theEncoder
10092
decoder = D.rowList (D.column (D.nonNullable payloadIdDecoder))
@@ -105,30 +97,15 @@ enqueuePayload theEncoder values = do
10597
dequeuePayload :: D.Value a -> Int -> Session [Payload a]
10698
dequeuePayload valueDecoder count = do
10799
let multipleQuery = [here|
108-
DELETE FROM payloads
109-
WHERE id in
110-
( SELECT p1.id
111-
FROM payloads AS p1
112-
WHERE p1.state='enqueued'
113-
ORDER BY p1.modified_at ASC
114-
FOR UPDATE SKIP LOCKED
115-
LIMIT $1
116-
)
117-
RETURNING id, state, attempts, modified_at, value
100+
SELECT id, state, value
101+
FROM dequeue_payload($1)
118102
|]
103+
119104
multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4
120105

121106
singleQuery = [here|
122-
DELETE FROM payloads
123-
WHERE id =
124-
( SELECT p1.id
125-
FROM payloads AS p1
126-
WHERE p1.state='enqueued'
127-
ORDER BY p1.modified_at ASC
128-
FOR UPDATE SKIP LOCKED
129-
LIMIT 1
130-
)
131-
RETURNING id, state, attempts, modified_at, value
107+
SELECT id, state, value
108+
FROM dequeue_payload(1)
132109
|]
133110

134111
singleEncoder = mempty
@@ -144,7 +121,7 @@ dequeuePayload valueDecoder count = do
144121
getPayload :: D.Value a -> PayloadId -> Session (Maybe (Payload a))
145122
getPayload decoder payloadId = do
146123
let theQuery = [here|
147-
SELECT id, state, attempts, modified_at, value
124+
SELECT id, state, value
148125
FROM payloads
149126
WHERE id = $1
150127
|]
@@ -168,10 +145,7 @@ getCount = do
168145
incrementAttempts :: Int -> [PayloadId] -> Session ()
169146
incrementAttempts retryCount pids = do
170147
let theQuery = [here|
171-
UPDATE payloads
172-
SET state=CASE WHEN attempts >= $1 THEN 'failed' :: state_t ELSE 'enqueued' END
173-
, attempts=attempts+1
174-
WHERE id = ANY($2)
148+
SELECT increment_payload_attempts($1, $2)
175149
|]
176150
encoder = (fst >$< E.param (E.nonNullable E.int4)) <>
177151
(snd >$< E.param (E.nonNullable $ E.foldableArray $ E.nonNullable payloadIdEncoder))

src/Hasql/Queue/Migrate.hs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,36 @@ migrationQueryString valueType = [i|
6262
CREATE INDEX IF NOT EXISTS active_modified_at_idx ON payloads USING btree (modified_at)
6363
WHERE (state = 'enqueued');
6464

65+
CREATE OR REPLACE FUNCTION dequeue_payload(limit_ INT) RETURNS SETOF payloads AS
66+
$$
67+
WITH available AS
68+
( SELECT p1.id
69+
FROM payloads AS p1
70+
WHERE p1.state='enqueued'
71+
ORDER BY p1.modified_at ASC
72+
FOR UPDATE SKIP LOCKED
73+
LIMIT limit_
74+
)
75+
DELETE FROM payloads
76+
USING available
77+
WHERE payloads.id = available.id
78+
RETURNING payloads.*
79+
$$ LANGUAGE SQL VOLATILE;
80+
81+
CREATE OR REPLACE FUNCTION increment_payload_attempts(threshold_ INT, ids_ BIGINT[]) RETURNS VOID AS
82+
$$
83+
UPDATE payloads
84+
SET state=CASE WHEN attempts >= threshold_ THEN 'failed' :: state_t ELSE 'enqueued' END
85+
, attempts=attempts+1
86+
WHERE id = ANY(ids_)
87+
$$ LANGUAGE SQL VOLATILE;
88+
89+
CREATE OR REPLACE FUNCTION enqueue_payload(values_ ${valueType}[]) RETURNS SETOF payloads AS
90+
$$
91+
INSERT INTO payloads (attempts, value)
92+
SELECT 0, * FROM unnest(values_)
93+
RETURNING *
94+
$$ LANGUAGE SQL VOLATILE;
6595
|]
6696

6797
{-| This function creates a table and enumeration type that is
@@ -123,6 +153,9 @@ Drop everything created by 'migrate'
123153
teardown :: Connection -> IO ()
124154
teardown conn = do
125155
let theQuery = [i|
156+
DROP FUNCTION IF EXISTS enqueue_payload;
157+
DROP FUNCTION IF EXISTS dequeue_payload;
158+
DROP FUNCTION IF EXISTS increment_payload_attempts;
126159
DROP TABLE IF EXISTS payloads;
127160
DROP TYPE IF EXISTS state_t;
128161
DROP SEQUENCE IF EXISTS modified_index;

test/Hasql/Queue/Low/AtLeastOnceSpec.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe "
142142
let Just decoded = mapM (decode . encode) xs
143143
sort decoded `shouldBe` sort expected
144144

145-
it "enqueue returns a PayloadId that cooresponds to the entry it added" $ withConnection $ \conn -> do
145+
it "enqueue returns a PayloadId that corresponds to the entry it added" $ withConnection $ \conn -> do
146146
[payloadId] <- I.runThrow (I.enqueuePayload E.int4 [1]) conn
147147
Just actual <- getPayload conn D.int4 payloadId
148148

149-
pValue actual `shouldBe` 1
149+
pId actual `shouldBe` payloadId

0 commit comments

Comments
 (0)