Skip to content

Commit 29ed46a

Browse files
committed
Merge feature/service-status-events into staging (#55)
* Bump main to staging (#7) * Setup (#1) - Add tokio, serde and sqlx crates - Setup .gitignore - Setup Cargo.toml - Setup pipelines * Enable Dependabot (#3) Add dependabot.yml with daily cargo updates configured * Fix staging pipelines (#4) Fix staging pipelines not triggering on pull request * Implement Cargo caching (#5) Add Swatinem's Rust-Cache@v2 action to build, test and deploy pipelines * Improve README.md (#6) - Add deployment badges - Add collaborating info (board + issues) * Clonable Status - Made Status clonable by not using BoxedErrors anymore but Strings for holding the error information - Add get_status() to Service - Made status property of Service private * Adapt Service Manager to new Status enum * Events - Implement Event<T> - Add status_changed Event to ServiceInfo * Event improvements - Add name attribute to Event<T> - Unify Channel and Closure subscribers by using an Enum - Propagate errors when dispatching events - Add error log when errors occur while dispatching events - Subscribers are now removed from an event when they run into an error while dispatching * Event improvements Make the removal of Event subscribers on error optional * Slight refactors - Refactors in service_manager.rs - Refactors in watchdog.rs * WIP: Idk lol I made these changes many months ago. Reviewed them for like half an hour, looks good. I know what I was working on. Will continue now :) * add: allow clippy::multiple_bound_locations for service module * add: observables * refactor: use Mutex instead of RwLock everywhere * refactor: make remove_on_error work on per-subscriber basis * refactor: make subscribers identifiable * refactor: move dispatch logic to Subscriber * add: AsyncClosure Callback type * WIP: EventRepeater * add: EventRepeater * refactor: event subscribe method names * refactor: move subscription into own module * add: AsRef<Event<T>> * add: UUID, PartialEq/Eq, unsubscribe() * add: event_repeater detach(), close() * add: attach/deattach EventRepeater on start/stop of service * add: service runtime failure handling * fix: bump version to 0.2.1
1 parent 52bb11f commit 29ed46a

19 files changed

+981
-240
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "lum"
3-
version = "0.1.0"
3+
version = "0.2.1"
44
edition = "2021"
55
description = "Lum Discord Bot"
66
license= "MIT"
@@ -23,3 +23,4 @@ serenity = { version = "0.12.0", default-features=false, features = ["builder",
2323
sqlx = { version = "0.8.0", features = ["runtime-tokio", "any", "postgres", "mysql", "sqlite", "tls-native-tls", "migrate", "macros", "uuid", "chrono", "json"] }
2424
thiserror = "1.0.52"
2525
tokio = { version = "1.35.1", features = ["full"] }
26+
uuid = { version = "1.10.0", features = ["fast-rng", "macro-diagnostics", "v4"] }

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ Beta: [![Deploy](https://github.com/Kitt3120/lum/actions/workflows/deploy_prerel
1414

1515
## Collaborating
1616

17-
Checkout out [Milestones](https://github.com/Kitt3120/lum/milestones), [Board](https://github.com/users/Kitt3120/projects/3), and [Issues](https://github.com/Kitt3120/lum/issues)
17+
Check out [Milestones](https://github.com/Kitt3120/lum/milestones), [Board](https://github.com/users/Kitt3120/projects/3), and [Issues](https://github.com/Kitt3120/lum/issues)

build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
fn main() {
33
// trigger recompilation when a new migration is added
44
println!("cargo:rerun-if-changed=migrations");
5-
}
5+
}

src/bot.rs

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,27 @@
1-
use std::sync::Arc;
1+
use core::fmt;
2+
use std::{fmt::Display, sync::Arc};
23

3-
use tokio::sync::RwLock;
4+
use log::error;
5+
use tokio::{signal, sync::Mutex};
46

5-
use crate::service::{PinnedBoxedFuture, Service, ServiceManager, ServiceManagerBuilder};
7+
use crate::service::{
8+
types::LifetimedPinnedBoxedFuture, OverallStatus, Service, ServiceManager, ServiceManagerBuilder,
9+
};
10+
11+
#[derive(Debug, Clone, Copy)]
12+
pub enum ExitReason {
13+
SIGINT,
14+
EssentialServiceFailed,
15+
}
16+
17+
impl Display for ExitReason {
18+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
19+
match self {
20+
Self::SIGINT => write!(f, "SIGINT"),
21+
Self::EssentialServiceFailed => write!(f, "Essential Service Failed"),
22+
}
23+
}
24+
}
625

726
pub struct BotBuilder {
827
name: String,
@@ -17,13 +36,13 @@ impl BotBuilder {
1736
}
1837
}
1938

20-
pub async fn with_service(mut self, service: Arc<RwLock<dyn Service>>) -> Self {
39+
pub async fn with_service(mut self, service: Arc<Mutex<dyn Service>>) -> Self {
2140
self.service_manager = self.service_manager.with_service(service).await; // The ServiceManagerBuilder itself will warn when adding a service multiple times
2241

2342
self
2443
}
2544

26-
pub async fn with_services(mut self, services: Vec<Arc<RwLock<dyn Service>>>) -> Self {
45+
pub async fn with_services(mut self, services: Vec<Arc<Mutex<dyn Service>>>) -> Self {
2746
for service in services {
2847
self.service_manager = self.service_manager.with_service(service).await;
2948
}
@@ -50,18 +69,56 @@ impl Bot {
5069
}
5170

5271
//TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future
53-
pub fn start(&mut self) -> PinnedBoxedFuture<'_, ()> {
72+
pub fn start(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> {
5473
Box::pin(async move {
5574
self.service_manager.start_services().await;
5675
//TODO: Potential for further initialization here, like modules
5776
})
5877
}
5978

6079
//TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future
61-
pub fn stop(&mut self) -> PinnedBoxedFuture<'_, ()> {
80+
pub fn stop(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> {
6281
Box::pin(async move {
6382
self.service_manager.stop_services().await;
6483
//TODO: Potential for further deinitialization here, like modules
6584
})
6685
}
86+
87+
pub async fn join(&self) -> ExitReason {
88+
let name_clone = self.name.clone();
89+
let signal_task = tokio::spawn(async move {
90+
let name = name_clone;
91+
92+
let result = signal::ctrl_c().await;
93+
if let Err(error) = result {
94+
error!(
95+
"Error receiving SIGINT: {}. {} will exit ungracefully immediately to prevent undefined behavior.",
96+
error, name
97+
);
98+
panic!("Error receiving SIGINT: {}", error);
99+
}
100+
});
101+
102+
let service_manager_clone = self.service_manager.clone();
103+
let mut receiver = self
104+
.service_manager
105+
.on_status_change
106+
.event
107+
.subscribe_channel("t", 2, true, true)
108+
.await;
109+
let status_task = tokio::spawn(async move {
110+
let service_manager = service_manager_clone;
111+
while (receiver.receiver.recv().await).is_some() {
112+
let overall_status = service_manager.overall_status().await;
113+
if overall_status == OverallStatus::Unhealthy {
114+
return;
115+
}
116+
}
117+
});
118+
119+
tokio::select! {
120+
_ = signal_task => ExitReason::SIGINT,
121+
_ = status_task => ExitReason::EssentialServiceFailed,
122+
}
123+
}
67124
}

src/event.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
pub mod arc_observable;
2+
pub mod event;
3+
pub mod event_repeater;
4+
pub mod observable;
5+
pub mod subscriber;
6+
pub mod subscription;
7+
8+
pub use arc_observable::ArcObservable;
9+
pub use event::Event;
10+
pub use event_repeater::EventRepeater;
11+
pub use observable::{Observable, ObservableResult};
12+
pub use subscriber::{Callback, DispatchError, Subscriber};
13+
pub use subscription::{ReceiverSubscription, Subscription};

src/event/arc_observable.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use std::{
2+
hash::{DefaultHasher, Hash, Hasher},
3+
sync::Arc,
4+
};
5+
6+
use tokio::sync::Mutex;
7+
8+
use super::{Event, ObservableResult};
9+
10+
#[derive(Debug)]
11+
pub struct ArcObservable<T>
12+
where
13+
T: Send + 'static + Hash,
14+
{
15+
value: Arc<Mutex<T>>,
16+
on_change: Event<Mutex<T>>,
17+
}
18+
19+
impl<T> ArcObservable<T>
20+
where
21+
T: Send + 'static + Hash,
22+
{
23+
pub fn new(value: T, event_name: impl Into<String>) -> Self {
24+
Self {
25+
value: Arc::new(Mutex::new(value)),
26+
on_change: Event::new(event_name),
27+
}
28+
}
29+
30+
pub async fn get(&self) -> Arc<Mutex<T>> {
31+
Arc::clone(&self.value)
32+
}
33+
34+
pub async fn set(&self, value: T) -> ObservableResult<Mutex<T>> {
35+
let mut lock = self.value.lock().await;
36+
37+
let mut hasher = DefaultHasher::new();
38+
(*lock).hash(&mut hasher);
39+
let current_value = hasher.finish();
40+
41+
let mut hasher = DefaultHasher::new();
42+
value.hash(&mut hasher);
43+
let new_value = hasher.finish();
44+
45+
if current_value == new_value {
46+
return ObservableResult::Unchanged;
47+
}
48+
49+
*lock = value;
50+
drop(lock);
51+
52+
let value = Arc::clone(&self.value);
53+
let dispatch_result = self.on_change.dispatch(value).await;
54+
55+
match dispatch_result {
56+
Ok(_) => ObservableResult::Changed(Ok(())),
57+
Err(errors) => ObservableResult::Changed(Err(errors)),
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)