Skip to content

Commit f2907a2

Browse files
authored
add yielder type to axum examples (#4)
1 parent 1ab7523 commit f2907a2

File tree

4 files changed

+181
-167
lines changed

4 files changed

+181
-167
lines changed

examples/axum-activity-feed.rs

Lines changed: 55 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use {
2-
asynk_strim::stream_fn,
2+
asynk_strim::{Yielder, stream_fn},
33
axum::{
44
Router,
55
extract::Path,
6-
response::{Html, IntoResponse, Sse},
6+
response::{Html, IntoResponse, Sse, sse::Event},
77
routing::{get, post},
88
},
99
core::{convert::Infallible, error::Error, time::Duration},
@@ -80,36 +80,38 @@ async fn generate(ReadSignals(signals): ReadSignals<Signals>) -> impl IntoRespon
8080
let mut done = signals.done;
8181

8282
// Start the SSE stream
83-
Sse::new(stream_fn(move |mut yielder| async move {
84-
// Signal event generation start
85-
let patch = PatchSignals::new(r#"{{"generating": true}}"#);
86-
let sse_event = patch.write_as_axum_sse_event();
87-
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
88-
89-
// Yield the events elements and signals to the stream
90-
for _ in 1..=signals.events {
91-
total += 1;
92-
done += 1;
93-
// Append a new entry to the activity feed
94-
let elements = event_entry(&Status::Done, total, "Auto");
95-
let patch = PatchElements::new(elements)
96-
.selector("#feed")
97-
.mode(ElementPatchMode::After);
83+
Sse::new(stream_fn(
84+
move |mut yielder: Yielder<Result<Event, Infallible>>| async move {
85+
// Signal event generation start
86+
let patch = PatchSignals::new(r#"{{"generating": true}}"#);
9887
let sse_event = patch.write_as_axum_sse_event();
99-
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
100-
101-
// Update the event counts
102-
let patch = PatchSignals::new(format!(r#"{{"total": {total}, "done": {done}}}"#));
88+
yielder.yield_item(Ok(sse_event)).await;
89+
90+
// Yield the events elements and signals to the stream
91+
for _ in 1..=signals.events {
92+
total += 1;
93+
done += 1;
94+
// Append a new entry to the activity feed
95+
let elements = event_entry(&Status::Done, total, "Auto");
96+
let patch = PatchElements::new(elements)
97+
.selector("#feed")
98+
.mode(ElementPatchMode::After);
99+
let sse_event = patch.write_as_axum_sse_event();
100+
yielder.yield_item(Ok(sse_event)).await;
101+
102+
// Update the event counts
103+
let patch = PatchSignals::new(format!(r#"{{"total": {total}, "done": {done}}}"#));
104+
let sse_event = patch.write_as_axum_sse_event();
105+
yielder.yield_item(Ok(sse_event)).await;
106+
tokio::time::sleep(Duration::from_millis(signals.interval)).await;
107+
}
108+
109+
// Signal event generation end
110+
let patch = PatchSignals::new(r#"{{"generating": false}}"#);
103111
let sse_event = patch.write_as_axum_sse_event();
104-
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
105-
tokio::time::sleep(Duration::from_millis(signals.interval)).await;
106-
}
107-
108-
// Signal event generation end
109-
let patch = PatchSignals::new(r#"{{"generating": false}}"#);
110-
let sse_event = patch.write_as_axum_sse_event();
111-
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
112-
}))
112+
yielder.yield_item(Ok(sse_event)).await;
113+
},
114+
))
113115
}
114116

115117
/// Creates one event with a given status
@@ -118,27 +120,29 @@ async fn event(
118120
ReadSignals(signals): ReadSignals<Signals>,
119121
) -> impl IntoResponse {
120122
// Create the event stream, since we're patching both an element and a signal.
121-
Sse::new(stream_fn(move |mut yielder| async move {
122-
// Signal the updated event counts
123-
let total = signals.total + 1;
124-
let signals = match status {
125-
Status::Done => format!(r#"{{"total": {total}, "done": {}}}"#, signals.done + 1),
126-
Status::Warn => format!(r#"{{"total": {total}, "warn": {}}}"#, signals.warn + 1),
127-
Status::Fail => format!(r#"{{"total": {total}, "fail": {}}}"#, signals.fail + 1),
128-
Status::Info => format!(r#"{{"total": {total}, "info": {}}}"#, signals.info + 1),
129-
};
130-
let patch = PatchSignals::new(signals);
131-
let sse_signal = patch.write_as_axum_sse_event();
132-
yielder.yield_item(Ok::<_, Infallible>(sse_signal)).await;
133-
134-
// Patch an element and append it to the feed
135-
let elements = event_entry(&status, total, "Manual");
136-
let patch = PatchElements::new(elements)
137-
.selector("#feed")
138-
.mode(ElementPatchMode::After);
139-
let sse_event = patch.write_as_axum_sse_event();
140-
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
141-
}))
123+
Sse::new(stream_fn(
124+
move |mut yielder: Yielder<Result<Event, Infallible>>| async move {
125+
// Signal the updated event counts
126+
let total = signals.total + 1;
127+
let signals = match status {
128+
Status::Done => format!(r#"{{"total": {total}, "done": {}}}"#, signals.done + 1),
129+
Status::Warn => format!(r#"{{"total": {total}, "warn": {}}}"#, signals.warn + 1),
130+
Status::Fail => format!(r#"{{"total": {total}, "fail": {}}}"#, signals.fail + 1),
131+
Status::Info => format!(r#"{{"total": {total}, "info": {}}}"#, signals.info + 1),
132+
};
133+
let patch = PatchSignals::new(signals);
134+
let sse_signal = patch.write_as_axum_sse_event();
135+
yielder.yield_item(Ok(sse_signal)).await;
136+
137+
// Patch an element and append it to the feed
138+
let elements = event_entry(&status, total, "Manual");
139+
let patch = PatchElements::new(elements)
140+
.selector("#feed")
141+
.mode(ElementPatchMode::After);
142+
let sse_event = patch.write_as_axum_sse_event();
143+
yielder.yield_item(Ok(sse_event)).await;
144+
},
145+
))
142146
}
143147

144148
/// Returns an HTML string for the entry

examples/axum-hello.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use {
2-
asynk_strim::stream_fn,
2+
asynk_strim::{Yielder, stream_fn},
33
axum::{
44
Router,
5-
response::{Html, IntoResponse, Sse},
5+
response::{Html, IntoResponse, Sse, sse::Event},
66
routing::get,
77
},
88
core::{convert::Infallible, error::Error, time::Duration},
@@ -49,15 +49,17 @@ pub struct Signals {
4949
}
5050

5151
async fn hello_world(ReadSignals(signals): ReadSignals<Signals>) -> impl IntoResponse {
52-
Sse::new(stream_fn(move |mut yielder| async move {
53-
for i in 0..MESSAGE.len() {
54-
let elements = format!("<div id='message'>{}</div>", &MESSAGE[0..i + 1]);
55-
let patch = PatchElements::new(elements);
56-
let sse_event = patch.write_as_axum_sse_event();
52+
Sse::new(stream_fn(
53+
move |mut yielder: Yielder<Result<Event, Infallible>>| async move {
54+
for i in 0..MESSAGE.len() {
55+
let elements = format!("<div id='message'>{}</div>", &MESSAGE[0..i + 1]);
56+
let patch = PatchElements::new(elements);
57+
let sse_event = patch.write_as_axum_sse_event();
5758

58-
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
59+
yielder.yield_item(Ok(sse_event)).await;
5960

60-
tokio::time::sleep(Duration::from_millis(signals.delay)).await;
61-
}
62-
}))
61+
tokio::time::sleep(Duration::from_millis(signals.delay)).await;
62+
}
63+
},
64+
))
6365
}

examples/axum-live-reload.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use {
2-
asynk_strim::stream_fn,
2+
asynk_strim::{Yielder, stream_fn},
33
axum::{
44
Router,
5-
response::{Html, IntoResponse, Sse},
5+
response::{Html, IntoResponse, Sse, sse::Event},
66
routing::get,
77
},
88
core::{convert::Infallible, error::Error, time::Duration},
@@ -72,17 +72,21 @@ async fn hotreload() -> impl IntoResponse {
7272
// tracking against a date or version stored in a cookie
7373
// or by some other means.
7474

75+
use asynk_strim::Yielder;
76+
use axum::response::sse;
7577
use datastar::prelude::ExecuteScript;
7678
static ONCE: atomic::AtomicBool = atomic::AtomicBool::new(false);
7779

78-
Sse::new(stream_fn(move |mut yielder| async move {
79-
if !ONCE.swap(true, atomic::Ordering::SeqCst) {
80-
let script = ExecuteScript::new("window.location.reload()");
81-
let sse_event = script.write_as_axum_sse_event();
82-
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
83-
}
84-
std::future::pending().await
85-
}))
80+
Sse::new(stream_fn(
81+
|mut yielder: Yielder<Result<sse::Event, Infallible>>| async move {
82+
if !ONCE.swap(true, atomic::Ordering::SeqCst) {
83+
let script = ExecuteScript::new("window.location.reload()");
84+
let sse_event = script.write_as_axum_sse_event();
85+
yielder.yield_item(Ok(sse_event)).await;
86+
}
87+
std::future::pending().await
88+
},
89+
))
8690
}
8791

8892
const MESSAGE: &str = "Hello, world!";
@@ -93,15 +97,17 @@ pub struct Signals {
9397
}
9498

9599
async fn hello_world(ReadSignals(signals): ReadSignals<Signals>) -> impl IntoResponse {
96-
Sse::new(stream_fn(move |mut yielder| async move {
97-
for i in 0..MESSAGE.len() {
98-
let elements = format!("<div id='message'>{}</div>", &MESSAGE[0..i + 1]);
99-
let patch = PatchElements::new(elements);
100-
let sse_event = patch.write_as_axum_sse_event();
101-
102-
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
103-
104-
tokio::time::sleep(Duration::from_millis(signals.delay)).await;
105-
}
106-
}))
100+
Sse::new(stream_fn(
101+
move |mut yielder: Yielder<Result<Event, Infallible>>| async move {
102+
for i in 0..MESSAGE.len() {
103+
let elements = format!("<div id='message'>{}</div>", &MESSAGE[0..i + 1]);
104+
let patch = PatchElements::new(elements);
105+
let sse_event = patch.write_as_axum_sse_event();
106+
107+
yielder.yield_item(Ok(sse_event)).await;
108+
109+
tokio::time::sleep(Duration::from_millis(signals.delay)).await;
110+
}
111+
},
112+
))
107113
}

0 commit comments

Comments
 (0)