Skip to content

Commit f3724f3

Browse files
committed
Add unit tests for activator.py.
Not what the activator only depends on rubin-env packages, it can be imported in unit tests. However, it's still dangerous to handle because a number of things happen at import time (evaluating environment variables, and registering code that should be set up on program start).
1 parent 6bb783f commit f3724f3

File tree

2 files changed

+313
-0
lines changed

2 files changed

+313
-0
lines changed

tests/test_activator.py

Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
# This file is part of prompt_processing.
2+
#
3+
# Developed for the LSST Data Management System.
4+
# This product includes software developed by the LSST Project
5+
# (https://www.lsst.org).
6+
# See the COPYRIGHT file at the top-level directory of this distribution
7+
# for details of code ownership.
8+
#
9+
# This program is free software: you can redistribute it and/or modify
10+
# it under the terms of the GNU General Public License as published by
11+
# the Free Software Foundation, either version 3 of the License, or
12+
# (at your option) any later version.
13+
#
14+
# This program is distributed in the hope that it will be useful,
15+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17+
# GNU General Public License for more details.
18+
#
19+
# You should have received a copy of the GNU General Public License
20+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
21+
22+
23+
import os.path
24+
import signal
25+
import time
26+
import unittest
27+
28+
import astropy.coordinates
29+
import astropy.time
30+
import astropy.units as u
31+
32+
import shared.raw
33+
from shared.visit import FannedOutVisit
34+
import activator.setup
35+
36+
# Mandatory envvars are loaded at import time
37+
os.environ["RUBIN_INSTRUMENT"] = "LSSTCam"
38+
os.environ["SKYMAP"] = "lsst_cells_v1"
39+
os.environ["CENTRAL_REPO"] = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data", "central_repo")
40+
os.environ["S3_ENDPOINT_URL"] = "https://this.is.a.test"
41+
os.environ["IMAGE_BUCKET"] = "test-bucket-test"
42+
os.environ["KAFKA_CLUSTER"] = ""
43+
os.environ["PREPROCESSING_PIPELINES_CONFIG"] = "- pipelines: []"
44+
os.environ["MAIN_PIPELINES_CONFIG"] = "- pipelines: []"
45+
46+
from activator.activator import _filter_exposures, _ingest_existing_raws, is_processable, \
47+
time_since, with_signal # noqa: E402, no code before imports
48+
49+
50+
# Use of @ServiceSetup.check_on_init in activator interferes with unit tests of ServiceSetup itself
51+
# TODO: find a cleaner way to isolate the tests
52+
activator.setup.ServiceSetup.reset()
53+
54+
55+
# TODO: find a way to test functions that take `confluent_kafka.Message` inputs or mock Kafka consumers
56+
# TODO: find a way to test functions that take S3 notification objects
57+
58+
59+
class FilterExposuresTest(unittest.TestCase):
60+
def setUp(self):
61+
expid = 42
62+
rot = -45.0
63+
self.visit = FannedOutVisit(instrument=os.environ["RUBIN_INSTRUMENT"],
64+
detector=90,
65+
groupId=str(expid),
66+
nimages=1,
67+
filters="k0123",
68+
coordinateSystem=FannedOutVisit.CoordSys.ICRS,
69+
position=[-42.0, 38.2],
70+
startTime=1747891209.9,
71+
rotationSystem=FannedOutVisit.RotSys.SKY,
72+
cameraAngle=rot,
73+
survey="SURVEY",
74+
salIndex=3,
75+
scriptSalIndex=3,
76+
dome=FannedOutVisit.Dome.OPEN,
77+
duration=35.0,
78+
totalCheckpoints=1,
79+
private_sndStamp=1747891150.0,
80+
)
81+
82+
def test_one_pass(self):
83+
exposures = {42}
84+
85+
def _get_angle(id):
86+
if id == 42:
87+
return self.visit.get_rotation_sky()
88+
else:
89+
return astropy.coordinates.Angle(-40.0 * u.degree)
90+
91+
self.assertEqual(_filter_exposures(exposures, self.visit, _get_angle), {42})
92+
93+
def test_one_fail(self):
94+
exposures = {42}
95+
96+
def _get_angle(id):
97+
return astropy.coordinates.Angle(42.0 * u.degree)
98+
99+
self.assertEqual(_filter_exposures(exposures, self.visit, _get_angle), set())
100+
101+
def test_two_mixed(self):
102+
exposures = {42, 52}
103+
104+
def _get_angle(id):
105+
if id == 42:
106+
return self.visit.get_rotation_sky()
107+
else:
108+
return astropy.coordinates.Angle(-40.0 * u.degree)
109+
110+
self.assertEqual(_filter_exposures(exposures, self.visit, _get_angle), {42})
111+
112+
def test_empty(self):
113+
exposures = set()
114+
115+
def _get_angle(id):
116+
if id == 42:
117+
return self.visit.get_rotation_sky()
118+
else:
119+
return astropy.coordinates.Angle(-40.0 * u.degree)
120+
121+
self.assertEqual(_filter_exposures(exposures, self.visit, _get_angle), set())
122+
123+
124+
class IngestExistingRawsTest(unittest.TestCase):
125+
def setUp(self):
126+
def _mock_ingest(oid):
127+
return shared.raw.get_exp_id_from_oid(oid)
128+
129+
self.ingester = unittest.mock.Mock(side_effect=_mock_ingest)
130+
self.visit = FannedOutVisit(instrument=os.environ["RUBIN_INSTRUMENT"],
131+
detector=90,
132+
groupId="88",
133+
nimages=1,
134+
filters="k0123",
135+
coordinateSystem=FannedOutVisit.CoordSys.ICRS,
136+
position=[-42.0, 38.2],
137+
startTime=1747891209.9,
138+
rotationSystem=FannedOutVisit.RotSys.SKY,
139+
cameraAngle=0,
140+
survey="SURVEY",
141+
salIndex=3,
142+
scriptSalIndex=3,
143+
dome=FannedOutVisit.Dome.OPEN,
144+
duration=35.0,
145+
totalCheckpoints=1,
146+
private_sndStamp=1747891150.0,
147+
)
148+
149+
def test_one_ingest(self):
150+
known_ids = set()
151+
oid = os.environ["RUBIN_INSTRUMENT"] + "/20250926/blah_blah_20250926_42" \
152+
"/blah_blah_20250926_42_R00_S00.fits"
153+
with unittest.mock.patch("activator.activator.check_for_snap", return_value=oid), \
154+
unittest.mock.patch("activator.activator._get_storage_client"):
155+
_ingest_existing_raws(self.visit, 1, self.ingester, known_ids)
156+
self.assertEqual(known_ids, {2025092600042})
157+
self.ingester.assert_called_once_with(oid)
158+
159+
def test_one_ingest_missing(self):
160+
known_ids = set()
161+
with unittest.mock.patch("activator.activator.check_for_snap", return_value=None), \
162+
unittest.mock.patch("activator.activator._get_storage_client"):
163+
_ingest_existing_raws(self.visit, 1, self.ingester, known_ids)
164+
self.assertEqual(known_ids, set())
165+
self.ingester.assert_not_called()
166+
167+
def test_one_ingest_duplicate(self):
168+
known_ids = {2025092600042}
169+
oid = os.environ["RUBIN_INSTRUMENT"] + "/20250926/blah_blah_20250926_42" \
170+
"/blah_blah_20250926_42_R00_S00.fits"
171+
with unittest.mock.patch("activator.activator.check_for_snap", return_value=oid), \
172+
unittest.mock.patch("activator.activator._get_storage_client"):
173+
_ingest_existing_raws(self.visit, 1, self.ingester, known_ids)
174+
self.assertEqual(known_ids, {2025092600042})
175+
self.ingester.assert_not_called()
176+
177+
def test_one_ingest_truncated(self):
178+
known_ids = set()
179+
oids = [os.environ["RUBIN_INSTRUMENT"] + f"/20250926/blah_blah_20250926_{seq}"
180+
f"/blah_blah_20250926_{seq}_R00_S00.fits"
181+
for seq in [42, 43]]
182+
with unittest.mock.patch("activator.activator.check_for_snap", side_effect=oids), \
183+
unittest.mock.patch("activator.activator._get_storage_client"):
184+
_ingest_existing_raws(self.visit, 1, self.ingester, known_ids)
185+
# Second snap ignored
186+
self.assertEqual(known_ids, {2025092600042})
187+
self.ingester.assert_called_once_with(oids[0])
188+
189+
def test_two_ingest(self):
190+
known_ids = set()
191+
oids = [os.environ["RUBIN_INSTRUMENT"] + f"/20250926/blah_blah_20250926_{seq}"
192+
f"/blah_blah_20250926_{seq}_R00_S00.fits"
193+
for seq in [42, 43]]
194+
with unittest.mock.patch("activator.activator.check_for_snap", side_effect=oids), \
195+
unittest.mock.patch("activator.activator._get_storage_client"):
196+
_ingest_existing_raws(self.visit, 2, self.ingester, known_ids)
197+
self.assertEqual(known_ids, {2025092600042, 2025092600043})
198+
self.assertEqual(self.ingester.call_count, 2)
199+
for oid in oids:
200+
self.ingester.assert_any_call(oid)
201+
202+
def test_two_ingest_duplicate(self):
203+
known_ids = {2025092600042}
204+
oids = [os.environ["RUBIN_INSTRUMENT"] + f"/20250926/blah_blah_20250926_{seq}"
205+
f"/blah_blah_20250926_{seq}_R00_S00.fits"
206+
for seq in [42, 43]]
207+
with unittest.mock.patch("activator.activator.check_for_snap", side_effect=oids), \
208+
unittest.mock.patch("activator.activator._get_storage_client"):
209+
_ingest_existing_raws(self.visit, 2, self.ingester, known_ids)
210+
self.assertEqual(known_ids, {2025092600042, 2025092600043})
211+
self.ingester.assert_called_once_with(oids[1])
212+
213+
214+
class IsProcessableTest(unittest.TestCase):
215+
def setUp(self):
216+
now = astropy.time.Time.now().replicate(format="unix_tai")
217+
# Generated 10 minutes before now, observed 8 minutes before
218+
self.visit = FannedOutVisit(instrument=os.environ["RUBIN_INSTRUMENT"],
219+
detector=90,
220+
groupId="88",
221+
nimages=1,
222+
filters="k0123",
223+
coordinateSystem=FannedOutVisit.CoordSys.ICRS,
224+
position=[-42.0, 38.2],
225+
startTime=float(now.value) - 500.0,
226+
rotationSystem=FannedOutVisit.RotSys.SKY,
227+
cameraAngle=0,
228+
survey="SURVEY",
229+
salIndex=3,
230+
scriptSalIndex=3,
231+
dome=FannedOutVisit.Dome.OPEN,
232+
duration=35.0,
233+
totalCheckpoints=1,
234+
private_sndStamp=float(now.value) - 600.0,
235+
)
236+
237+
def test_long_expiration(self):
238+
self.assertTrue(is_processable(self.visit, 3600.0))
239+
self.assertTrue(is_processable(self.visit, 610.0)) # Test may have long delays in Jenkins
240+
241+
def test_short_expiration(self):
242+
self.assertFalse(is_processable(self.visit, 100.0))
243+
self.assertFalse(is_processable(self.visit, 499.9))
244+
245+
def test_mid_expiration(self):
246+
# Right now, messages are rejected based on message age, not time since exposure.
247+
# This may change in the future.
248+
self.assertFalse(is_processable(self.visit, 550.0))
249+
self.assertFalse(is_processable(self.visit, 510.0)) # Test may have long delays in Jenkins
250+
self.assertFalse(is_processable(self.visit, 599.9))
251+
252+
253+
class TimeSinceTest(unittest.TestCase):
254+
def test_timed_time_since(self):
255+
start = time.time()
256+
time.sleep(2.0)
257+
self.assertGreater(time_since(start), 2.0)
258+
self.assertLess(time_since(start), 5.0) # Test may have long delays in Jenkins
259+
260+
261+
class WithSignalTest(unittest.TestCase):
262+
def setUp(self):
263+
self.handler = unittest.mock.Mock()
264+
265+
@unittest.removeHandler
266+
def test_no_default(self):
267+
# This test uses SIGPIPE because it's ignored by default
268+
def _ping():
269+
signal.raise_signal(signal.SIGPIPE)
270+
271+
ping = with_signal(signal.SIGPIPE, self.handler)(_ping)
272+
273+
_ping()
274+
self.handler.assert_not_called()
275+
self.handler.reset_mock()
276+
277+
ping()
278+
self.handler.assert_called_once_with(signal.SIGPIPE, unittest.mock.ANY)
279+
self.handler.reset_mock()
280+
281+
_ping()
282+
self.handler.assert_not_called()
283+
self.handler.reset_mock()
284+
285+
@unittest.removeHandler
286+
def test_old_handler(self):
287+
old_handler = unittest.mock.Mock()
288+
signal.signal(signal.SIGUSR1, old_handler)
289+
self.addCleanup(signal.signal, signal.SIGUSR1, signal.SIG_DFL)
290+
291+
def _ping():
292+
signal.raise_signal(signal.SIGUSR1)
293+
294+
ping = with_signal(signal.SIGUSR1, self.handler)(_ping)
295+
296+
_ping()
297+
old_handler.assert_called_once_with(signal.SIGUSR1, unittest.mock.ANY)
298+
self.handler.assert_not_called()
299+
old_handler.reset_mock()
300+
self.handler.reset_mock()
301+
302+
ping()
303+
old_handler.assert_not_called()
304+
self.handler.assert_called_once_with(signal.SIGUSR1, unittest.mock.ANY)
305+
old_handler.reset_mock()
306+
self.handler.reset_mock()
307+
308+
_ping()
309+
old_handler.assert_called_once_with(signal.SIGUSR1, unittest.mock.ANY)
310+
self.handler.assert_not_called()
311+
old_handler.reset_mock()
312+
self.handler.reset_mock()

tests/test_setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
class ServiceSetupTest(unittest.TestCase):
3030
def setUp(self):
3131
super().setUp()
32+
ServiceSetup.reset()
3233
self.addCleanup(ServiceSetup.reset)
3334

3435
def test_empty(self):

0 commit comments

Comments
 (0)