Skip to content

Commit 38e1db1

Browse files
committed
docs: document revolt-coalesced
1 parent 5b1a78f commit 38e1db1

File tree

12 files changed

+154
-75
lines changed

12 files changed

+154
-75
lines changed

crates/core/coalesced/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ tokio = ["dep:tokio"]
1111
queue = ["dep:indexmap"]
1212
cache = ["dep:lru"]
1313

14-
default = ["tokio", "queue", "cache"]
14+
default = ["tokio"]
1515

1616
[dependencies]
1717
tokio = { version = "1.47.0", features = ["sync"], optional = true }

crates/core/coalesced/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#[derive(Clone, PartialEq, Eq, Debug)]
2+
/// Config values for [`CoalescionService`].
23
pub struct CoalescionServiceConfig {
34
/// How many tasks are running at once
45
pub max_concurrent: Option<usize>,

crates/core/coalesced/src/error.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,25 @@
1-
use std::fmt::Display;
1+
use std::fmt;
22

3-
#[derive(Clone, PartialEq, Eq, Debug)]
3+
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)]
4+
/// Coalescion service error.
45
pub enum Error {
6+
/// Failed to receive the actions return from the channel for unknown reason
57
RecvError,
8+
/// Reached the `max_concurrent` amount of actions running at once and could not queue the action
69
MaxConcurrent,
10+
/// Reached the `max_queue` amount of actions in the queue
711
MaxQueue,
12+
/// Failed to downcast the type to the current type being returned, this will be most likely an ID collision
13+
DowncastError,
814
}
915

10-
impl Display for Error {
11-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16+
impl fmt::Display for Error {
17+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1218
match self {
1319
Error::RecvError => write!(f, "Unable to receive data from the channel"),
1420
Error::MaxConcurrent => write!(f, "Max number of tasks running at once"),
1521
Error::MaxQueue => write!(f, "Max number of tasks in queue"),
22+
Error::DowncastError => write!(f, "Failed to downcast type, possible key collision with different types")
1623
}
1724
}
1825
}

crates/core/coalesced/src/lib.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,39 @@
1+
//! # Coalesced
2+
//!
3+
//! Coalescion service to group, caching and queue duplicate actions.
4+
//! useful for deduplicating web requests, database lookups and other similar resource
5+
//! intensive or rate-limited actions.
6+
//!
7+
//! ## Features
8+
//! - `tokio`: Uses tokio for the async backend, this is currently the only backend.
9+
//! - `queue`: Whether to support queueing requests to only allow X amount of actions running at once.
10+
//! - `cache`: Whether to cache the actions results for future actions with the same id, uses an LRU cache internally.
11+
//!
12+
//! [`CoalescionService`] uses both [`Arc`] and [`RwLock`] internally and can be cheaply cloned to
13+
//! use in your codebase.
14+
//!
15+
//! It is common practice to wrap the service and in your own which delegates the executions to ensure all ids are tracked in one location across your codebase.
16+
//!
17+
//! All values are stored using [`Any`] and must be [`'static`] + [`Send`] + [`Sync`], if there is an id mismatch
18+
//! and a type is wrong the library will return an error, values returned from the service are also
19+
//! wrapped in an [`Arc`] as they are shared to each duplicate action.
20+
//!
21+
//! ## Example:
22+
//! ```rs
23+
//! use revolt_coalesced::CoalescionService;
24+
//!
25+
//! let service = CoalescionService::new();
26+
//!
27+
//! let user_id = "my_user_id";
28+
//! let user = service.execute(user_id, || async move {
29+
//! database.fetch_user(user_id).await.unwrap()
30+
//! }).await;
31+
//! ```
32+
133
mod config;
234
mod error;
335
mod service;
436

537
pub use config::CoalescionServiceConfig;
638
pub use error::Error;
7-
pub use service::CoalescionService;
39+
pub use service::CoalescionService;

crates/core/coalesced/src/service.rs

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
use std::{collections::{HashMap}, fmt::Debug, hash::Hash, sync::Arc, future::Future};
1+
use std::{any::Any, collections::HashMap, fmt::Debug, future::Future, hash::Hash, sync::Arc};
22

3-
use tokio::{sync::{watch::{channel as watch_channel, Receiver}, RwLock, Mutex}};
3+
use tokio::sync::{
4+
watch::{channel as watch_channel, Receiver},
5+
RwLock,
6+
};
47

58
#[cfg(feature = "cache")]
69
use lru::LruCache;
@@ -10,20 +13,23 @@ use indexmap::IndexMap;
1013

1114
use crate::{CoalescionServiceConfig, Error};
1215

13-
#[derive(Clone, Debug)]
16+
#[derive(Debug, Clone)]
1417
#[allow(clippy::type_complexity)]
15-
pub struct CoalescionService<Id: Hash + Eq, Value> {
18+
/// # Coalescion service
19+
///
20+
/// See module description for example usage.
21+
pub struct CoalescionService<Id: Hash + Clone + Eq> {
1622
config: Arc<CoalescionServiceConfig>,
17-
watchers: Arc<RwLock<HashMap<Id, Receiver<Option<Result<Arc<Value>, Error>>>>>>,
23+
watchers: Arc<RwLock<HashMap<Id, Receiver<Option<Result<Arc<dyn Any + Send + Sync>, Error>>>>>>,
1824
#[cfg(feature = "queue")]
19-
queue: Arc<RwLock<IndexMap<Id, Receiver<Option<Result<Arc<Value>, Error>>>>>>,
25+
queue: Arc<RwLock<IndexMap<Id, Receiver<Option<Result<Arc<dyn Any + Send + Sync>, Error>>>>>>,
2026
#[cfg(feature = "cache")]
21-
cache: Option<Arc<Mutex<LruCache<Id, Arc<Value>>>>>,
27+
cache: Option<Arc<tokio::sync::Mutex<LruCache<Id, Arc<dyn Any + Send + Sync>>>>>,
2228
}
2329

24-
impl<Id: Hash + PartialEq + Eq + Clone + Ord, Value> CoalescionService<Id, Value> {
30+
impl<Id: Hash + Clone + Eq> CoalescionService<Id> {
2531
pub fn new() -> Self {
26-
Self::default()
32+
Default::default()
2733
}
2834

2935
pub fn from_config(config: CoalescionServiceConfig) -> Self {
@@ -38,30 +44,45 @@ impl<Id: Hash + PartialEq + Eq + Clone + Ord, Value> CoalescionService<Id, Value
3844
}
3945

4046
#[cfg(feature = "cache")]
41-
pub fn from_cache(config: CoalescionServiceConfig, cache: LruCache<Id, Arc<Value>>) -> Self {
47+
pub fn from_cache(
48+
config: CoalescionServiceConfig,
49+
cache: LruCache<Id, Arc<dyn Any + Send + Sync>>,
50+
) -> Self {
4251
Self {
4352
cache: Some(Arc::new(Mutex::new(cache))),
4453
..Self::from_config(config)
4554
}
4655
}
4756

48-
async fn wait_for(&self, mut receiver: Receiver<Option<Result<Arc<Value>, Error>>>) -> Result<Arc<Value>, Error> {
57+
async fn wait_for<Value: Any + Send + Sync>(
58+
&self,
59+
mut receiver: Receiver<Option<Result<Arc<dyn Any + Send + Sync>, Error>>>,
60+
) -> Result<Arc<Value>, Error> {
4961
receiver
5062
.wait_for(|v| v.is_some())
5163
.await
5264
.map_err(|_| Error::RecvError)
5365
.and_then(|r| r.clone().unwrap())
66+
.and_then(|arc| Arc::downcast(arc).map_err(|_| Error::DowncastError))
5467
}
5568

56-
async fn insert_and_execute<F: FnOnce() -> Fut, Fut: Future<Output = Value>>(&self, id: Id, func: F) -> Result<Arc<Value>, Error> {
69+
async fn insert_and_execute<
70+
Value: Send + Sync + 'static,
71+
F: FnOnce() -> Fut,
72+
Fut: Future<Output = Value>,
73+
>(
74+
&self,
75+
id: Id,
76+
func: F,
77+
) -> Result<Arc<Value>, Error> {
5778
let (send, recv) = watch_channel(None);
5879

5980
self.watchers.write().await.insert(id.clone(), recv);
6081

6182
let value = Ok(Arc::new(func().await));
6283

6384
send.send_modify(|opt| {
64-
opt.replace(value.clone());
85+
opt.replace(value.clone().map(|v| v as Arc<dyn Any + Send + Sync>));
6586
});
6687

6788
#[cfg(feature = "cache")]
@@ -76,11 +97,21 @@ impl<Id: Hash + PartialEq + Eq + Clone + Ord, Value> CoalescionService<Id, Value
7697
value
7798
}
7899

79-
pub async fn execute<F: FnOnce() -> Fut, Fut: Future<Output = Value>>(&self, id: Id, func: F) -> Result<Arc<Value>, Error> {
100+
/// Coalesces an function, the actual function may not run if one with the same id is already running,
101+
/// queued to be ran, or cached, the id should be globally unique for this specific action.
102+
pub async fn execute<
103+
Value: Send + Sync + 'static,
104+
F: FnOnce() -> Fut,
105+
Fut: Future<Output = Value>,
106+
>(
107+
&self,
108+
id: Id,
109+
func: F,
110+
) -> Result<Arc<Value>, Error> {
80111
#[cfg(feature = "cache")]
81112
if let Some(cache) = self.cache.as_ref() {
82113
if let Some(value) = cache.lock().await.get(&id) {
83-
return Ok(value.clone())
114+
return Arc::downcast::<Value>(value.clone()).map_err(|_| Error::DowncastError);
84115
}
85116
};
86117

@@ -105,10 +136,14 @@ impl<Id: Hash + PartialEq + Eq + Clone + Ord, Value> CoalescionService<Id, Value
105136
};
106137

107138
if let Some(receiver) = receiver {
108-
return self.wait_for(receiver).await
139+
return self.wait_for(receiver).await;
109140
} else {
110-
if self.config.max_queue.is_some_and(|max_queue| max_queue >= length) {
111-
return Err(Error::MaxQueue)
141+
if self
142+
.config
143+
.max_queue
144+
.is_some_and(|max_queue| max_queue >= length)
145+
{
146+
return Err(Error::MaxQueue);
112147
};
113148

114149
let (send, recv) = watch_channel(None);
@@ -130,10 +165,14 @@ impl<Id: Hash + PartialEq + Eq + Clone + Ord, Value> CoalescionService<Id, Value
130165
let response = self.insert_and_execute(id, func).await;
131166

132167
send.send_modify(|opt| {
133-
opt.replace(response.clone());
168+
opt.replace(
169+
response
170+
.clone()
171+
.map(|v| v as Arc<dyn Any + Send + Sync>),
172+
);
134173
});
135174

136-
return response
175+
return response;
137176
}
138177
}
139178
}
@@ -145,23 +184,24 @@ impl<Id: Hash + PartialEq + Eq + Clone + Ord, Value> CoalescionService<Id, Value
145184
#[cfg(not(feature = "queue"))]
146185
Err(Error::MaxConcurrent)
147186
}
148-
_ => {
149-
self.insert_and_execute(id, func).await
150-
}
187+
_ => self.insert_and_execute(id, func).await,
151188
}
152189
}
153190
}
154191

192+
/// Fetches the amount of currently running tasks
155193
pub async fn current_task_count(&self) -> usize {
156194
self.watchers.read().await.len()
157195
}
158196

197+
#[cfg(feature = "queue")]
198+
/// Fetches the current length of the queue
159199
pub async fn current_queue_len(&self) -> usize {
160200
self.queue.read().await.len()
161201
}
162202
}
163203

164-
impl<Id: Hash + PartialEq + Eq + Clone + Ord, Value> Default for CoalescionService<Id, Value> {
204+
impl<Id: Hash + Clone + Eq> Default for CoalescionService<Id> {
165205
fn default() -> Self {
166206
Self::from_config(CoalescionServiceConfig::default())
167207
}

crates/services/gifbox/Cargo.toml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@ revolt-result = { version = "0.8.8", path = "../../core/result", features = [
2222
"utoipa",
2323
"axum",
2424
] }
25-
revolt-coalesced = { version = "0.8.8", path = "../../core/coalesced" }
26-
revolt-database = { version = "0.8.8", path = "../../core/database", features = ["axum-impl"] }
25+
revolt-coalesced = { version = "0.8.8", path = "../../core/coalesced", features = [
26+
"queue",
27+
] }
28+
revolt-database = { version = "0.8.8", path = "../../core/database", features = [
29+
"axum-impl",
30+
] }
31+
2732
# Axum / web server
2833
axum = { version = "0.7.5" }
2934
axum-extra = { version = "0.9", features = ["typed-header"] }
@@ -37,4 +42,4 @@ tracing = "0.1"
3742

3843
# Utils
3944
lru_time_cache = "0.11.11"
40-
urlencoding = "2.1.3"
45+
urlencoding = "2.1.3"

crates/services/gifbox/src/routes/categories.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::{tenor, types};
1111

1212
#[derive(Deserialize, IntoParams)]
1313
pub struct CategoriesQueryParams {
14+
/// Users locale
1415
#[param(example = "en_US")]
1516
pub locale: String,
1617
}

crates/services/gifbox/src/routes/root.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,4 @@ pub async fn root() -> Json<types::RootResponse<'static>> {
1919
message: "Gifbox lives on!",
2020
version: CRATE_VERSION,
2121
})
22-
}
22+
}

crates/services/gifbox/src/routes/search.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,17 @@ use crate::{tenor, types};
1111

1212
#[derive(Deserialize, IntoParams)]
1313
pub struct SearchQueryParams {
14+
/// Search query
1415
#[param(example = "Wave")]
1516
pub query: String,
17+
/// Users locale
1618
#[param(example = "en_US")]
1719
pub locale: String,
20+
/// Amount of results to respond with
1821
pub limit: Option<u32>,
22+
/// Flag for if searching in a gif category
1923
pub is_category: Option<bool>,
24+
/// Value of `next` for getting the next page of results with the current search query
2025
pub position: Option<String>,
2126
}
2227

crates/services/gifbox/src/routes/trending.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,20 @@ use axum::{
44
};
55
use revolt_database::User;
66
use revolt_result::{create_error, Result};
7-
use serde::{Deserialize};
8-
use utoipa::{IntoParams};
7+
use serde::Deserialize;
8+
use utoipa::IntoParams;
99

10-
use crate::{
11-
tenor,
12-
types,
13-
};
10+
use crate::{tenor, types};
1411

1512
#[derive(Deserialize, IntoParams)]
1613
pub struct TrendingQueryParams {
1714
#[param(example = "en_US")]
15+
/// Users locale
1816
pub locale: String,
17+
/// Amount of results to respond with
1918
pub limit: Option<u32>,
20-
pub position: Option<String>
19+
/// Value of `next` for getting the next page of results of featured gifs
20+
pub position: Option<String>,
2121
}
2222

2323
/// Trending GIFs
@@ -37,7 +37,11 @@ pub async fn trending(
3737
State(tenor): State<tenor::Tenor>,
3838
) -> Result<Json<types::PaginatedMediaResponse>> {
3939
tenor
40-
.featured(&params.locale, params.limit.unwrap_or(50), params.position.as_deref().unwrap_or_default())
40+
.featured(
41+
&params.locale,
42+
params.limit.unwrap_or(50),
43+
params.position.as_deref().unwrap_or_default(),
44+
)
4145
.await
4246
.map_err(|_| create_error!(InternalError))
4347
.map(|results| results.as_ref().clone().into())

0 commit comments

Comments
 (0)