From 4245e6e10377715724666c5bdae25a976433651a Mon Sep 17 00:00:00 2001 From: tekjar Date: Sun, 11 Dec 2022 20:21:43 +0530 Subject: [PATCH 1/5] Reset cursor of unacked packets in DataRequest --- rumqttd/src/router/iobufs.rs | 20 +++++++++++++++++--- rumqttd/src/router/routing.rs | 12 +++++++++++- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index b3b90b4d2..5700c5e7b 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -1,4 +1,7 @@ -use std::{collections::VecDeque, sync::Arc}; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; use flume::{Receiver, Sender}; use parking_lot::Mutex; @@ -7,7 +10,7 @@ use tracing::{error, warn}; use crate::{ protocol::Packet, router::{FilterIdx, MAX_CHANNEL_CAPACITY}, - Cursor, Notification, + Cursor, Notification, Offset, }; use super::{Forward, IncomingMeter, OutgoingMeter}; @@ -56,7 +59,7 @@ pub struct Outgoing { /// Handle which is given to router to allow router to communicate with this connection pub(crate) handle: Sender<()>, /// The buffer to keep track of inflight packets. - inflight_buffer: VecDeque<(u16, FilterIdx, Cursor)>, + pub(crate) inflight_buffer: VecDeque<(u16, FilterIdx, Cursor)>, /// Last packet id last_pkid: u16, /// Metrics of outgoing messages of this connection @@ -171,6 +174,17 @@ impl Outgoing { Some(()) } + + pub fn retrasmission_map(&self) -> HashMap { + let mut o = HashMap::new(); + for (_, filter_idx, cursor) in self.inflight_buffer.iter() { + if !o.contains_key(filter_idx) { + o.insert(*filter_idx, *cursor); + } + } + + o + } } #[cfg(test)] diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 9eb6f4b67..1b7df3ae1 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -328,7 +328,7 @@ impl Router { // Remove connection from router let mut connection = self.connections.remove(id); let _incoming = self.ibufs.remove(id); - let _outgoing = self.obufs.remove(id); + let outgoing = self.obufs.remove(id); let mut tracker = self.scheduler.remove(id); self.connection_map.remove(&client_id); self.ackslog.remove(id); @@ -339,6 +339,7 @@ impl Router { // self.readyqueue.remove(id); let inflight_data_requests = self.datalog.clean(id); + let retransmissions = outgoing.retrasmission_map(); // Remove this connection from subscriptions for filter in connection.subscriptions.iter() { @@ -367,6 +368,15 @@ impl Router { .into_iter() .for_each(|r| tracker.register_data_request(r)); + debug!("{:?}, {:?} ", &tracker, retransmissions); + + for request in tracker.data_requests.iter_mut() { + if let Some(cursor) = retransmissions.get(&request.filter_idx) { + request.cursor = *cursor; + } + } + + debug!("{:?}, {:?} ", &tracker, retransmissions); self.graveyard .save(tracker, connection.subscriptions, connection.events); } else { From c2d8e71bbb212028179753df6c464cd3a7b7c7d0 Mon Sep 17 00:00:00 2001 From: henil Date: Fri, 16 Dec 2022 12:37:20 +0530 Subject: [PATCH 2/5] fix cursor indexes stored in inflight buffer --- rumqttd/src/router/iobufs.rs | 2 +- rumqttd/src/router/logs.rs | 2 +- rumqttd/src/router/routing.rs | 4 ++-- rumqttd/src/segments/mod.rs | 8 +++++--- rumqttd/src/segments/segment.rs | 31 +++++++++++++++++++++---------- 5 files changed, 30 insertions(+), 17 deletions(-) diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index 5700c5e7b..955f2d256 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -10,7 +10,7 @@ use tracing::{error, warn}; use crate::{ protocol::Packet, router::{FilterIdx, MAX_CHANNEL_CAPACITY}, - Cursor, Notification, Offset, + Cursor, Notification, }; use super::{Forward, IncomingMeter, OutgoingMeter}; diff --git a/rumqttd/src/router/logs.rs b/rumqttd/src/router/logs.rs index 340330528..f771313e5 100644 --- a/rumqttd/src/router/logs.rs +++ b/rumqttd/src/router/logs.rs @@ -152,7 +152,7 @@ impl DataLog { filter_idx: FilterIdx, offset: Offset, len: u64, - ) -> io::Result<(Position, Vec)> { + ) -> io::Result<(Position, Vec<(Publish, Offset)>)> { // unwrap to get index of `self.native` is fine here, because when a new subscribe packet // arrives in `Router::handle_device_payload`, it first calls the function // `next_native_offset` which creates a new commitlog if one doesn't exist. So any new diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 1b7df3ae1..c47b50dc2 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -1068,10 +1068,10 @@ fn forward_device_data( } // Fill and notify device data - let forwards = publishes.into_iter().map(|mut publish| { + let forwards = publishes.into_iter().map(|(mut publish, offset)| { publish.qos = protocol::qos(qos).unwrap(); Forward { - cursor: next, + cursor: offset, size: 0, publish, } diff --git a/rumqttd/src/segments/mod.rs b/rumqttd/src/segments/mod.rs index 17126c3f3..7e15e9ec0 100644 --- a/rumqttd/src/segments/mod.rs +++ b/rumqttd/src/segments/mod.rs @@ -1,3 +1,4 @@ +use crate::Offset; use std::usize; use std::{collections::VecDeque, io}; @@ -186,7 +187,7 @@ where &self, mut start: (u64, u64), mut len: u64, - out: &mut Vec, + out: &mut Vec<(T, Offset)>, ) -> io::Result { let mut cursor = start; let _orig_cursor = cursor; @@ -217,11 +218,12 @@ where cursor.1 = curr_segment.absolute_offset; } + dbg!(&self.segments.len()); while cursor.0 < self.tail { // `Segment::readv` handles conversion from absolute index to relative // index and it returns the absolute offset. // absolute cursor not to be confused with absolute offset - match curr_segment.readv(cursor.1, len, out)? { + match curr_segment.readv(cursor, len, out)? { // an offset returned -> we didn't read till end -> len fulfilled -> return SegmentPosition::Next(offset) => { return Ok(Position::Next { @@ -261,7 +263,7 @@ where // segment's `readv` then we should return `None` as well as not possible to read further, // whereas for older segments we simply jump onto the new one to read more. - match curr_segment.readv(cursor.1, len, out)? { + match curr_segment.readv(cursor, len, out)? { SegmentPosition::Next(v) => { // debug!("start: {:?}, end: ({}, {})", orig_cursor, cursor.0, cursor.1 + v - 1); Ok(Position::Next { diff --git a/rumqttd/src/segments/segment.rs b/rumqttd/src/segments/segment.rs index 19be75dd8..7466d5d51 100644 --- a/rumqttd/src/segments/segment.rs +++ b/rumqttd/src/segments/segment.rs @@ -1,3 +1,5 @@ +use crate::{Cursor, Offset}; + use super::Storage; use std::io; @@ -57,13 +59,13 @@ where /// relative offset to absolute offset and vice-versa. pub(crate) fn readv( &self, - absolute_index: u64, + cursor: Cursor, len: u64, - out: &mut Vec, + out: &mut Vec<(T, Offset)>, ) -> io::Result { // this substraction can never overflow as checking of offset happens at // `CommitLog::readv`. - let idx = absolute_index - self.absolute_offset; + let idx = cursor.1 - self.absolute_offset; let mut ret: Option; @@ -78,7 +80,13 @@ where ret = None; limit = self.len(); } - out.extend(self.data[idx as usize..limit as usize].iter().cloned()); + let tmp = std::iter::repeat(cursor.0).zip(cursor.1..cursor.1 + limit); + dbg!(&tmp); + let o = self.data[idx as usize..limit as usize] + .iter() + .cloned() + .zip(tmp); + out.extend(o); } match ret { @@ -135,11 +143,14 @@ mod tests { segment.push(Bytes::from_static(b"test9")); assert_eq!(segment.len(), 9); - let mut out: Vec = Vec::new(); - let _ = segment.readv(0, 2, &mut out).unwrap(); + let mut out: Vec<(Bytes, Offset)> = Vec::new(); + let _ = segment.readv((0, 0), 2, &mut out).unwrap(); assert_eq!( out, - vec![Bytes::from_static(b"test1"), Bytes::from_static(b"test2")] + vec![ + (Bytes::from_static(b"test1"), (0, 0)), + (Bytes::from_static(b"test2"), (0, 1)) + ] ); } @@ -157,8 +168,8 @@ mod tests { segment.push(vec![9u8]); assert_eq!(segment.len(), 9); - let mut out: Vec> = Vec::new(); - let _ = segment.readv(0, 2, &mut out).unwrap(); - assert_eq!(out, vec![vec![1u8], vec![2u8]]); + let mut out: Vec<(Vec, Offset)> = Vec::new(); + let _ = segment.readv((0, 0), 2, &mut out).unwrap(); + assert_eq!(out, vec![(vec![1u8], (0, 0)), (vec![2u8], (0, 1))]); } } From 3e9163867d938a3cdc824747d228af182c6aa489 Mon Sep 17 00:00:00 2001 From: henil Date: Fri, 16 Dec 2022 12:50:40 +0530 Subject: [PATCH 3/5] fix clippy warnings and remove dbg! --- rumqttd/src/router/iobufs.rs | 2 +- rumqttd/src/segments/mod.rs | 7 +++---- rumqttd/src/segments/segment.rs | 1 - 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index 955f2d256..4f7ec2d4a 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -71,7 +71,7 @@ impl Outgoing { pub(crate) fn new(client_id: String) -> (Self, Receiver<()>) { let (handle, rx) = flume::bounded(MAX_CHANNEL_CAPACITY); let data_buffer = VecDeque::with_capacity(MAX_CHANNEL_CAPACITY); - let inflight_buffer = VecDeque::with_capacity(MAX_INFLIGHT as usize); + let inflight_buffer = VecDeque::with_capacity(MAX_INFLIGHT); // Ensure that there won't be any new allocations assert!(MAX_INFLIGHT <= inflight_buffer.capacity()); diff --git a/rumqttd/src/segments/mod.rs b/rumqttd/src/segments/mod.rs index 7e15e9ec0..e988afaba 100644 --- a/rumqttd/src/segments/mod.rs +++ b/rumqttd/src/segments/mod.rs @@ -128,7 +128,7 @@ where #[allow(dead_code)] #[inline] pub fn len(&self) -> usize { - self.segments.len() as usize + self.segments.len() } /// Number of packets @@ -218,7 +218,6 @@ where cursor.1 = curr_segment.absolute_offset; } - dbg!(&self.segments.len()); while cursor.0 < self.tail { // `Segment::readv` handles conversion from absolute index to relative // index and it returns the absolute offset. @@ -292,10 +291,10 @@ mod tests { Bytes::from(vec![id; size as usize]) } - fn verify(expected_id: usize, expected_size: u64, out: Bytes) { + fn verify(expected_id: usize, expected_size: u64, out: (Bytes, Offset)) { let expected = Bytes::from(vec![expected_id as u8; expected_size as usize]); // dbg!(expected_id, &expected); - assert_eq!(out, expected); + assert_eq!(out.0, expected); } #[test] diff --git a/rumqttd/src/segments/segment.rs b/rumqttd/src/segments/segment.rs index 7466d5d51..c72b4bee3 100644 --- a/rumqttd/src/segments/segment.rs +++ b/rumqttd/src/segments/segment.rs @@ -81,7 +81,6 @@ where limit = self.len(); } let tmp = std::iter::repeat(cursor.0).zip(cursor.1..cursor.1 + limit); - dbg!(&tmp); let o = self.data[idx as usize..limit as usize] .iter() .cloned() From cd4a761f12735ebf68c77b7323c1474b6ca67279 Mon Sep 17 00:00:00 2001 From: henil Date: Fri, 16 Dec 2022 13:50:25 +0530 Subject: [PATCH 4/5] give more appropriate name to variable --- rumqttd/src/segments/segment.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rumqttd/src/segments/segment.rs b/rumqttd/src/segments/segment.rs index c72b4bee3..66fc36e83 100644 --- a/rumqttd/src/segments/segment.rs +++ b/rumqttd/src/segments/segment.rs @@ -80,11 +80,11 @@ where ret = None; limit = self.len(); } - let tmp = std::iter::repeat(cursor.0).zip(cursor.1..cursor.1 + limit); + let offsets = std::iter::repeat(cursor.0).zip(cursor.1..cursor.1 + limit); let o = self.data[idx as usize..limit as usize] .iter() .cloned() - .zip(tmp); + .zip(offsets); out.extend(o); } From 9c455aa47799feebf134c6c166693a36f14365f2 Mon Sep 17 00:00:00 2001 From: henil Date: Fri, 16 Dec 2022 20:44:58 +0530 Subject: [PATCH 5/5] clean up and add unit test for retransmission_map --- rumqttd/src/router/iobufs.rs | 36 +++++++++++++++++++++++++++++++++-- rumqttd/src/router/routing.rs | 5 +---- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index 4f7ec2d4a..04526c1ec 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -59,7 +59,7 @@ pub struct Outgoing { /// Handle which is given to router to allow router to communicate with this connection pub(crate) handle: Sender<()>, /// The buffer to keep track of inflight packets. - pub(crate) inflight_buffer: VecDeque<(u16, FilterIdx, Cursor)>, + inflight_buffer: VecDeque<(u16, FilterIdx, Cursor)>, /// Last packet id last_pkid: u16, /// Metrics of outgoing messages of this connection @@ -175,7 +175,9 @@ impl Outgoing { Some(()) } - pub fn retrasmission_map(&self) -> HashMap { + // Here we are assuming that the first unique filter_idx we find while iterating will have the + // least corresponding cursor because of the way we insert into the inflight_buffer + pub fn retransmission_map(&self) -> HashMap { let mut o = HashMap::new(); for (_, filter_idx, cursor) in self.inflight_buffer.iter() { if !o.contains_key(filter_idx) { @@ -189,6 +191,36 @@ impl Outgoing { #[cfg(test)] mod test { + use super::*; + + #[test] + fn retransmission_map_is_calculated_accurately() { + let (mut outgoing, _) = Outgoing::new("retransmission-test".to_string()); + let mut result = HashMap::new(); + + result.insert(0, (0, 8)); + result.insert(1, (0, 1)); + result.insert(2, (1, 1)); + result.insert(3, (1, 0)); + + let buf = vec![ + (1, 0, (0, 8)), + (1, 0, (0, 10)), + (1, 1, (0, 1)), + (3, 1, (0, 4)), + (2, 2, (1, 1)), + (1, 2, (2, 6)), + (1, 2, (2, 1)), + (1, 3, (1, 0)), + (1, 3, (1, 1)), + (1, 3, (1, 3)), + (1, 3, (1, 3)), + ]; + + outgoing.inflight_buffer.extend(buf); + assert_eq!(outgoing.retransmission_map(), result); + } + // use super::{Outgoing, MAX_INFLIGHT}; // use crate::protocol::{Publish, QoS}; // use crate::router::Forward; diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index c47b50dc2..74b731955 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -339,7 +339,7 @@ impl Router { // self.readyqueue.remove(id); let inflight_data_requests = self.datalog.clean(id); - let retransmissions = outgoing.retrasmission_map(); + let retransmissions = outgoing.retransmission_map(); // Remove this connection from subscriptions for filter in connection.subscriptions.iter() { @@ -368,15 +368,12 @@ impl Router { .into_iter() .for_each(|r| tracker.register_data_request(r)); - debug!("{:?}, {:?} ", &tracker, retransmissions); - for request in tracker.data_requests.iter_mut() { if let Some(cursor) = retransmissions.get(&request.filter_idx) { request.cursor = *cursor; } } - debug!("{:?}, {:?} ", &tracker, retransmissions); self.graveyard .save(tracker, connection.subscriptions, connection.events); } else {