Skip to content

Commit 4e1eb08

Browse files
committed
More test and test_util cleanup
1 parent f82e67b commit 4e1eb08

File tree

3 files changed

+124
-120
lines changed

3 files changed

+124
-120
lines changed

pulpcore/tests/functional/test_pubsub.py

Lines changed: 97 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -29,28 +29,58 @@ def django_connection_reset(request):
2929
django_db_blocker.block()
3030

3131

32-
def test_postgres_pubsub():
33-
"""Testing postgres low-level implementation."""
34-
from django.db import connection
35-
36-
state = SimpleNamespace()
37-
state.got_message = False
38-
with connection.cursor() as cursor:
39-
assert connection.connection is cursor.connection
40-
conn = cursor.connection
41-
# Listen and Notify
42-
conn.execute("LISTEN abc")
43-
conn.add_notify_handler(lambda notification: setattr(state, "got_message", True))
44-
cursor.execute("NOTIFY abc, 'foo'")
45-
assert state.got_message is True
46-
conn.execute("SELECT 1")
47-
assert state.got_message is True
48-
49-
# Reset and retry
32+
class TestPostgresSpecifics:
33+
def test_listen_notify_in_same_process(self):
34+
"""Testing postgres low-level implementation."""
35+
from django.db import connection
36+
37+
state = SimpleNamespace()
5038
state.got_message = False
51-
conn.execute("UNLISTEN abc")
52-
cursor.execute("NOTIFY abc, 'foo'")
53-
assert state.got_message is False
39+
with connection.cursor() as cursor:
40+
assert connection.connection is cursor.connection
41+
conn = cursor.connection
42+
# Listen and Notify
43+
conn.execute("LISTEN abc")
44+
conn.add_notify_handler(lambda notification: setattr(state, "got_message", True))
45+
cursor.execute("NOTIFY abc, 'foo'")
46+
assert state.got_message is True
47+
conn.execute("SELECT 1")
48+
assert state.got_message is True
49+
50+
# Reset and retry
51+
state.got_message = False
52+
conn.execute("UNLISTEN abc")
53+
cursor.execute("NOTIFY abc, 'foo'")
54+
assert state.got_message is False
55+
56+
def test_low_level_assumptions_on_multiprocess(self):
57+
"""Asserts that we are really testing two different connections.
58+
59+
From psycopg, the backend_id is:
60+
"The process ID (PID) of the backend process handling this connection."
61+
"""
62+
from django.db import connection
63+
64+
def host_act(host_turn, log):
65+
with host_turn(): # 1
66+
assert connection.connection is None
67+
with connection.cursor() as cursor:
68+
cursor.execute("select 1")
69+
assert connection.connection is not None
70+
log.put(connection.connection.info.backend_pid)
71+
72+
def child_act(child_turn, log):
73+
with child_turn(): # 2
74+
assert connection.connection is None
75+
with connection.cursor() as cursor:
76+
cursor.execute("select 1")
77+
assert connection.connection is not None
78+
log.put(connection.connection.info.backend_pid)
79+
80+
log = IpcUtil.run(host_act, child_act)
81+
assert len(log) == 2
82+
host_connection_pid, child_connection_pid = log
83+
assert host_connection_pid != child_connection_pid
5484

5585

5686
M = pubsub.PubsubMessage
@@ -59,20 +89,34 @@ def test_postgres_pubsub():
5989
]
6090

6191

92+
def unsubscribe_all(channels, subscriber):
93+
for channel in channels:
94+
subscriber.unsubscribe(channel)
95+
96+
97+
def subscribe_all(channels, subscriber):
98+
for channel in channels:
99+
subscriber.subscribe(channel)
100+
101+
102+
def publish_all(messages, publisher):
103+
for channel, payload in messages:
104+
publisher.publish(channel, payload=payload)
105+
106+
62107
@pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS)
108+
@pytest.mark.parametrize(
109+
"payload",
110+
(
111+
pytest.param(None, id="none"),
112+
pytest.param("", id="empty-string"),
113+
pytest.param("payload", id="non-empty-string"),
114+
pytest.param(123, id="int"),
115+
pytest.param(datetime.now(), id="datetime"),
116+
pytest.param(True, id="bool"),
117+
),
118+
)
63119
class TestPublish:
64-
65-
@pytest.mark.parametrize(
66-
"payload",
67-
(
68-
pytest.param(None, id="none"),
69-
pytest.param("", id="empty-string"),
70-
pytest.param("payload", id="non-empty-string"),
71-
pytest.param(123, id="int"),
72-
pytest.param(datetime.now(), id="datetime"),
73-
pytest.param(True, id="bool"),
74-
),
75-
)
76120
def test_with_payload_as(self, pubsub_backend: pubsub.BasePubSubBackend, payload):
77121
pubsub_backend.publish("channel", payload=payload)
78122

@@ -90,29 +134,17 @@ def test_with_payload_as(self, pubsub_backend: pubsub.BasePubSubBackend, payload
90134
),
91135
)
92136
class TestNoIpcSubscribeFetch:
93-
def unsubscribe_all(self, channels, subscriber):
94-
for channel in channels:
95-
subscriber.unsubscribe(channel)
96-
97-
def subscribe_all(self, channels, subscriber):
98-
for channel in channels:
99-
subscriber.subscribe(channel)
100-
101-
def publish_all(self, messages, publisher):
102-
for channel, payload in messages:
103-
publisher.publish(channel, payload=payload)
104-
105137
def test_with(
106138
self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage]
107139
):
108140
channels = {m.channel for m in messages}
109141
publisher = pubsub_backend
110142
with pubsub_backend() as subscriber:
111-
self.subscribe_all(channels, subscriber)
112-
self.publish_all(messages, publisher)
143+
subscribe_all(channels, subscriber)
144+
publish_all(messages, publisher)
113145
assert subscriber.fetch() == messages
114146

115-
self.unsubscribe_all(channels, subscriber)
147+
unsubscribe_all(channels, subscriber)
116148
assert subscriber.fetch() == []
117149

118150
def test_select_readiness_with(
@@ -122,14 +154,14 @@ def test_select_readiness_with(
122154
CHANNELS = {m.channel for m in messages}
123155
publisher = pubsub_backend
124156
with pubsub_backend() as subscriber:
125-
self.subscribe_all(CHANNELS, subscriber)
157+
subscribe_all(CHANNELS, subscriber)
126158
assert subscriber.get_subscriptions() == CHANNELS
127159

128160
ready, _, _ = select.select([subscriber], [], [], TIMEOUT)
129161
assert subscriber not in ready
130162
assert subscriber.fetch() == []
131163

132-
self.publish_all(messages, publisher)
164+
publish_all(messages, publisher)
133165
ready, _, _ = select.select([subscriber], [], [], TIMEOUT)
134166
assert subscriber in ready
135167

@@ -141,49 +173,19 @@ def test_select_readiness_with(
141173
assert subscriber not in ready
142174
assert subscriber.fetch() == []
143175

144-
self.unsubscribe_all(CHANNELS, subscriber)
145-
self.publish_all(messages, publisher)
176+
unsubscribe_all(CHANNELS, subscriber)
177+
publish_all(messages, publisher)
146178
ready, _, _ = select.select([subscriber], [], [], TIMEOUT)
147179
assert subscriber not in ready
148180
assert subscriber.fetch() == []
149181

150182

151-
def test_postgres_backend_ipc():
152-
"""Asserts that we are really testing two different connections.
153-
154-
From psycopg, the backend_id is:
155-
"The process ID (PID) of the backend process handling this connection."
156-
"""
157-
from django.db import connection
158-
159-
def host_act(host_turn, log):
160-
with host_turn(): # 1
161-
assert connection.connection is None
162-
with connection.cursor() as cursor:
163-
cursor.execute("select 1")
164-
assert connection.connection is not None
165-
log.put(connection.connection.info.backend_pid)
166-
167-
def child_act(child_turn, log):
168-
with child_turn(): # 2
169-
assert connection.connection is None
170-
with connection.cursor() as cursor:
171-
cursor.execute("select 1")
172-
assert connection.connection is not None
173-
log.put(connection.connection.info.backend_pid)
174-
175-
log = IpcUtil.run(host_act, child_act)
176-
assert len(log) == 2
177-
host_connection_pid, child_connection_pid = log
178-
assert host_connection_pid != child_connection_pid
179-
180-
181183
@pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS)
182184
@pytest.mark.parametrize(
183185
"messages",
184186
(
185187
pytest.param([M("a", "A1")], id="single-message"),
186-
pytest.param([M("a", "A1")], id="test-leaking"),
188+
pytest.param([M("a", "A1")], id="test-if-leaking"),
187189
pytest.param([M("b", "B1"), M("b", "B2")], id="two-messages-in-same-channel"),
188190
pytest.param(
189191
[M("c", "C1"), M("c", "C2"), M("d", "D1"), M("d", "D1")],
@@ -212,8 +214,7 @@ def subscriber_act(subscriber_turn, log):
212214
with pubsub_backend() as subscriber:
213215
with subscriber_turn(): # 1
214216
log.put("subscribe")
215-
for channel in CHANNELS:
216-
subscriber.subscribe(channel)
217+
subscribe_all(CHANNELS, subscriber)
217218

218219
with subscriber_turn(): # 3
219220
log.put("fetch")
@@ -225,8 +226,7 @@ def subscriber_act(subscriber_turn, log):
225226
log.put("fetch+unsubscribe")
226227
assert subscriber.fetch() == messages
227228
assert subscriber.fetch() == []
228-
for channel in CHANNELS:
229-
subscriber.unsubscribe(channel)
229+
unsubscribe_all(CHANNELS, subscriber)
230230

231231
with subscriber_turn(done=True): # 7
232232
log.put("fetch-empty")
@@ -235,20 +235,17 @@ def subscriber_act(subscriber_turn, log):
235235
# child
236236
def publisher_act(publisher_turn, log):
237237
publisher = pubsub_backend
238-
with publisher_turn():
238+
with publisher_turn(): # 2
239239
log.put("publish")
240-
for message in messages: # 2
241-
publisher.publish(message.channel, payload=message.payload)
240+
publish_all(messages, publisher)
242241

243-
with publisher_turn():
242+
with publisher_turn(): # 4
244243
log.put("publish")
245-
for message in messages: # 4
246-
publisher.publish(message.channel, payload=message.payload)
244+
publish_all(messages, publisher)
247245

248-
with publisher_turn():
246+
with publisher_turn(): # 6
249247
log.put("publish")
250-
for message in messages: # 6
251-
publisher.publish(message.channel, payload=message.payload)
248+
publish_all(messages, publisher)
252249

253250
log = IpcUtil.run(subscriber_act, publisher_act)
254251
assert log == EXPECTED_LOG
@@ -270,8 +267,7 @@ def subscriber_act(subscriber_turn, log):
270267
with pubsub_backend() as subscriber:
271268
with subscriber_turn(): # 1
272269
log.put("subscribe/select-empty")
273-
for channel in CHANNELS:
274-
subscriber.subscribe(channel)
270+
subscribe_all(CHANNELS, subscriber)
275271
assert subscriber.get_subscriptions() == CHANNELS
276272
ready, _, _ = select.select([subscriber], [], [], TIMEOUT)
277273
assert subscriber not in ready
@@ -286,8 +282,7 @@ def subscriber_act(subscriber_turn, log):
286282
assert subscriber in ready
287283
assert subscriber.fetch() == messages
288284
assert subscriber.fetch() == []
289-
for channel in CHANNELS:
290-
subscriber.unsubscribe(channel)
285+
unsubscribe_all(CHANNELS, subscriber)
291286

292287
with subscriber_turn(done=True): # 5
293288
log.put("fetch/select-empty")
@@ -299,13 +294,11 @@ def publisher_act(publisher_turn, log):
299294
publisher = pubsub_backend
300295
with publisher_turn(): # 2
301296
log.put("publish")
302-
for message in messages:
303-
publisher.publish(message.channel, payload=message.payload)
297+
publish_all(messages, publisher)
304298

305299
with publisher_turn(): # 4
306300
log.put("publish")
307-
for message in messages:
308-
publisher.publish(message.channel, payload=message.payload)
301+
publish_all(messages, publisher)
309302

310303
log = IpcUtil.run(subscriber_act, publisher_act)
311304
assert log == EXPECTED_LOG

pulpcore/tests/functional/test_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def child_act(child_turn, log):
1919
with pytest.raises(Exception, match=error_msg):
2020
IpcUtil.run(host_act, child_act)
2121

22-
def test_turns_are_respected(self):
22+
def test_turns_are_deterministic(self):
2323
RUNS = 1000
2424
errors = 0
2525

pulpcore/tests/functional/utils.py

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,27 @@ class ProcessErrorData(NamedTuple):
171171

172172

173173
class IpcUtil:
174+
TIMEOUT_ERROR_MESSAGE = (
175+
"Tip: make sure the last 'with turn()' (in execution order) "
176+
"is called with 'actor_turn(done=True)', otherwise it may hang."
177+
)
178+
SUBPROCESS_ERROR_HEADER_TEMPLATE = "Error from sub-process (pid={pid) on test using IpcUtil"
179+
TURN_WAIT_TIMEOUT = 1
174180

175181
@staticmethod
176182
def run(host_act, child_act) -> list:
177-
# ensures a connection from one run doesn't interfere with the other
183+
"""Run two processes in synchronous alternate turns.
184+
185+
The act are functions with the signature (act_turn, log), where act_turn is
186+
a context manager where each step of the act takes place, and log is a
187+
queue where each actor can put messages in using Q.put(item).
188+
189+
Args:
190+
host_act: The function of the act that start the communication
191+
child_act: The function of the act that follows host_act
192+
Returns:
193+
A list with the items collected via log.
194+
"""
178195
conn_1, conn_2 = mp.Pipe()
179196
log = mp.SimpleQueue()
180197
lock = mp.Lock()
@@ -208,19 +225,12 @@ def run(host_act, child_act) -> list:
208225
@staticmethod
209226
@contextmanager
210227
def _actor_turn(conn: Connection, starts: bool, log, lock: mp.Lock, done: bool = False):
211-
TIMEOUT = 1
228+
def flush_conn(conn: Connection):
229+
if not conn.poll(IpcUtil.TURN_WAIT_TIMEOUT):
230+
raise TimeoutError(IpcUtil.TIMEOUT_ERROR_MESSAGE)
231+
conn.recv()
212232

213233
try:
214-
215-
def flush_conn(conn):
216-
if not conn.poll(TIMEOUT):
217-
err_msg = (
218-
"Tip: make sure the last 'with turn()' (in execution order) "
219-
"is called with 'actor_turn(done=True)', otherwise it may hang."
220-
)
221-
raise TimeoutError(err_msg)
222-
conn.recv()
223-
224234
if starts:
225235
with lock:
226236
conn.send("done")
@@ -234,9 +244,8 @@ def flush_conn(conn):
234244
conn.send("done")
235245
except Exception as e:
236246
traceback.print_exc(file=sys.stderr)
237-
err_header = f"Error from sub-process (pid={os.getpid()}) on test using IpcUtil"
238-
traceback_str = f"{err_header}\n\n{traceback.format_exc()}"
239-
247+
error_header = IpcUtil.SUBPROCESS_ERROR_HEADER_TEMPLATE.format(pid=os.getpid())
248+
traceback_str = f"{error_header}\n\n{traceback.format_exc()}"
240249
error = ProcessErrorData(e, traceback_str)
241250
log.put(error)
242251
exit(1)
@@ -246,4 +255,6 @@ def read_log(log: mp.SimpleQueue) -> list:
246255
result = []
247256
while not log.empty():
248257
result.append(log.get())
258+
for item in result:
259+
log.put(item)
249260
return result

0 commit comments

Comments
 (0)