diff --git a/gio/src/gio_future.rs b/gio/src/gio_future.rs index 2870104a7bfc..6f192553be59 100644 --- a/gio/src/gio_future.rs +++ b/gio/src/gio_future.rs @@ -11,6 +11,123 @@ use std::pin::{self, Pin}; use crate::prelude::*; use crate::Cancellable; +pub struct GioInfaliableFuture { + obj: O, + schedule_operation: Option, + cancellable: Option, + receiver: Option>, +} + +pub struct GioInfaliableResult { + sender: oneshot::Sender, +} + +impl GioInfaliableResult { + pub fn resolve(self, res: T) { + let _ = self.sender.send(res); + } +} + +impl GioInfaliableFuture +where + O: Clone + 'static, + F: FnOnce(&O, &Cancellable, GioInfaliableResult) + 'static, +{ + pub fn new(obj: &O, schedule_operation: F) -> GioInfaliableFuture { + Self { + obj: obj.clone(), + schedule_operation: Some(schedule_operation), + cancellable: Some(Cancellable::new()), + receiver: None, + } + } +} + +impl Future for GioInfaliableFuture +where + O: Clone + 'static, + F: FnOnce(&O, &Cancellable, GioInfaliableResult) + 'static, +{ + type Output = T; + + fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut Context) -> Poll { + let GioInfaliableFuture { + ref obj, + ref mut schedule_operation, + ref mut cancellable, + ref mut receiver, + .. + } = *self; + + if let Some(schedule_operation) = schedule_operation.take() { + let main_context = glib::MainContext::ref_thread_default(); + assert!( + main_context.is_owner(), + "Spawning futures only allowed if the thread is owning the MainContext" + ); + + // Channel for sending back the GIO async operation + // result to our future here. + // + // In theory, we could directly continue polling the + // corresponding task from the GIO async operation + // callback, however this would break at the very + // least the g_main_current_source() API. + let (send, recv) = oneshot::channel(); + + schedule_operation( + obj, + cancellable.as_ref().unwrap(), + GioInfaliableResult { sender: send }, + ); + + *receiver = Some(recv); + } + + // At this point we must have a receiver + let res = { + let receiver = receiver.as_mut().unwrap(); + Pin::new(receiver).poll(ctx) + }; + + match res { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(_)) => panic!("Async operation sender was unexpectedly closed"), + Poll::Ready(Ok(v)) => { + // Get rid of the reference to the cancellable and receiver + let _ = cancellable.take(); + let _ = receiver.take(); + Poll::Ready(v) + } + } + } +} + +impl FusedFuture for GioInfaliableFuture +where + O: Clone + 'static, + F: FnOnce(&O, &Cancellable, GioInfaliableResult) + 'static, +{ + fn is_terminated(&self) -> bool { + self.schedule_operation.is_none() + && self + .receiver + .as_ref() + .map_or(true, |receiver| receiver.is_terminated()) + } +} + +impl Drop for GioInfaliableFuture { + fn drop(&mut self) { + if let Some(cancellable) = self.cancellable.take() { + cancellable.cancel(); + } + let _ = self.receiver.take(); + } +} + +impl Unpin for GioInfaliableFuture {} + pub struct GioFuture { obj: O, schedule_operation: Option,