Async system functions and first-class async support? #2677
Replies: 10 comments 20 replies
-
This is a great analysis. @Ratysz and @TheRawMeatball do you have thoughts on how this might fit into the scheduling API?
-
The implementation of async systems you are advocating for (where they are automatically blocked on in bevy's own code, then dropped within the same frame) would be a massive footgun. I think the correct way to do this is something like:

```rust
fn handle_tasks(
    mut commands: Commands,
    mut transform_tasks: Query<(Entity, &mut Task<Transform>)>,
    box_mesh_handle: Res<BoxMeshHandle>,
    box_material_handle: Res<BoxMaterialHandle>,
) {
    let task = async move {
        for (entity, mut task) in transform_tasks.iter_mut() {
            if let Some(transform) = future::poll_once(&mut *task).await {
                // Add our new PbrBundle of components to our tagged entity
                commands.entity(entity).insert_bundle(PbrBundle {
                    mesh: box_mesh_handle.0.clone(),
                    material: box_material_handle.0.clone(),
                    transform,
                    ..Default::default()
                });
                // Task is complete, so remove task component from entity
                commands.entity(entity).remove::<Task<Transform>>();
            }
        }
    };
    future::block_on(task);
}
```
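For readers unfamiliar with what `poll_once` resolves to, here is a std-only sketch with no Bevy or futures-lite dependency (the names `noop_waker`, `poll_once`, and `TwoStep` are illustrative, not Bevy API): polling a future exactly once yields `Some(output)` when it has finished and `None` when it is still pending, which is why the pattern above can check for a result once per frame without blocking on it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Minimal no-op waker: enough to poll a future without a real executor.
fn noop_waker() -> Waker {
    fn raw() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker {
            raw()
        }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe { Waker::from_raw(raw()) }
}

/// Poll a future exactly once: Some(output) if it finished, None if it is
/// still pending. This is the shape of the `poll_once` call used above.
fn poll_once<T>(fut: &mut Pin<Box<dyn Future<Output = T>>>) -> Option<T> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}

// A task that is pending on its first poll and ready on the second,
// standing in for an in-flight compute task.
struct TwoStep(bool);
impl Future for TwoStep {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.0 {
            Poll::Ready(42)
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

fn main() {
    let mut task: Pin<Box<dyn Future<Output = u32>>> = Box::pin(TwoStep(false));
    assert_eq!(poll_once(&mut task), None); // frame 1: still running
    assert_eq!(poll_once(&mut task), Some(42)); // frame 2: result available
    println!("ok");
}
```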
-
I don't think we need to do anything on the Bevy side for this: there is no one-size-fits-all way of applying task results because this is entirely user code, and the amount of boilerplate a built-in general solution would remove is next to nothing. If I were to develop that example from scratch, I would try something like a local …
-
@Ratysz That's true for the logic inside a user system function, but I don't think the scheduler/ …

BTW, I've got another PR to enable alternative async schedulers like tokio; is that something feasible?
-
@DJMcNab Could you give a concrete example? My take is otherwise: people may use some other scheduler / async framework and cause …
-
Something important that doesn't seem to have been mentioned is the … Something else that's very important: references to anything that lives in the world, be it a `Query` or a `Res`, can never live across an `.await` point.
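A std-only sketch of a common workaround for that constraint (the `World`, `spawn_work`, and `run` names are hypothetical stand-ins, not Bevy API): clone the data out of the world and move the owned copy into the future, so no borrow of world data has to survive an `.await`.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Minimal no-op waker, just enough to poll futures to completion here.
fn noop_waker() -> Waker {
    fn raw() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker {
            raw()
        }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe { Waker::from_raw(raw()) }
}

/// Poll a future to completion on the current thread (a toy block_on).
fn run<T>(fut: impl Future<Output = T>) -> T {
    let mut fut = pin!(fut);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

// Stand-in for data living in the ECS world (e.g. behind a Res<String>).
struct World {
    name: String,
}

// The borrow `&world.name` cannot be held inside a future that outlives
// this call, so we clone the data and move the owned copy into the future.
// The returned future is 'static: it borrows nothing from the world.
fn spawn_work(world: &World) -> Pin<Box<dyn Future<Output = usize>>> {
    let name = world.name.clone(); // owned, detached from the world
    Box::pin(async move { name.len() })
}

fn main() {
    let world = World { name: "bevy".to_string() };
    let len = run(spawn_work(&world));
    assert_eq!(len, 4);
    println!("ok: {len}");
}
```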
-
Here is another example. We have an async system that runs at the same time its sync counterpart would, so that we can use the ECS directly. Inside the game loop we poll it once on every update. We are basically making a system that polls a future, with captured references, at the correct time.

But how can we change a reference that is captured by a future? We capture a reference to a reference. Before polling the future we must update all references to point at the correct location. But now we have another problem: the user could copy the inner reference and hold it across an await. For that we use guards, and we make them `!Send` so they cannot be held across an await point.

We could use a proc macro to hide the boilerplate of updating references. As an optimization, we could store a reference to a structure of references generated by the proc macro, and remove …

Pros

Cons
```rust
// How it could look like
#[async_system]
async fn async_system(name: AsyncRes<String>) {
    println!("before");
    {
        let guard = name.get();
        dbg!(guard.deref());
    }
    MyFuture(false).await;
    println!("after");
    let guard = name.get();
    dbg!(guard.deref());
}
```

Working demo:

```rust
use std::{
    future::Future,
    marker::PhantomData,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use bevy_ecs::prelude::*;
use futures_lite::future;

fn wrapper_system<'w>(
    mut query: Query<(Entity, Option<&mut BevyFuture<()>>)>,
    mut res: ResMut<'w, Vec<String>>,
    mut commands: Commands,
    mut polled: Local<u8>,
) {
    match query.get_single_mut() {
        Ok((id, Some(mut fut))) => {
            let captured_reference = &*fut.captured.0;
            if *polled == 1 {
                println!("changing");
                res.push(String::from("This string has been changed and moved"));
                // Point the captured reference at the new location before polling.
                unsafe {
                    *(captured_reference as *const _ as *mut &String) = &res[1];
                }
            }
            match future::block_on(future::poll_once(&mut fut.future)) {
                None => {}
                Some(_) => {
                    commands.entity(id).despawn();
                }
            }
            *polled += 1;
        }
        _ => {
            println!("spawning another future");
            let reference = &res[0];
            commands.spawn().insert(desugared_async_system(reference));
        }
    }
}

fn desugared_async_system<'a>(item: &'a String) -> BevyFuture<()> {
    let mut item = AsyncRef::new(item);
    BevyFuture {
        captured: item.clone(),
        future: Box::pin(async move {
            println!("before");
            {
                let guard = item.get();
                dbg!(guard.deref());
            }
            MyFuture(false).await;
            println!("after");
            let guard = item.get();
            dbg!(guard.deref());
        }),
    }
}

#[derive(Component)]
struct BevyFuture<F> {
    captured: AsyncRef<String>,
    future: Pin<Box<dyn Future<Output = F> + Send + Sync>>,
}

struct AsyncRef<T: 'static>(Arc<&'static T>);

impl<T> Clone for AsyncRef<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

// `PhantomData<*const ()>` makes the guard !Send, so it cannot be held
// across an await point.
struct Guard<'a, T> {
    reference: &'a T,
    _t: PhantomData<*const ()>,
}

impl<'a, T> Guard<'a, T> {
    pub fn deref(&'a self) -> &'a T {
        self.reference
    }
}

impl<T: 'static> AsyncRef<T> {
    pub fn new(value: &T) -> Self {
        // change to 'static lifetime
        Self(Arc::new(unsafe { std::mem::transmute(value) }))
    }

    pub fn get<'a>(&'a mut self) -> Guard<'a, T> {
        Guard {
            reference: *self.0,
            _t: Default::default(),
        }
    }
}

struct MyFuture(bool);

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        Poll::Pending
    }
}

#[cfg(test)]
mod tests {
    use bevy::prelude::App;

    use crate::wrapper_system;

    #[test]
    fn test() {
        let mut app = App::empty();
        app.add_default_stages()
            .insert_resource(vec!["Hello world".to_string()]);
        app.add_system(wrapper_system);
        app.update();
        app.update();
        app.update();
        app.update();
        app.update();
        app.update();
    }
}
```
-
While this is not about first-class async functions, it could be related:

```rust
commands.add(
    Promise::start(asyn!(state, time: Res<Time> => {
        info!("Wait a second..");
        let started_at = time.elapsed_seconds();
        state.with(started_at).asyn().timeout(1.0)
    }))
    .then(asyn!(state, _ => {
        info!("How large is the Bevy main web page?");
        state.asyn().http().get("https://bevyengine.org")
    }))
    .then(asyn!(state, result => {
        match result {
            Ok(response) => info!("It is {} bytes!", response.bytes.len()),
            Err(err) => info!("Ahhh... something went wrong: {err}"),
        }
        state.pass()
    }))
    .then(asyn!(state, _, time: Res<Time>, mut exit: EventWriter<AppExit> => {
        let duration = time.elapsed_seconds() - state.value;
        info!("It took {duration:0.2}s to do this job.");
        info!("Exiting now");
        exit.send(AppExit);
        state.pass()
    })),
);
```
-
I also have https://github.com/frewsxcv/bevy_jobs, which can be used to spawn async jobs in the background.
-
Has anyone used Bevy to make an online game in practice? Networking should rely on Rust’s async ecosystem, right? Of course, using the Rust standard library’s sync APIs is also possible.
-
In the `async_compute` example, users need to `block_on` the result of `poll_once`, which is inconvenient and may incur a big performance penalty if they use multiple `block_on`s instead of a single one; the performance gap can be consistently verified with the benchmark program in this PR.

Proposed API change: #2691

Ideally, a single `block_on` could be performed inside the ECS by allowing async system functions, and I think using a single `block_on` for all async system functions on every frame update is the best way (in terms of performance).

Regarding the performance gap: a Bevy `Task` is not a system thread (coordinated by the OS) but a fiber, green thread, or coroutine, whichever you prefer to call it, which should ideally be coordinated by a single scheduler (`block_on`) to get the best performance.

In terms of first-class async support, I would consider it missing from (current) Bevy as long as it requires users to write ad-hoc `block_on`s.

As @TheRawMeatball has pointed out, tasks in this example are scheduled with the ECS scheduler, so I think there should not be any `poll_once` or `block_on` in user code at all. Another spawn method on `TaskPool` that returns the `TaskWrapper` below, which contains the `Option` result, would work in a `poll_once`- and `block_on`-free way; my test shows it has the best performance, as there is no extra overhead and the API is much cleaner.
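The single-`block_on`-per-update idea can be sketched with only the standard library (`drive_frame`, `TwoStep`, and `noop_waker` are illustrative names, not a proposed Bevy API): keep all in-flight futures in one list and make one scheduling pass per frame, polling each pending future once and harvesting finished results.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Minimal no-op waker: enough to poll futures without a real executor.
fn noop_waker() -> Waker {
    fn raw() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker {
            raw()
        }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe { Waker::from_raw(raw()) }
}

type BoxedTask = Pin<Box<dyn Future<Output = u32>>>;

/// One scheduler pass per frame: poll every task once, collect finished
/// results, keep the rest. This is the "single block_on per update" shape,
/// as opposed to a separate blocking call per task per frame.
fn drive_frame(tasks: &mut Vec<BoxedTask>, results: &mut Vec<u32>) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    tasks.retain_mut(|t| match t.as_mut().poll(&mut cx) {
        Poll::Ready(v) => {
            results.push(v);
            false // drop finished tasks
        }
        Poll::Pending => true, // keep for the next frame
    });
}

// A task that needs two polls (two "frames") to finish.
struct TwoStep(bool, u32);
impl Future for TwoStep {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.0 {
            Poll::Ready(self.1)
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

fn main() {
    let mut tasks: Vec<BoxedTask> = (1..=3)
        .map(|n| -> BoxedTask { Box::pin(TwoStep(false, n)) })
        .collect();
    let mut results = Vec::new();
    drive_frame(&mut tasks, &mut results); // frame 1: nothing done yet
    assert!(results.is_empty());
    drive_frame(&mut tasks, &mut results); // frame 2: all three finish
    assert_eq!(results, vec![1, 2, 3]);
    println!("done: {results:?}");
}
```

Compared with blocking on each task individually, this makes exactly one pass over all in-flight tasks per update and never parks the thread waiting on a single task.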