diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index b3b90b4d2..04526c1ec 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -1,4 +1,7 @@ -use std::{collections::VecDeque, sync::Arc}; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; use flume::{Receiver, Sender}; use parking_lot::Mutex; @@ -68,7 +71,7 @@ impl Outgoing { pub(crate) fn new(client_id: String) -> (Self, Receiver<()>) { let (handle, rx) = flume::bounded(MAX_CHANNEL_CAPACITY); let data_buffer = VecDeque::with_capacity(MAX_CHANNEL_CAPACITY); - let inflight_buffer = VecDeque::with_capacity(MAX_INFLIGHT as usize); + let inflight_buffer = VecDeque::with_capacity(MAX_INFLIGHT); // Ensure that there won't be any new allocations assert!(MAX_INFLIGHT <= inflight_buffer.capacity()); @@ -171,10 +174,53 @@ impl Outgoing { Some(()) } + + // Here we are assuming that the first unique filter_idx we find while iterating will have the + // least corresponding cursor because of the way we insert into the inflight_buffer + pub fn retransmission_map(&self) -> HashMap { + let mut o = HashMap::new(); + for (_, filter_idx, cursor) in self.inflight_buffer.iter() { + if !o.contains_key(filter_idx) { + o.insert(*filter_idx, *cursor); + } + } + + o + } } #[cfg(test)] mod test { + use super::*; + + #[test] + fn retransmission_map_is_calculated_accurately() { + let (mut outgoing, _) = Outgoing::new("retransmission-test".to_string()); + let mut result = HashMap::new(); + + result.insert(0, (0, 8)); + result.insert(1, (0, 1)); + result.insert(2, (1, 1)); + result.insert(3, (1, 0)); + + let buf = vec![ + (1, 0, (0, 8)), + (1, 0, (0, 10)), + (1, 1, (0, 1)), + (3, 1, (0, 4)), + (2, 2, (1, 1)), + (1, 2, (2, 6)), + (1, 2, (2, 1)), + (1, 3, (1, 0)), + (1, 3, (1, 1)), + (1, 3, (1, 3)), + (1, 3, (1, 3)), + ]; + + outgoing.inflight_buffer.extend(buf); + assert_eq!(outgoing.retransmission_map(), result); + } + // use super::{Outgoing, MAX_INFLIGHT}; // use crate::protocol::{Publish, QoS}; // use crate::router::Forward; diff --git a/rumqttd/src/router/logs.rs b/rumqttd/src/router/logs.rs index 340330528..f771313e5 100644 --- a/rumqttd/src/router/logs.rs +++ b/rumqttd/src/router/logs.rs @@ -152,7 +152,7 @@ impl DataLog { filter_idx: FilterIdx, offset: Offset, len: u64, - ) -> io::Result<(Position, Vec)> { + ) -> io::Result<(Position, Vec<(Publish, Offset)>)> { // unwrap to get index of `self.native` is fine here, because when a new subscribe packet // arrives in `Router::handle_device_payload`, it first calls the function // `next_native_offset` which creates a new commitlog if one doesn't exist. So any new diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 9eb6f4b67..74b731955 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -328,7 +328,7 @@ impl Router { // Remove connection from router let mut connection = self.connections.remove(id); let _incoming = self.ibufs.remove(id); - let _outgoing = self.obufs.remove(id); + let outgoing = self.obufs.remove(id); let mut tracker = self.scheduler.remove(id); self.connection_map.remove(&client_id); self.ackslog.remove(id); @@ -339,6 +339,7 @@ impl Router { // self.readyqueue.remove(id); let inflight_data_requests = self.datalog.clean(id); + let retransmissions = outgoing.retransmission_map(); // Remove this connection from subscriptions for filter in connection.subscriptions.iter() { @@ -367,6 +368,12 @@ impl Router { .into_iter() .for_each(|r| tracker.register_data_request(r)); + for request in tracker.data_requests.iter_mut() { + if let Some(cursor) = retransmissions.get(&request.filter_idx) { + request.cursor = *cursor; + } + } + self.graveyard .save(tracker, connection.subscriptions, connection.events); } else { @@ -1058,10 +1065,10 @@ fn forward_device_data( } // Fill and notify device data - let forwards = publishes.into_iter().map(|mut publish| { + let forwards = publishes.into_iter().map(|(mut publish, offset)| { publish.qos = protocol::qos(qos).unwrap(); Forward { - cursor: next, + cursor: offset, size: 0, publish, } diff --git a/rumqttd/src/segments/mod.rs b/rumqttd/src/segments/mod.rs index 17126c3f3..e988afaba 100644 --- a/rumqttd/src/segments/mod.rs +++ b/rumqttd/src/segments/mod.rs @@ -1,3 +1,4 @@ +use crate::Offset; use std::usize; use std::{collections::VecDeque, io}; @@ -127,7 +128,7 @@ where #[allow(dead_code)] #[inline] pub fn len(&self) -> usize { - self.segments.len() as usize + self.segments.len() } /// Number of packets @@ -186,7 +187,7 @@ where &self, mut start: (u64, u64), mut len: u64, - out: &mut Vec, + out: &mut Vec<(T, Offset)>, ) -> io::Result { let mut cursor = start; let _orig_cursor = cursor; @@ -221,7 +222,7 @@ where // `Segment::readv` handles conversion from absolute index to relative // index and it returns the absolute offset. // absolute cursor not to be confused with absolute offset - match curr_segment.readv(cursor.1, len, out)? { + match curr_segment.readv(cursor, len, out)? { // an offset returned -> we didn't read till end -> len fulfilled -> return SegmentPosition::Next(offset) => { return Ok(Position::Next { @@ -261,7 +262,7 @@ where // segment's `readv` then we should return `None` as well as not possible to read further, // whereas for older segments we simply jump onto the new one to read more. - match curr_segment.readv(cursor.1, len, out)? { + match curr_segment.readv(cursor, len, out)? { SegmentPosition::Next(v) => { // debug!("start: {:?}, end: ({}, {})", orig_cursor, cursor.0, cursor.1 + v - 1); Ok(Position::Next { @@ -290,10 +291,10 @@ mod tests { Bytes::from(vec![id; size as usize]) } - fn verify(expected_id: usize, expected_size: u64, out: Bytes) { + fn verify(expected_id: usize, expected_size: u64, out: (Bytes, Offset)) { let expected = Bytes::from(vec![expected_id as u8; expected_size as usize]); // dbg!(expected_id, &expected); - assert_eq!(out, expected); + assert_eq!(out.0, expected); } #[test] diff --git a/rumqttd/src/segments/segment.rs b/rumqttd/src/segments/segment.rs index 19be75dd8..66fc36e83 100644 --- a/rumqttd/src/segments/segment.rs +++ b/rumqttd/src/segments/segment.rs @@ -1,3 +1,5 @@ +use crate::{Cursor, Offset}; + use super::Storage; use std::io; @@ -57,13 +59,13 @@ where /// relative offset to absolute offset and vice-versa. pub(crate) fn readv( &self, - absolute_index: u64, + cursor: Cursor, len: u64, - out: &mut Vec, + out: &mut Vec<(T, Offset)>, ) -> io::Result { // this substraction can never overflow as checking of offset happens at // `CommitLog::readv`. - let idx = absolute_index - self.absolute_offset; + let idx = cursor.1 - self.absolute_offset; let mut ret: Option; @@ -78,7 +80,12 @@ where ret = None; limit = self.len(); } - out.extend(self.data[idx as usize..limit as usize].iter().cloned()); + let offsets = std::iter::repeat(cursor.0).zip(cursor.1..cursor.1 + limit); + let o = self.data[idx as usize..limit as usize] + .iter() + .cloned() + .zip(offsets); + out.extend(o); } match ret { @@ -135,11 +142,14 @@ mod tests { segment.push(Bytes::from_static(b"test9")); assert_eq!(segment.len(), 9); - let mut out: Vec = Vec::new(); - let _ = segment.readv(0, 2, &mut out).unwrap(); + let mut out: Vec<(Bytes, Offset)> = Vec::new(); + let _ = segment.readv((0, 0), 2, &mut out).unwrap(); assert_eq!( out, - vec![Bytes::from_static(b"test1"), Bytes::from_static(b"test2")] + vec![ + (Bytes::from_static(b"test1"), (0, 0)), + (Bytes::from_static(b"test2"), (0, 1)) + ] ); } @@ -157,8 +167,8 @@ mod tests { segment.push(vec![9u8]); assert_eq!(segment.len(), 9); - let mut out: Vec> = Vec::new(); - let _ = segment.readv(0, 2, &mut out).unwrap(); - assert_eq!(out, vec![vec![1u8], vec![2u8]]); + let mut out: Vec<(Vec, Offset)> = Vec::new(); + let _ = segment.readv((0, 0), 2, &mut out).unwrap(); + assert_eq!(out, vec![(vec![1u8], (0, 0)), (vec![2u8], (0, 1))]); } }