Skip to content

Commit

Permalink
webgpu: Use WGPU poller thread for poll_all_devices (#32266)
Browse files Browse the repository at this point in the history
* Use special WGPU poller thread for poll_all_devices

* Switch to latest wgpu

This is required to fix some deadlocks.

* non-blocking poll unconditionally

* small fixes
  • Loading branch information
sagudev committed May 15, 2024
1 parent bb5906e commit 00f267e
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 35 deletions.
16 changes: 5 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -125,8 +125,8 @@ webpki-roots = "0.25"
webrender = { git = "https://github.com/servo/webrender", branch = "0.64", features = ["capture"] }
webrender_api = { git = "https://github.com/servo/webrender", branch = "0.64" }
webrender_traits = { path = "components/shared/webrender" }
wgpu-core = "0.20"
wgpu-types = "0.20"
wgpu-core = { git = "https://github.com/gfx-rs/wgpu", rev = "d0a5e48aa7e84683114c3870051cc414ae92ac03" }
wgpu-types = { git = "https://github.com/gfx-rs/wgpu", rev = "d0a5e48aa7e84683114c3870051cc414ae92ac03" }
winapi = "0.3"
xi-unicode = "0.1.0"
xml5ever = "0.18"
Expand Down
3 changes: 2 additions & 1 deletion components/webgpu/lib.rs
Expand Up @@ -8,6 +8,7 @@ use wgpu_thread::WGPU;
pub use {wgpu_core as wgc, wgpu_types as wgt};

pub mod identity;
mod poll_thread;
mod wgpu_thread;

use std::borrow::Cow;
Expand Down Expand Up @@ -86,7 +87,7 @@ impl WebGPU {
.run();
})
{
warn!("Failed to spwan WGPU thread ({})", e);
warn!("Failed to spawn WGPU thread ({})", e);
return None;
}
Some((WebGPU(sender), script_recv))
Expand Down
126 changes: 126 additions & 0 deletions components/webgpu/poll_thread.rs
@@ -0,0 +1,126 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

//! Data and main loop of WGPU poll thread.
//!
//! This is roughly based on <https://github.com/LucentFlux/wgpu-async/blob/1322c7e3fcdfc1865a472c7bbbf0e2e06dcf4da8/src/wgpu_future.rs>

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;

use log::warn;

use crate::wgc::global::Global;

/// Polls devices while there is something to poll.
///
/// This objects corresponds to a thread that parks itself when there is no work,
/// waiting on it, and then calls `poll_all_devices` repeatedly to block.
///
/// The thread dies when this object is dropped, and all work in submission is done.
///
/// ## Example
/// ```no_run
/// let token = self.poller.token(); // create a new token
/// let callback = SubmittedWorkDoneClosure::from_rust(Box::from(move || {
/// drop(token); // drop token as closure has been fired
/// // ...
/// }));
/// let result = gfx_select!(queue_id => global.queue_on_submitted_work_done(queue_id, callback));
/// self.poller.wake(); // wake poller thread to actually poll
/// ```
#[derive(Debug)]
pub(crate) struct Poller {
/// The number of closures that still needs to be fired.
/// When this is 0, the thread can park itself.
work_count: Arc<AtomicUsize>,
/// True if thread should die after all work in submission is done
is_done: Arc<AtomicBool>,
/// Handle to the WGPU poller thread (to be used for unparking the thread)
handle: Option<JoinHandle<()>>,
}

#[inline]
fn poll_all_devices(global: &Arc<Global>, more_work: &mut bool, force_wait: bool) {
match global.poll_all_devices(force_wait) {
Ok(all_queue_empty) => *more_work = !all_queue_empty,
Err(e) => warn!("Poller thread got `{e}` on poll_all_devices."),
}
}

impl Poller {
pub(crate) fn new(global: Arc<Global>) -> Self {
let work_count = Arc::new(AtomicUsize::new(0));
let is_done = Arc::new(AtomicBool::new(false));
let work = work_count.clone();
let done = is_done.clone();
Self {
work_count,
is_done,
handle: Some(
std::thread::Builder::new()
.name("WGPU poller".into())
.spawn(move || {
while !done.load(Ordering::Acquire) {
let mut more_work = false;
// Do non-blocking poll unconditionally
// so every `ẁake` (even spurious) will do at least one poll.
// this is mostly useful for stuff that is deferred
// to maintain calls in wgpu (device resource destruction)
poll_all_devices(&global, &mut more_work, false);
while more_work || work.load(Ordering::Acquire) != 0 {
poll_all_devices(&global, &mut more_work, true);
}
std::thread::park(); //TODO: should we use timeout here
}
})
.expect("Spawning thread should not fail"),
),
}
}

/// Creates a token of work
pub(crate) fn token(&self) -> WorkToken {
let prev = self.work_count.fetch_add(1, Ordering::AcqRel);
debug_assert!(
prev < usize::MAX,
"cannot have more than `usize::MAX` outstanding operations on the GPU"
);
WorkToken {
work_count: Arc::clone(&self.work_count),
}
}

/// Wakes the poller thread to start polling.
pub(crate) fn wake(&self) {
self.handle
.as_ref()
.expect("Poller thread does not exist!")
.thread()
.unpark();
}
}

impl Drop for Poller {
fn drop(&mut self) {
self.is_done.store(true, Ordering::Release);

let handle = self.handle.take().expect("Poller dropped twice");
handle.thread().unpark();
handle.join().expect("Poller thread panicked");
}
}

/// RAII indicating that there is some work enqueued (closure to be fired),
/// while this token is held.
pub(crate) struct WorkToken {
work_count: Arc<AtomicUsize>,
}

impl Drop for WorkToken {
fn drop(&mut self) {
self.work_count.fetch_sub(1, Ordering::AcqRel);
}
}
42 changes: 21 additions & 21 deletions components/webgpu/wgpu_thread.rs
Expand Up @@ -8,7 +8,6 @@ use std::cell::RefCell;
use std::collections::HashMap;
use std::slice;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use arrayvec::ArrayVec;
use euclid::default::Size2D;
Expand All @@ -27,12 +26,12 @@ use wgc::{gfx_select, id};
use wgt::InstanceDescriptor;
pub use {wgpu_core as wgc, wgpu_types as wgt};

use crate::poll_thread::Poller;
use crate::{
ErrorScopeId, PresentationData, Transmute, WebGPU, WebGPUAdapter, WebGPUDevice, WebGPUMsg,
WebGPUOpResult, WebGPUQueue, WebGPURequest, WebGPUResponse,
};

const DEVICE_POLL_INTERVAL: Duration = Duration::from_millis(50);
pub const PRESENTATION_BUFFER_COUNT: usize = 10;

#[allow(clippy::upper_case_acronyms)] // Name of the library
Expand All @@ -51,7 +50,7 @@ pub(crate) struct WGPU {
webrender_document: DocumentId,
external_images: Arc<Mutex<WebrenderExternalImageRegistry>>,
wgpu_image_map: Arc<Mutex<HashMap<u64, PresentationData>>>,
last_poll: Instant,
poller: Poller,
}

impl WGPU {
Expand All @@ -64,17 +63,19 @@ impl WGPU {
external_images: Arc<Mutex<WebrenderExternalImageRegistry>>,
wgpu_image_map: Arc<Mutex<HashMap<u64, PresentationData>>>,
) -> Self {
let global = Arc::new(wgc::global::Global::new(
"wgpu-core",
InstanceDescriptor {
backends: wgt::Backends::PRIMARY,
..Default::default()
},
));
WGPU {
poller: Poller::new(Arc::clone(&global)),
receiver,
sender,
script_sender,
global: Arc::new(wgc::global::Global::new(
"wgpu-core",
InstanceDescriptor {
backends: wgt::Backends::PRIMARY,
..Default::default()
},
)),
global,
adapters: Vec::new(),
devices: HashMap::new(),
_invalid_adapters: Vec::new(),
Expand All @@ -83,21 +84,12 @@ impl WGPU {
webrender_document,
external_images,
wgpu_image_map,
last_poll: Instant::now(),
}
}

pub(crate) fn run(&mut self) {
loop {
let diff = DEVICE_POLL_INTERVAL.checked_sub(self.last_poll.elapsed());
if diff.is_none() {
let _ = self.global.poll_all_devices(false);
self.last_poll = Instant::now();
}
if let Ok((scope_id, msg)) = self
.receiver
.try_recv_timeout(diff.unwrap_or(DEVICE_POLL_INTERVAL))
{
if let Ok((scope_id, msg)) = self.receiver.recv() {
match msg {
WebGPURequest::BufferMapAsync {
sender,
Expand All @@ -109,7 +101,9 @@ impl WGPU {
} => {
let glob = Arc::clone(&self.global);
let resp_sender = sender.clone();
let token = self.poller.token();
let callback = BufferMapCallback::from_rust(Box::from(move |result| {
drop(token);
match result {
Ok(()) => {
let global = &glob;
Expand Down Expand Up @@ -151,6 +145,7 @@ impl WGPU {
size,
operation
));
self.poller.wake();
if let Err(ref e) = result {
if let Err(w) = sender.send(Some(Err(format!("{:?}", e)))) {
warn!("Failed to send BufferMapAsync Response ({:?})", w);
Expand Down Expand Up @@ -818,7 +813,9 @@ impl WGPU {
let wgpu_image_map = Arc::clone(&self.wgpu_image_map);
let webrender_api = Arc::clone(&self.webrender_api);
let webrender_document = self.webrender_document;
let token = self.poller.token();
let callback = BufferMapCallback::from_rust(Box::from(move |result| {
drop(token);
match result {
Ok(()) => {
let global = &glob;
Expand Down Expand Up @@ -866,6 +863,7 @@ impl WGPU {
};
let _ = gfx_select!(buffer_id
=> global.buffer_map_async(buffer_id, 0, Some(buffer_size), map_op));
self.poller.wake();
},
WebGPURequest::UnmapBuffer {
buffer_id,
Expand Down Expand Up @@ -928,14 +926,16 @@ impl WGPU {
},
WebGPURequest::QueueOnSubmittedWorkDone { sender, queue_id } => {
let global = &self.global;

let token = self.poller.token();
let callback = SubmittedWorkDoneClosure::from_rust(Box::from(move || {
drop(token);
if let Err(e) = sender.send(Some(Ok(WebGPUResponse::SubmittedWorkDone)))
{
warn!("Could not send SubmittedWorkDone Response ({})", e);
}
}));
let result = gfx_select!(queue_id => global.queue_on_submitted_work_done(queue_id, callback));
self.poller.wake();
self.send_result(queue_id.transmute(), scope_id, result);
},
WebGPURequest::DropTexture(id) => {
Expand Down

0 comments on commit 00f267e

Please sign in to comment.