From 9cb7ca9bd317429420c930b7e2d5febd59aa0c55 Mon Sep 17 00:00:00 2001
From: Maybe Waffle
Date: Fri, 17 Jun 2022 23:01:26 +0400
Subject: [PATCH 1/5] Add "GC" for dispatcher workers

---
 src/dispatching/dispatcher.rs   | 96 ++++++++++++++++++++++++++++-----
 src/dispatching/distribution.rs |  2 +-
 2 files changed, 85 insertions(+), 13 deletions(-)

diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs
index f458c23f5..7dd875c8f 100644
--- a/src/dispatching/dispatcher.rs
+++ b/src/dispatching/dispatcher.rs
@@ -16,7 +16,10 @@ use std::{
     fmt::Debug,
     hash::Hash,
     ops::{ControlFlow, Deref},
-    sync::Arc,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
 };
 use tokio::time::timeout;
 use tokio_stream::wrappers::ReceiverStream;
@@ -32,6 +35,7 @@ pub struct DispatcherBuilder<R, Err, Key> {
     error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
     distribution_f: fn(&Update) -> Option<Key>,
     worker_queue_size: usize,
+    gc_worker_count_trigger: usize,
 }
 
 impl<R, Err, Key> DispatcherBuilder<R, Err, Key>
 where
@@ -83,6 +87,17 @@ where
         Self { worker_queue_size: size, ..self }
     }
 
+    /// Maximum number of inactive workers.
+    ///
+    /// When number of workers exceeds this limit dispatcher will try to remove
+    /// inactive workers.
+    ///
+    /// By default it's 32.
+    #[must_use]
+    pub fn gc_worker_count_trigger(self, count: usize) -> Self {
+        Self { gc_worker_count_trigger: count, ..self }
+    }
+
     /// Specifies the distribution function that decides how updates are grouped
     /// before execution.
     pub fn distribution_function<K>(
@@ -100,6 +115,7 @@ where
             error_handler,
             distribution_f: _,
             worker_queue_size,
+            gc_worker_count_trigger: worker_count_gc,
         } = self;
 
         DispatcherBuilder {
@@ -110,6 +126,7 @@ where
             error_handler,
             distribution_f: f,
             worker_queue_size,
+            gc_worker_count_trigger: worker_count_gc,
         }
     }
 
@@ -124,6 +141,7 @@ where
             error_handler,
             distribution_f,
             worker_queue_size,
+            gc_worker_count_trigger,
         } = self;
 
         Dispatcher {
@@ -137,6 +155,7 @@ where
             worker_queue_size,
             workers: HashMap::new(),
             default_worker: None,
+            gc_worker_count_trigger,
         }
     }
 }
@@ -158,6 +177,7 @@ pub struct Dispatcher<R, Err, Key> {
 
     distribution_f: fn(&Update) -> Option<Key>,
     worker_queue_size: usize,
+    gc_worker_count_trigger: usize,
     // Tokio TX channel parts associated with chat IDs that consume updates sequentially.
     workers: HashMap<Key, Worker>,
     // The default TX part that consume updates concurrently.
@@ -171,6 +191,7 @@ pub struct Dispatcher<R, Err, Key> {
 struct Worker {
     tx: tokio::sync::mpsc::Sender<Update>,
     handle: tokio::task::JoinHandle<()>,
+    is_waiting: Arc<AtomicBool>,
 }
 
 // TODO: it is allowed to return message as response on telegram request in
@@ -194,6 +215,7 @@ where
         Err: Debug,
     {
         const DEFAULT_WORKER_QUEUE_SIZE: usize = 64;
+        const DEFAULT_GC_WORKER_COUNT_TRIGGER: usize = 32;
 
         DispatcherBuilder {
            bot,
@@ -206,6 +228,7 @@ where
             error_handler: LoggingErrorHandler::new(),
             worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE,
             distribution_f: default_distribution_function,
+            gc_worker_count_trigger: DEFAULT_GC_WORKER_COUNT_TRIGGER,
         }
     }
 }
@@ -214,7 +237,7 @@ impl<R, Err, Key> Dispatcher<R, Err, Key>
 where
     R: Requester + Clone + Send + Sync + 'static,
     Err: Send + Sync + 'static,
-    Key: Hash + Eq,
+    Key: Hash + Eq + Clone,
 {
     /// Starts your bot with the default parameters.
     ///
@@ -280,6 +303,8 @@ where
         tokio::pin!(stream);
 
         loop {
+            self.gc_workers_if_needed().await;
+
             // False positive
             #[allow(clippy::collapsible_match)]
             if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await {
@@ -367,6 +392,45 @@ where
         }
     }
 
+    async fn gc_workers_if_needed(&mut self) {
+        if self.workers.len() <= self.gc_worker_count_trigger {
+            return;
+        }
+
+        self.gc_workers().await;
+    }
+
+    #[inline(never)]
+    async fn gc_workers(&mut self) {
+        let handles = self
+            .workers
+            .iter()
+            .filter(|(_, worker)| {
+                worker.tx.capacity() == self.worker_queue_size
+                    && worker.is_waiting.load(Ordering::Relaxed)
+            })
+            .map(|(k, _)| k)
+            .cloned()
+            .collect::<Vec<_>>()
+            .into_iter()
+            .map(|key| {
+                let Worker { tx, handle, .. } = self.workers.remove(&key).unwrap();
+
+                // Close channel, worker should stop almost immediately
+                // (it's been supposedly waiting on the channel)
+                drop(tx);
+
+                handle
+            });
+
+        for handle in handles {
+            // We must wait for worker to stop anyway, even though it should stop
+            // immediately. This helps in case if we've checked that the worker
+            // is waiting in between it received the update and set the flag.
+            let _ = handle.await;
+        }
+    }
+
     /// Setups the `^C` handler that [`shutdown`]s dispatching.
     ///
     /// [`shutdown`]: ShutdownToken::shutdown
@@ -410,20 +474,28 @@ fn spawn_worker<Err>(
 where
     Err: Send + Sync + 'static,
 {
-    let (tx, rx) = tokio::sync::mpsc::channel(queue_size);
+    let (tx, mut rx) = tokio::sync::mpsc::channel(queue_size);
+    let is_waiting = Arc::new(AtomicBool::new(true));
+    let is_waiting_local = Arc::clone(&is_waiting);
     let deps = Arc::new(deps);
 
-    let handle = tokio::spawn(ReceiverStream::new(rx).for_each(move |update| {
-        let deps = Arc::clone(&deps);
-        let handler = Arc::clone(&handler);
-        let default_handler = Arc::clone(&default_handler);
-        let error_handler = Arc::clone(&error_handler);
+    let handle = tokio::spawn(async move {
+        while let Some(update) = rx.recv().await {
+            is_waiting_local.store(false, Ordering::Relaxed);
 
-        handle_update(update, deps, handler, default_handler, error_handler)
-    }));
+            let deps = Arc::clone(&deps);
+            let handler = Arc::clone(&handler);
+            let default_handler = Arc::clone(&default_handler);
+            let error_handler = Arc::clone(&error_handler);
+
+            handle_update(update, deps, handler, default_handler, error_handler).await;
+
+            is_waiting_local.store(true, Ordering::Relaxed);
+        }
+    });
 
-    Worker { tx, handle }
+    Worker { tx, handle, is_waiting }
 }
 
 fn spawn_default_worker<Err>(
@@ -449,7 +521,7 @@ where
         handle_update(update, deps, handler, default_handler, error_handler)
     }));
 
-    Worker { tx, handle }
+    Worker { tx, handle, is_waiting: Arc::new(AtomicBool::new(true)) }
 }
 
 async fn handle_update(
diff --git a/src/dispatching/distribution.rs b/src/dispatching/distribution.rs
index 208e0018f..2089b7b6d 100644
--- a/src/dispatching/distribution.rs
+++ b/src/dispatching/distribution.rs
@@ -1,7 +1,7 @@
 use teloxide_core::types::{ChatId, Update};
 
 /// Default distribution key for dispatching.
-#[derive(Debug, Hash, PartialEq, Eq)]
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
 pub struct DefaultKey(ChatId);
 
 pub(crate) fn default_distribution_function(update: &Update) -> Option<DefaultKey> {

From a820dedd50b87541d23053a2ffdaf67a0282fe8e Mon Sep 17 00:00:00 2001
From: Maybe Waffle
Date: Sun, 26 Jun 2022 22:53:41 +0400
Subject: [PATCH 2/5] Auto-magically detect how much workers need to be kept
 alive

---
 src/dispatching/dispatcher.rs | 47 ++++++++++++++++++++++-------------------------
 1 file changed, 22 insertions(+), 25 deletions(-)

diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs
index 7dd875c8f..a8178be16 100644
--- a/src/dispatching/dispatcher.rs
+++ b/src/dispatching/dispatcher.rs
@@ -11,20 +11,20 @@ use crate::{
 
 use dptree::di::{DependencyMap, DependencySupplier};
 use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
+use tokio::time::timeout;
+use tokio_stream::wrappers::ReceiverStream;
+
 use std::{
     collections::HashMap,
     fmt::Debug,
+    future::Future,
     hash::Hash,
     ops::{ControlFlow, Deref},
     sync::{
-        atomic::{AtomicBool, Ordering},
+        atomic::{AtomicBool, AtomicU32, Ordering},
         Arc,
     },
 };
-use tokio::time::timeout;
-use tokio_stream::wrappers::ReceiverStream;
-
-use std::future::Future;
 
 /// The builder for [`Dispatcher`].
 pub struct DispatcherBuilder<R, Err, Key> {
@@ -35,7 +35,6 @@ pub struct DispatcherBuilder<R, Err, Key> {
     error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
     distribution_f: fn(&Update) -> Option<Key>,
     worker_queue_size: usize,
-    gc_worker_count_trigger: usize,
 }
 
 impl<R, Err, Key> DispatcherBuilder<R, Err, Key>
 where
@@ -87,17 +86,6 @@ where
         Self { worker_queue_size: size, ..self }
     }
 
-    /// Maximum number of inactive workers.
-    ///
-    /// When number of workers exceeds this limit dispatcher will try to remove
-    /// inactive workers.
-    ///
-    /// By default it's 32.
-    #[must_use]
-    pub fn gc_worker_count_trigger(self, count: usize) -> Self {
-        Self { gc_worker_count_trigger: count, ..self }
-    }
-
     /// Specifies the distribution function that decides how updates are grouped
     /// before execution.
     pub fn distribution_function<K>(
@@ -115,7 +103,6 @@ where
             error_handler,
             distribution_f: _,
             worker_queue_size,
-            gc_worker_count_trigger: worker_count_gc,
         } = self;
 
         DispatcherBuilder {
@@ -126,7 +113,6 @@ where
             error_handler,
             distribution_f: f,
             worker_queue_size,
-            gc_worker_count_trigger: worker_count_gc,
         }
     }
 
@@ -141,7 +127,6 @@ where
             error_handler,
             distribution_f,
             worker_queue_size,
-            gc_worker_count_trigger,
         } = self;
 
         Dispatcher {
@@ -155,7 +140,8 @@ where
             worker_queue_size,
             workers: HashMap::new(),
             default_worker: None,
-            gc_worker_count_trigger,
+            current_number_of_active_workers: Default::default(),
+            max_number_of_active_workers: Default::default(),
         }
     }
 }
@@ -177,7 +163,8 @@ pub struct Dispatcher<R, Err, Key> {
 
     distribution_f: fn(&Update) -> Option<Key>,
     worker_queue_size: usize,
-    gc_worker_count_trigger: usize,
+    current_number_of_active_workers: Arc<AtomicU32>,
+    max_number_of_active_workers: Arc<AtomicU32>,
     // Tokio TX channel parts associated with chat IDs that consume updates sequentially.
     workers: HashMap<Key, Worker>,
     // The default TX part that consume updates concurrently.
@@ -215,7 +202,6 @@ where
         Err: Debug,
     {
         const DEFAULT_WORKER_QUEUE_SIZE: usize = 64;
-        const DEFAULT_GC_WORKER_COUNT_TRIGGER: usize = 32;
 
         DispatcherBuilder {
             bot,
@@ -228,7 +214,6 @@ where
             error_handler: LoggingErrorHandler::new(),
             worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE,
             distribution_f: default_distribution_function,
-            gc_worker_count_trigger: DEFAULT_GC_WORKER_COUNT_TRIGGER,
         }
     }
 }
@@ -367,6 +352,8 @@ where
                         handler,
                         default_handler,
                         error_handler,
+                        Arc::clone(&self.current_number_of_active_workers),
+                        Arc::clone(&self.max_number_of_active_workers),
                         self.worker_queue_size,
                     )
                 }),
@@ -393,7 +380,10 @@ where
     }
 
     async fn gc_workers_if_needed(&mut self) {
-        if self.workers.len() <= self.gc_worker_count_trigger {
+        let workers = self.workers.len();
+        let max = self.max_number_of_active_workers.load(Ordering::Relaxed) as usize;
+
+        if workers <= max {
             return;
         }
 
@@ -469,6 +459,8 @@ fn spawn_worker<Err>(
     handler: Arc<UpdateHandler<Err>>,
     default_handler: DefaultHandler,
     error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
+    current_number_of_active_workers: Arc<AtomicU32>,
+    max_number_of_active_workers: Arc<AtomicU32>,
     queue_size: usize,
 ) -> Worker
 where
@@ -483,6 +475,10 @@ where
     let handle = tokio::spawn(async move {
         while let Some(update) = rx.recv().await {
             is_waiting_local.store(false, Ordering::Relaxed);
+            {
+                let current = current_number_of_active_workers.fetch_add(1, Ordering::Relaxed) + 1;
+                max_number_of_active_workers.fetch_max(current, Ordering::Relaxed);
+            }
 
             let deps = Arc::clone(&deps);
             let handler = Arc::clone(&handler);
             let default_handler = Arc::clone(&default_handler);
             let error_handler = Arc::clone(&error_handler);
 
             handle_update(update, deps, handler, default_handler, error_handler).await;
 
+            current_number_of_active_workers.fetch_sub(1, Ordering::Relaxed);
             is_waiting_local.store(true, Ordering::Relaxed);
         }
     });

From 8489464bd3c47a6252dce1914cfa1411b0ea2b98 Mon Sep 17 00:00:00 2001
From: Maybe Waffle
Date: Mon, 27 Jun 2022 00:01:31 +0400
Subject: [PATCH 3/5] Apply suggestions from the review

---
 src/dispatching/dispatcher.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs
index a8178be16..74eace826 100644
--- a/src/dispatching/dispatcher.rs
+++ b/src/dispatching/dispatcher.rs
@@ -288,7 +288,7 @@ where
         tokio::pin!(stream);
 
         loop {
-            self.gc_workers_if_needed().await;
+            self.remove_inactive_workers_if_needed().await;
 
             // False positive
             #[allow(clippy::collapsible_match)]
@@ -379,7 +379,7 @@ where
         }
     }
 
-    async fn gc_workers_if_needed(&mut self) {
+    async fn remove_inactive_workers_if_needed(&mut self) {
         let workers = self.workers.len();
         let max = self.max_number_of_active_workers.load(Ordering::Relaxed) as usize;
 
@@ -387,11 +387,11 @@ where
             return;
         }
 
-        self.gc_workers().await;
+        self.remove_inactive_workers().await;
     }
 
-    #[inline(never)]
-    async fn gc_workers(&mut self) {
+    #[inline(never)] // Cold function.
+    async fn remove_inactive_workers(&mut self) {
         let handles = self
             .workers
             .iter()

From b45472973c40b905bcab9cef1d320703ccc2c554 Mon Sep 17 00:00:00 2001
From: Maybe Waffle
Date: Mon, 27 Jun 2022 00:02:27 +0400
Subject: [PATCH 4/5] Update changelog

---
 CHANGELOG.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5e5fa7404..3ab583ec0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## unreleased
 
+### Fixed
+
+- `Dispatcher` no longer "leaks" memory for every user
+
 ## 0.9.2 - 2022-06-07
 
 ### Fixed

From 3e35d40e84cb0b97e10a6c39e3fb1456b9bdac34 Mon Sep 17 00:00:00 2001
From: Hirrolot
Date: Mon, 27 Jun 2022 02:30:39 +0600
Subject: [PATCH 5/5] Update CHANGELOG.md

---
 CHANGELOG.md | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3ab583ec0..53ff1e5e9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,7 +8,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Fixed
 
-- `Dispatcher` no longer "leaks" memory for every user
+ - `Dispatcher` no longer "leaks" memory for every inactive user ([PR 657](https://github.com/teloxide/teloxide/pull/657)).
+
+### Changed
+
+ - Add the `Key: Clone` requirement for `impl Dispatcher` [**BC**].
 
 ## 0.9.2 - 2022-06-07
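
Taken together, the series implements a simple idle-worker collection scheme: each per-chat worker owns the RX half of a bounded channel and advertises, through an `is_waiting` flag, whether it is parked on `recv()`; the dispatcher reclaims a worker only when its TX half reports full spare `capacity()` (an empty queue) and the flag is set, then drops the sender and awaits the join handle. The sketch below reproduces that criterion outside of teloxide. It is an illustration, not the crate's API: the names `Worker`, `spawn_worker`, `gc_idle_workers`, `QUEUE_SIZE`, and the `u32` stand-in for `Update` are assumptions made for the example, and it presumes a recent tokio with the `rt-multi-thread`, `macros`, `sync`, and `time` features (where `mpsc::Sender::capacity` is available).

```rust
use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};
use tokio::{sync::mpsc, task::JoinHandle};

const QUEUE_SIZE: usize = 8; // assumed bound, analogous to `worker_queue_size`

struct Worker {
    tx: mpsc::Sender<u32>,
    handle: JoinHandle<()>,
    is_waiting: Arc<AtomicBool>,
}

fn spawn_worker(id: i64) -> Worker {
    let (tx, mut rx) = mpsc::channel::<u32>(QUEUE_SIZE);
    let is_waiting = Arc::new(AtomicBool::new(true));
    let flag = Arc::clone(&is_waiting);

    let handle = tokio::spawn(async move {
        // The worker clears the flag while it is busy and sets it back
        // once it is about to park on the channel again.
        while let Some(update) = rx.recv().await {
            flag.store(false, Ordering::Relaxed);
            println!("worker {id} handled update {update}");
            flag.store(true, Ordering::Relaxed);
        }
    });

    Worker { tx, handle, is_waiting }
}

/// Remove workers that look idle: full spare capacity (empty queue)
/// and parked on `recv()`. Dropping the sender closes the channel,
/// so the worker's loop ends and its task can be awaited.
async fn gc_idle_workers(workers: &mut HashMap<i64, Worker>) {
    let idle: Vec<i64> = workers
        .iter()
        .filter(|(_, w)| w.tx.capacity() == QUEUE_SIZE && w.is_waiting.load(Ordering::Relaxed))
        .map(|(id, _)| *id)
        .collect();

    for id in idle {
        let Worker { tx, handle, .. } = workers.remove(&id).unwrap();
        drop(tx); // close the channel; the worker exits its `while let` loop
        let _ = handle.await; // wait, in case the flag check raced with a fresh update
    }
}

#[tokio::main]
async fn main() {
    let mut workers = HashMap::new();
    workers.insert(1, spawn_worker(1));
    workers.insert(2, spawn_worker(2));

    workers.get(&1).unwrap().tx.send(42).await.unwrap();
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    gc_idle_workers(&mut workers).await;
    println!("workers left: {}", workers.len()); // likely 0: both are idle by now
}
```

Both halves of the check matter: `capacity()` alone can read as full while the worker is still finishing an update it already pulled off the queue, and the final `handle.await` covers the window between a worker receiving an update and clearing its flag, which is the same race the comment in the first patch calls out.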