Skip to content

Commit

Permalink
Add ParallelCommands system parameter (bevyengine#4749)
Browse files Browse the repository at this point in the history
(follow-up to bevyengine#4423)
# Objective
Currently, it isn't possible to easily fire commands from within par_for_each blocks. This PR allows for issuing commands from within parallel scopes.
  • Loading branch information
TheRawMeatball authored and james7132 committed Oct 28, 2022
1 parent ddb32e6 commit 78ee17e
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 1 deletion.
1 change: 1 addition & 0 deletions crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ bevy_utils = { path = "../bevy_utils", version = "0.8.0-dev" }
bevy_ecs_macros = { path = "macros", version = "0.8.0-dev" }

async-channel = "1.4"
thread_local = "1.1.4"
fixedbitset = "0.4"
fxhash = "0.2"
downcast-rs = "1.2"
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_ecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub mod prelude {
},
system::{
Commands, In, IntoChainSystem, IntoExclusiveSystem, IntoSystem, Local, NonSend,
NonSendMut, ParamSet, Query, RemovedComponents, Res, ResMut, System,
NonSendMut, ParallelCommands, ParamSet, Query, RemovedComponents, Res, ResMut, System,
SystemParamFunction,
},
world::{FromWorld, Mut, World},
Expand Down
2 changes: 2 additions & 0 deletions crates/bevy_ecs/src/system/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod command_queue;
mod parallel_scope;

use crate::{
bundle::Bundle,
Expand All @@ -8,6 +9,7 @@ use crate::{
};
use bevy_utils::tracing::{error, warn};
pub use command_queue::CommandQueue;
pub use parallel_scope::*;
use std::marker::PhantomData;

use super::Resource;
Expand Down
98 changes: 98 additions & 0 deletions crates/bevy_ecs/src/system/commands/parallel_scope.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::cell::Cell;

use thread_local::ThreadLocal;

use crate::{
entity::Entities,
prelude::World,
system::{SystemParam, SystemParamFetch, SystemParamState},
};

use super::{CommandQueue, Commands};

#[doc(hidden)]
#[derive(Default)]
/// The internal [`SystemParamState`] of the [`ParallelCommands`] type
pub struct ParallelCommandsState {
thread_local_storage: ThreadLocal<Cell<CommandQueue>>,
}

/// An alternative to [`Commands`] that can be used in parallel contexts, such as those in [`Query::par_for_each`](crate::system::Query::par_for_each)
///
/// Note: Because command application order will depend on how many threads are ran, non-commutative commands may result in non-deterministic results.
///
/// Example:
/// ```
/// # use bevy_ecs::prelude::*;
/// # use bevy_tasks::ComputeTaskPool;
/// #
/// # #[derive(Component)]
/// # struct Velocity;
/// # impl Velocity { fn magnitude(&self) -> f32 { 42.0 } }
/// fn parallel_command_system(
/// mut query: Query<(Entity, &Velocity)>,
/// par_commands: ParallelCommands
/// ) {
/// query.par_for_each(32, |(entity, velocity)| {
/// if velocity.magnitude() > 10.0 {
/// par_commands.command_scope(|mut commands| {
/// commands.entity(entity).despawn();
/// });
/// }
/// });
/// }
/// # bevy_ecs::system::assert_is_system(parallel_command_system);
///```
pub struct ParallelCommands<'w, 's> {
state: &'s mut ParallelCommandsState,
entities: &'w Entities,
}

impl SystemParam for ParallelCommands<'_, '_> {
type Fetch = ParallelCommandsState;
}

impl<'w, 's> SystemParamFetch<'w, 's> for ParallelCommandsState {
type Item = ParallelCommands<'w, 's>;

unsafe fn get_param(
state: &'s mut Self,
_: &crate::system::SystemMeta,
world: &'w World,
_: u32,
) -> Self::Item {
ParallelCommands {
state,
entities: world.entities(),
}
}
}

// SAFE: no component or resource access to report
unsafe impl SystemParamState for ParallelCommandsState {
fn init(_: &mut World, _: &mut crate::system::SystemMeta) -> Self {
Self::default()
}

fn apply(&mut self, world: &mut World) {
for cq in self.thread_local_storage.iter_mut() {
cq.get_mut().apply(world);
}
}
}

impl<'w, 's> ParallelCommands<'w, 's> {
pub fn command_scope<R>(&self, f: impl FnOnce(Commands) -> R) -> R {
let store = &self.state.thread_local_storage;
let command_queue_cell = store.get_or_default();
let mut command_queue = command_queue_cell.take();

let r = f(Commands::new_from_entities(
&mut command_queue,
self.entities,
));

command_queue_cell.set(command_queue);
r
}
}

0 comments on commit 78ee17e

Please sign in to comment.