Skip to content

Commit

Permalink
Automatically clean up timers for deallocated components. Fixes #44
Browse files Browse the repository at this point in the history
  • Loading branch information
Bathtor committed Mar 25, 2020
1 parent a16e5c9 commit 1e5fc6c
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 31 deletions.
4 changes: 2 additions & 2 deletions core/src/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,9 +858,9 @@ macro_rules! match_deser {
#[cfg(test)]
mod deser_macro_tests {
use super::*;
use crate::{net::buffer::BufferChunk, serialisation::Serialiser};
use crate::serialisation::Serialiser;
use bytes::{Buf, BufMut};
use std::{str::FromStr, sync::Arc};
use std::str::FromStr;

#[test]
fn simple_macro_test() {
Expand Down
48 changes: 37 additions & 11 deletions core/src/timer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub trait Timer {
action: F,
) -> ()
where
F: Fn(Uuid) + Send + 'static;
F: Fn(Uuid) -> TimerReturn + Send + 'static;

/// Cancel the timer indicated by the `id`
///
Expand All @@ -62,6 +62,29 @@ pub trait Timer {
fn cancel(&mut self, id: Uuid);
}

/// Indicate whether or not to reschedule a periodic timer
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimerReturn {
/// Reschedule the timer
Reschedule,
/// Do not reschedule the timer
Cancel,
}
impl TimerReturn {
/// Whether or not this timer should be rescheduled
pub fn should_reschedule(&self) -> bool {
match self {
TimerReturn::Reschedule => true,
TimerReturn::Cancel => false,
}
}
}
impl Default for TimerReturn {
fn default() -> Self {
TimerReturn::Reschedule
}
}

/// A concrete entry for an outstanding timout
pub enum TimerEntry {
/// A one-off timer
Expand All @@ -82,7 +105,7 @@ pub enum TimerEntry {
/// The time between `action` invocations
period: Duration,
/// The action to invoke whenever the timeout expires
action: Box<dyn Fn(Uuid) + Send + 'static>,
action: Box<dyn Fn(Uuid) -> TimerReturn + Send + 'static>,
},
}

Expand Down Expand Up @@ -151,15 +174,18 @@ impl TimerEntry {
TimerEntry::Periodic {
id, action, period, ..
} => {
action.as_ref()(id);
let next = TimerEntry::Periodic {
id,
delay: period,
period,
action,
};
//FnBox::call_box(&action, id);
Some(next)
let res = action.as_ref()(id);
if res.should_reschedule() {
let next = TimerEntry::Periodic {
id,
delay: period,
period,
action,
};
Some(next)
} else {
None
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/timer/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Timer for SimulationTimer {

fn schedule_periodic<F>(&mut self, id: Uuid, delay: Duration, period: Duration, action: F) -> ()
where
F: Fn(Uuid) + Send + 'static,
F: Fn(Uuid) -> TimerReturn + Send + 'static,
{
let e = TimerEntry::Periodic {
id,
Expand Down
2 changes: 1 addition & 1 deletion core/src/timer/thread_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl Timer for TimerRef {

fn schedule_periodic<F>(&mut self, id: Uuid, delay: Duration, period: Duration, action: F) -> ()
where
F: Fn(Uuid) + Send + 'static,
F: Fn(Uuid) -> TimerReturn + Send + 'static,
{
let e = TimerEntry::Periodic {
id,
Expand Down
51 changes: 35 additions & 16 deletions core/src/timer_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::timer::Timer as TTimer;
use crate::timer::{Timer as TTimer, TimerReturn};
use std::{
collections::HashMap,
rc::Rc,
Expand Down Expand Up @@ -229,7 +229,8 @@ impl<C: ComponentDefinition> TimerManager<C> {
self.handles.insert(id, handle);
let tar = self.new_ref(component);
self.timer.schedule_once(id, timeout, move |id| {
tar.enqueue(Timeout(id));
// Ignore the Result, as we are anyway not trying to reschedule this timer
let _res = tar.enqueue(Timeout(id));
});
ScheduledTimer::from_uuid(id)
}
Expand All @@ -252,7 +253,13 @@ impl<C: ComponentDefinition> TimerManager<C> {
self.handles.insert(id, handle);
let tar = self.new_ref(component);
self.timer.schedule_periodic(id, delay, period, move |id| {
tar.enqueue(Timeout(id));
let res = tar.enqueue(Timeout(id));
if res.is_ok() {
TimerReturn::Reschedule
} else {
// Queue has probably been deallocated, so no point in trying this over and over again
TimerReturn::Cancel
}
});
ScheduledTimer::from_uuid(id)
}
Expand All @@ -263,6 +270,15 @@ impl<C: ComponentDefinition> TimerManager<C> {
}
}

impl<C: ComponentDefinition> Drop for TimerManager<C> {
fn drop(&mut self) {
// drop all handles in case someone forgets to clean up after their component
for (id, _) in self.handles.drain() {
self.timer.cancel(id);
}
}
}

// NEVER SEND THIS!
pub(crate) enum TimerHandle<C: ComponentDefinition> {
OneShot {
Expand Down Expand Up @@ -297,24 +313,27 @@ impl TimerActorRef {
}
}

fn enqueue(&self, timeout: Timeout) -> () {
fn enqueue(&self, timeout: Timeout) -> Result<(), QueueingError> {
match (self.msg_queue.upgrade(), self.component.upgrade()) {
(Some(q), Some(c)) => {
q.push(timeout);
match c.core().increment_work() {
SchedulingDecision::Schedule => {
let system = c.core().system();
system.schedule(c.clone());
}
_ => (), // nothing
if let SchedulingDecision::Schedule = c.core().increment_work() {
let system = c.core().system();
system.schedule(c.clone());
}
Ok(())
}
(q, c) => {
eprintln!("Dropping timeout as target (queue? {:?}, component? {:?}) is unavailable: {:?}",
q.is_some(),
c.is_some(),
timeout
);
Err(QueueingError)
}
(q, c) => println!(
"Dropping timeout as target (queue? {:?}, component? {:?}) is unavailable: {:?}",
q.is_some(),
c.is_some(),
timeout
),
}
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
struct QueueingError;
1 change: 1 addition & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ report_fixme = "Always"
ignore = []
emit_mode = "Files"
make_backup = false
edition = "2018"

0 comments on commit 1e5fc6c

Please sign in to comment.