Skip to content

Commit 1ab7523

Browse files
committed
swap async-stream in dev-deps with asynk-strim
1 parent e6e828d commit 1ab7523

File tree

6 files changed

+109
-52
lines changed

6 files changed

+109
-52
lines changed

Cargo.lock

Lines changed: 11 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ serde_json = { version = "1", default-features = false, optional = true, feature
4343
tracing = { version = "0.1.41", optional = true }
4444

4545
[dev-dependencies]
46-
async-stream = { version = "0.3.6", default-features = false }
46+
asynk-strim = { version = "0.1" }
4747
axum = { version = "0.8.4" }
4848
indexmap = { version = "2.11", features = ["serde"] }
4949
reqwest = { version = "0.12.23", features = ["json", "stream"] }

examples/axum-activity-feed.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use {
2-
async_stream::stream,
2+
asynk_strim::stream_fn,
33
axum::{
44
Router,
55
extract::Path,
@@ -80,34 +80,36 @@ async fn generate(ReadSignals(signals): ReadSignals<Signals>) -> impl IntoRespon
8080
let mut done = signals.done;
8181

8282
// Start the SSE stream
83-
Sse::new(stream! {
83+
Sse::new(stream_fn(move |mut yielder| async move {
8484
// Signal event generation start
8585
let patch = PatchSignals::new(r#"{{"generating": true}}"#);
8686
let sse_event = patch.write_as_axum_sse_event();
87-
yield Ok::<_, Infallible>(sse_event);
87+
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
8888

8989
// Yield the events elements and signals to the stream
9090
for _ in 1..=signals.events {
9191
total += 1;
9292
done += 1;
9393
// Append a new entry to the activity feed
9494
let elements = event_entry(&Status::Done, total, "Auto");
95-
let patch = PatchElements::new(elements).selector("#feed").mode(ElementPatchMode::After);
95+
let patch = PatchElements::new(elements)
96+
.selector("#feed")
97+
.mode(ElementPatchMode::After);
9698
let sse_event = patch.write_as_axum_sse_event();
97-
yield Ok::<_, Infallible>(sse_event);
99+
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
98100

99101
// Update the event counts
100102
let patch = PatchSignals::new(format!(r#"{{"total": {total}, "done": {done}}}"#));
101103
let sse_event = patch.write_as_axum_sse_event();
102-
yield Ok::<_, Infallible>(sse_event);
104+
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
103105
tokio::time::sleep(Duration::from_millis(signals.interval)).await;
104106
}
105107

106108
// Signal event generation end
107109
let patch = PatchSignals::new(r#"{{"generating": false}}"#);
108110
let sse_event = patch.write_as_axum_sse_event();
109-
yield Ok::<_, Infallible>(sse_event);
110-
})
111+
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
112+
}))
111113
}
112114

113115
/// Creates one event with a given status
@@ -116,7 +118,7 @@ async fn event(
116118
ReadSignals(signals): ReadSignals<Signals>,
117119
) -> impl IntoResponse {
118120
// Create the event stream, since we're patching both an element and a signal.
119-
Sse::new(stream! {
121+
Sse::new(stream_fn(move |mut yielder| async move {
120122
// Signal the updated event counts
121123
let total = signals.total + 1;
122124
let signals = match status {
@@ -127,14 +129,16 @@ async fn event(
127129
};
128130
let patch = PatchSignals::new(signals);
129131
let sse_signal = patch.write_as_axum_sse_event();
130-
yield Ok::<_, Infallible>(sse_signal);
132+
yielder.yield_item(Ok::<_, Infallible>(sse_signal)).await;
131133

132134
// Patch an element and append it to the feed
133135
let elements = event_entry(&status, total, "Manual");
134-
let patch = PatchElements::new(elements).selector("#feed").mode(ElementPatchMode::After);
136+
let patch = PatchElements::new(elements)
137+
.selector("#feed")
138+
.mode(ElementPatchMode::After);
135139
let sse_event = patch.write_as_axum_sse_event();
136-
yield Ok::<_, Infallible>(sse_event);
137-
})
140+
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
141+
}))
138142
}
139143

140144
/// Returns an HTML string for the entry

examples/axum-hello.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use {
2-
async_stream::stream,
2+
asynk_strim::stream_fn,
33
axum::{
44
Router,
55
response::{Html, IntoResponse, Sse},
@@ -49,15 +49,15 @@ pub struct Signals {
4949
}
5050

5151
async fn hello_world(ReadSignals(signals): ReadSignals<Signals>) -> impl IntoResponse {
52-
Sse::new(stream! {
52+
Sse::new(stream_fn(move |mut yielder| async move {
5353
for i in 0..MESSAGE.len() {
5454
let elements = format!("<div id='message'>{}</div>", &MESSAGE[0..i + 1]);
5555
let patch = PatchElements::new(elements);
5656
let sse_event = patch.write_as_axum_sse_event();
5757

58-
yield Ok::<_, Infallible>(sse_event);
58+
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
5959

6060
tokio::time::sleep(Duration::from_millis(signals.delay)).await;
6161
}
62-
})
62+
}))
6363
}

examples/axum-live-reload.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use {
2-
async_stream::stream,
2+
asynk_strim::stream_fn,
33
axum::{
44
Router,
55
response::{Html, IntoResponse, Sse},
@@ -75,14 +75,14 @@ async fn hotreload() -> impl IntoResponse {
7575
use datastar::prelude::ExecuteScript;
7676
static ONCE: atomic::AtomicBool = atomic::AtomicBool::new(false);
7777

78-
Sse::new(stream! {
78+
Sse::new(stream_fn(move |mut yielder| async move {
7979
if !ONCE.swap(true, atomic::Ordering::SeqCst) {
8080
let script = ExecuteScript::new("window.location.reload()");
8181
let sse_event = script.write_as_axum_sse_event();
82-
yield Ok::<_, Infallible>(sse_event);
82+
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
8383
}
8484
std::future::pending().await
85-
})
85+
}))
8686
}
8787

8888
const MESSAGE: &str = "Hello, world!";
@@ -93,15 +93,15 @@ pub struct Signals {
9393
}
9494

9595
async fn hello_world(ReadSignals(signals): ReadSignals<Signals>) -> impl IntoResponse {
96-
Sse::new(stream! {
96+
Sse::new(stream_fn(move |mut yielder| async move {
9797
for i in 0..MESSAGE.len() {
9898
let elements = format!("<div id='message'>{}</div>", &MESSAGE[0..i + 1]);
9999
let patch = PatchElements::new(elements);
100100
let sse_event = patch.write_as_axum_sse_event();
101101

102-
yield Ok::<_, Infallible>(sse_event);
102+
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
103103

104104
tokio::time::sleep(Duration::from_millis(signals.delay)).await;
105105
}
106-
})
106+
}))
107107
}

examples/axum-test-suite.rs

Lines changed: 69 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use {
2-
async_stream::stream,
2+
asynk_strim::stream_fn,
33
axum::{
44
Router,
55
response::{IntoResponse, Sse},
@@ -87,22 +87,50 @@ pub enum TestCaseEvent {
8787
}
8888

8989
async fn test(ReadSignals(test_case): ReadSignals<TestCase>) -> impl IntoResponse {
90-
Sse::new(stream! {
90+
Sse::new(stream_fn(move |mut yielder| async move {
9191
for event in test_case.events {
9292
let sse_event = match event {
93-
TestCaseEvent::ExecuteScript { script, event_id, retry_duration, attributes, auto_remove } => {
94-
ExecuteScript {
95-
script,
96-
id: event_id,
97-
retry: Duration::from_millis(retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION)),
98-
auto_remove,
99-
attributes: attributes.map(|attributes| {
100-
attributes.into_iter().map(|(key, value)| format!("{key}=\"{}\"", value.to_string().trim_matches('"'))).collect()
101-
}).unwrap_or_default(),
102-
}.into_datastar_event().write_as_axum_sse_event()
93+
TestCaseEvent::ExecuteScript {
94+
script,
95+
event_id,
96+
retry_duration,
97+
attributes,
98+
auto_remove,
99+
} => ExecuteScript {
100+
script,
101+
id: event_id,
102+
retry: Duration::from_millis(
103+
retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION),
104+
),
105+
auto_remove,
106+
attributes: attributes
107+
.map(|attributes| {
108+
attributes
109+
.into_iter()
110+
.map(|(key, value)| {
111+
format!("{key}=\"{}\"", value.to_string().trim_matches('"'))
112+
})
113+
.collect()
114+
})
115+
.unwrap_or_default(),
103116
}
104-
TestCaseEvent::PatchElements { elements, event_id, retry_duration, mode, selector, use_view_transition } => {
105-
PatchElements { id: event_id, retry: Duration::from_millis(retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION)), elements, selector, mode: match mode.as_deref().unwrap_or_default() {
117+
.into_datastar_event()
118+
.write_as_axum_sse_event(),
119+
TestCaseEvent::PatchElements {
120+
elements,
121+
event_id,
122+
retry_duration,
123+
mode,
124+
selector,
125+
use_view_transition,
126+
} => PatchElements {
127+
id: event_id,
128+
retry: Duration::from_millis(
129+
retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION),
130+
),
131+
elements,
132+
selector,
133+
mode: match mode.as_deref().unwrap_or_default() {
106134
"outer" => consts::ElementPatchMode::Outer,
107135
"inner" => consts::ElementPatchMode::Inner,
108136
"remove" => consts::ElementPatchMode::Remove,
@@ -112,19 +140,34 @@ async fn test(ReadSignals(test_case): ReadSignals<TestCase>) -> impl IntoRespons
112140
"before" => consts::ElementPatchMode::Before,
113141
"after" => consts::ElementPatchMode::After,
114142
_ => consts::ElementPatchMode::Outer,
115-
}, use_view_transition: use_view_transition.unwrap_or_default() }.into_datastar_event().write_as_axum_sse_event()
116-
},
117-
TestCaseEvent::PatchSignals { signals, signals_raw, event_id, retry_duration, only_if_missing } => {
118-
PatchSignals {
119-
id: event_id,
120-
retry: Duration::from_millis(retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION)),
121-
signals: signals_raw.unwrap_or_else(|| signals.map(|s| serde_json::to_string(&s).unwrap_or_default()).unwrap_or_default()),
122-
only_if_missing: only_if_missing.unwrap_or_default(),
123-
}.into_datastar_event().write_as_axum_sse_event()
124-
},
143+
},
144+
use_view_transition: use_view_transition.unwrap_or_default(),
145+
}
146+
.into_datastar_event()
147+
.write_as_axum_sse_event(),
148+
TestCaseEvent::PatchSignals {
149+
signals,
150+
signals_raw,
151+
event_id,
152+
retry_duration,
153+
only_if_missing,
154+
} => PatchSignals {
155+
id: event_id,
156+
retry: Duration::from_millis(
157+
retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION),
158+
),
159+
signals: signals_raw.unwrap_or_else(|| {
160+
signals
161+
.map(|s| serde_json::to_string(&s).unwrap_or_default())
162+
.unwrap_or_default()
163+
}),
164+
only_if_missing: only_if_missing.unwrap_or_default(),
165+
}
166+
.into_datastar_event()
167+
.write_as_axum_sse_event(),
125168
};
126169

127-
yield Ok::<_, Infallible>(sse_event);
170+
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
128171
}
129-
})
172+
}))
130173
}

0 commit comments

Comments
 (0)