-
Notifications
You must be signed in to change notification settings - Fork 0
DM-51756: Refactor Prompt Processing activator module #348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This is dead code that was used by the Knative implementation before LocalRepoTracker was introduced.
Many of the objects used by the activators have generic names despite having very specific functions, leading to possible confusion between e.g. multiple Kafka objects. All the objects have now been givenless ambiguous names except the S3 storage client, which really is generic (the bucket is what's raw-specific and has been renamed accordingly).
ServiceSetup abstracts away what should be initialized at service start, so that the Knative and Keda starters no longer depend on objects they don't use themselves.
Dependency inversion means create_app and keda_start no longer need to know about shared components like Kafka or the Butler. Instead, both depend on the new ServiceSetup class.
The code doesn't depend on MWI as a whole, only on its ingest_image method. Abstracting it as a generic ingestion API means that only the caller (process_visit) needs to know about MWI itself.
The code doesn't depend on MWI as a whole, only on its skyangle method (which is a dubious fit for MWI anyway). Abstracting it as a generic sky-angle API means that only the caller (process_visit) needs to know about MWI itself.
This removes dependencies on HTTP servers (of any kind) from the activator. The largest coupling between the new driver_gunicorn and the activator is in the set of exceptions that process_visit may raise. This commit also removes the ability to run a Flask test server on a local machine, a feature we haven't used since the earliest prototyping and that requires a standalone installation of Flask.
This removes dependencies on Keda and Redis from the activator. The largest coupling between the new driver_keda and the activator is in the set of exceptions that process_visit may raise.
Not what the activator only depends on rubin-env packages, it can be imported in unit tests. However, it's still dangerous to handle because a number of things happen at import time (evaluating environment variables, and registering code that should be set up on program start).
7fc8fae
to
7bf79eb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the well-organized commits which made the moving clear!
# Kafka group; must be worker-unique to keep workers from "stealing" messages for others. | ||
notification_kafka_group_id = str(uuid.uuid4()) | ||
# The time (in seconds) after which to ignore old nextVisit messages. | ||
visit_expire = float(os.environ.get("MESSAGE_EXPIRATION", 3600)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not very important... but this variable can only be removed at commit "Factor Keda code into a separate module."
from .setup import ServiceSetup | ||
|
||
# Platform that prompt processing will run on | ||
platform = os.environ["PLATFORM"].lower() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we remove this parameter at phalanx too?
if id == 42: | ||
return self.visit.get_rotation_sky() | ||
else: | ||
return astropy.coordinates.Angle(-40.0 * u.degree) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm likely missing something but I'm not sure what this else
part is for here at test_one_pass
?
|
||
def test_long_expiration(self): | ||
self.assertTrue(is_processable(self.visit, 3600.0)) | ||
self.assertTrue(is_processable(self.visit, 610.0)) # Test may have long delays in Jenkins |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean that we'll see whether Jenkins has a long delay and fail this test?
# Right now, messages are rejected based on message age, not time since exposure. | ||
# This may change in the future. | ||
self.assertFalse(is_processable(self.visit, 550.0)) | ||
self.assertFalse(is_processable(self.visit, 510.0)) # Test may have long delays in Jenkins |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure why this line gets this comment. May you please elaborate?
This PR partially breaks up
activator.py
into a new abstraction (ServiceSetup
) and separate "driver" modules for Knative and Keda.activator.py
continues to hold the application-level logic like raw notification handling. The separation makes it possible to start writing unit tests foractivator
, though this version doesn't test anything that depends on Kafka or S3.