Skip to content

Commit 3090dd8

Browse files
add cron scheduler to moose (#1742)
1 parent c18e598 commit 3090dd8

File tree

5 files changed

+105
-9
lines changed

5 files changed

+105
-9
lines changed

Cargo.lock

Lines changed: 38 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/framework-cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ pbkdf2 = { version = "0.12", features = ["simple"] }
7777
sha2 = "0.10.8"
7878
hex = "0.4.2"
7979
constant_time_eq = "0.3.0"
80+
tokio-cron-scheduler = "0.11.0"
8081

8182
[dev-dependencies]
8283
clickhouse = { version = "0.11.5", features = ["uuid", "test-util"] }

apps/framework-cli/src/infrastructure/processes.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use super::olap::clickhouse::{errors::ClickhouseError, mapper::std_columns_to_cl
1616

1717
pub mod aggregations_registry;
1818
pub mod consumption_registry;
19+
pub mod cron_registry;
1920
pub mod functions_registry;
2021
pub mod kafka_clickhouse_sync;
2122
pub mod process_registry;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use std::sync::Arc;
2+
use tokio::sync::Mutex;
3+
use tokio::task;
4+
use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};
5+
6+
pub struct CronRegistry {
7+
scheduler: Arc<Mutex<JobScheduler>>,
8+
}
9+
10+
impl CronRegistry {
11+
pub async fn new() -> Result<Self, JobSchedulerError> {
12+
let scheduler = JobScheduler::new().await?;
13+
14+
Ok(CronRegistry {
15+
scheduler: Arc::new(Mutex::new(scheduler)),
16+
})
17+
}
18+
19+
pub async fn add_job<F>(&self, cron_expression: &str, task: F) -> Result<(), JobSchedulerError>
20+
where
21+
F: Fn() -> Result<(), String> + Send + Sync + 'static,
22+
{
23+
let task = Arc::new(task);
24+
25+
let job = Job::new_async(cron_expression, move |_uuid, _l| {
26+
let task = task.clone();
27+
Box::pin(async move {
28+
let _ = task();
29+
})
30+
})?;
31+
32+
let scheduler = self.scheduler.lock().await;
33+
scheduler.add(job).await?;
34+
35+
Ok(())
36+
}
37+
38+
pub async fn start(&self) -> Result<(), JobSchedulerError> {
39+
let scheduler: Arc<Mutex<JobScheduler>> = self.scheduler.clone();
40+
task::spawn(async move {
41+
let scheduler = scheduler.lock().await;
42+
scheduler.start().await?;
43+
44+
Ok::<(), JobSchedulerError>(())
45+
});
46+
47+
Ok(())
48+
}
49+
50+
pub async fn stop(&self) -> Result<(), JobSchedulerError> {
51+
let mut scheduler = self.scheduler.lock().await;
52+
scheduler.shutdown().await?;
53+
54+
Ok(())
55+
}
56+
}

pnpm-lock.yaml

Lines changed: 9 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)