Skip to content

Conversation

kfindeisen
Copy link
Member

@kfindeisen kfindeisen commented Sep 17, 2025

This PR partially breaks up activator.py into a new abstraction (ServiceSetup) and separate "driver" modules for Knative and Keda. activator.py continues to hold the application-level logic like raw notification handling. The separation makes it possible to start writing unit tests for activator, though this version doesn't test anything that depends on Kafka or S3.

This is dead code that was used by the Knative implementation before
LocalRepoTracker was introduced.
Many of the objects used by the activators have generic names despite
having very specific functions, leading to possible confusion between
e.g. multiple Kafka objects. All the objects have now been givenless
ambiguous names except the S3 storage client, which really is generic
(the bucket is what's raw-specific and has been renamed accordingly).
ServiceSetup abstracts away what should be initialized at service
start, so that the Knative and Keda starters no longer depend on
objects they don't use themselves.
Dependency inversion means create_app and keda_start no longer need to
know about shared components like Kafka or the Butler. Instead, both
depend on the new ServiceSetup class.
The code doesn't depend on MWI as a whole, only on its ingest_image
method. Abstracting it as a generic ingestion API means that only the
caller (process_visit) needs to know about MWI itself.
The code doesn't depend on MWI as a whole, only on its skyangle method (which
is a dubious fit for MWI anyway). Abstracting it as a generic sky-angle API
means that only the caller (process_visit) needs to know about MWI itself.
This removes dependencies on HTTP servers (of any kind) from the
activator. The largest coupling between the new driver_gunicorn and the
activator is in the set of exceptions that process_visit may raise.

This commit also removes the ability to run a Flask test server on a
local machine, a feature we haven't used since the earliest prototyping
and that requires a standalone installation of Flask.
This removes dependencies on Keda and Redis from the activator. The
largest coupling between the new driver_keda and the activator is in
the set of exceptions that process_visit may raise.
Not what the activator only depends on rubin-env packages, it can be
imported in unit tests. However, it's still dangerous to handle because
a number of things happen at import time (evaluating environment
variables, and registering code that should be set up on program
start).
@kfindeisen kfindeisen marked this pull request as ready for review September 29, 2025 22:44
@kfindeisen kfindeisen requested a review from hsinfang September 30, 2025 16:54
Copy link
Collaborator

@hsinfang hsinfang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the well-organized commits which made the moving clear!

# Kafka group; must be worker-unique to keep workers from "stealing" messages for others.
notification_kafka_group_id = str(uuid.uuid4())
# The time (in seconds) after which to ignore old nextVisit messages.
visit_expire = float(os.environ.get("MESSAGE_EXPIRATION", 3600))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not very important... but this variable can only be removed at commit "Factor Keda code into a separate module."

from .setup import ServiceSetup

# Platform that prompt processing will run on
platform = os.environ["PLATFORM"].lower()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we remove this parameter at phalanx too?

if id == 42:
return self.visit.get_rotation_sky()
else:
return astropy.coordinates.Angle(-40.0 * u.degree)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm likely missing something but I'm not sure what this else part is for here at test_one_pass?


def test_long_expiration(self):
self.assertTrue(is_processable(self.visit, 3600.0))
self.assertTrue(is_processable(self.visit, 610.0)) # Test may have long delays in Jenkins
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that we'll see whether Jenkins has a long delay and fail this test?

# Right now, messages are rejected based on message age, not time since exposure.
# This may change in the future.
self.assertFalse(is_processable(self.visit, 550.0))
self.assertFalse(is_processable(self.visit, 510.0)) # Test may have long delays in Jenkins
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why this line gets this comment. May you please elaborate?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants