Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset cursor of unacked packets in DataRequest #534

Merged
merged 5 commits into from Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 17 additions & 3 deletions rumqttd/src/router/iobufs.rs
@@ -1,4 +1,7 @@
use std::{collections::VecDeque, sync::Arc};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};

use flume::{Receiver, Sender};
use parking_lot::Mutex;
Expand Down Expand Up @@ -56,7 +59,7 @@ pub struct Outgoing {
/// Handle which is given to router to allow router to communicate with this connection
pub(crate) handle: Sender<()>,
/// The buffer to keep track of inflight packets.
inflight_buffer: VecDeque<(u16, FilterIdx, Cursor)>,
pub(crate) inflight_buffer: VecDeque<(u16, FilterIdx, Cursor)>,
h3nill marked this conversation as resolved.
Show resolved Hide resolved
/// Last packet id
last_pkid: u16,
/// Metrics of outgoing messages of this connection
Expand All @@ -68,7 +71,7 @@ impl Outgoing {
pub(crate) fn new(client_id: String) -> (Self, Receiver<()>) {
let (handle, rx) = flume::bounded(MAX_CHANNEL_CAPACITY);
let data_buffer = VecDeque::with_capacity(MAX_CHANNEL_CAPACITY);
let inflight_buffer = VecDeque::with_capacity(MAX_INFLIGHT as usize);
let inflight_buffer = VecDeque::with_capacity(MAX_INFLIGHT);

// Ensure that there won't be any new allocations
assert!(MAX_INFLIGHT <= inflight_buffer.capacity());
Expand Down Expand Up @@ -171,6 +174,17 @@ impl Outgoing {

Some(())
}

pub fn retrasmission_map(&self) -> HashMap<FilterIdx, Cursor> {
h3nill marked this conversation as resolved.
Show resolved Hide resolved
let mut o = HashMap::new();
for (_, filter_idx, cursor) in self.inflight_buffer.iter() {
if !o.contains_key(filter_idx) {
o.insert(*filter_idx, *cursor);
}
}

o
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/router/logs.rs
Expand Up @@ -152,7 +152,7 @@ impl DataLog {
filter_idx: FilterIdx,
offset: Offset,
len: u64,
) -> io::Result<(Position, Vec<Publish>)> {
) -> io::Result<(Position, Vec<(Publish, Offset)>)> {
// unwrap to get index of `self.native` is fine here, because when a new subscribe packet
// arrives in `Router::handle_device_payload`, it first calls the function
// `next_native_offset` which creates a new commitlog if one doesn't exist. So any new
Expand Down
16 changes: 13 additions & 3 deletions rumqttd/src/router/routing.rs
Expand Up @@ -328,7 +328,7 @@ impl Router {
// Remove connection from router
let mut connection = self.connections.remove(id);
let _incoming = self.ibufs.remove(id);
let _outgoing = self.obufs.remove(id);
let outgoing = self.obufs.remove(id);
let mut tracker = self.scheduler.remove(id);
self.connection_map.remove(&client_id);
self.ackslog.remove(id);
Expand All @@ -339,6 +339,7 @@ impl Router {
// self.readyqueue.remove(id);

let inflight_data_requests = self.datalog.clean(id);
let retransmissions = outgoing.retrasmission_map();

// Remove this connection from subscriptions
for filter in connection.subscriptions.iter() {
Expand Down Expand Up @@ -367,6 +368,15 @@ impl Router {
.into_iter()
.for_each(|r| tracker.register_data_request(r));

debug!("{:?}, {:?} ", &tracker, retransmissions);
h3nill marked this conversation as resolved.
Show resolved Hide resolved

for request in tracker.data_requests.iter_mut() {
if let Some(cursor) = retransmissions.get(&request.filter_idx) {
request.cursor = *cursor;
}
}

debug!("{:?}, {:?} ", &tracker, retransmissions);
h3nill marked this conversation as resolved.
Show resolved Hide resolved
self.graveyard
.save(tracker, connection.subscriptions, connection.events);
} else {
Expand Down Expand Up @@ -1058,10 +1068,10 @@ fn forward_device_data(
}

// Fill and notify device data
let forwards = publishes.into_iter().map(|mut publish| {
let forwards = publishes.into_iter().map(|(mut publish, offset)| {
publish.qos = protocol::qos(qos).unwrap();
Forward {
cursor: next,
cursor: offset,
size: 0,
publish,
}
Expand Down
13 changes: 7 additions & 6 deletions rumqttd/src/segments/mod.rs
@@ -1,3 +1,4 @@
use crate::Offset;
use std::usize;
use std::{collections::VecDeque, io};

Expand Down Expand Up @@ -127,7 +128,7 @@ where
#[allow(dead_code)]
#[inline]
pub fn len(&self) -> usize {
self.segments.len() as usize
self.segments.len()
}

/// Number of packets
Expand Down Expand Up @@ -186,7 +187,7 @@ where
&self,
mut start: (u64, u64),
mut len: u64,
out: &mut Vec<T>,
out: &mut Vec<(T, Offset)>,
) -> io::Result<Position> {
let mut cursor = start;
let _orig_cursor = cursor;
Expand Down Expand Up @@ -221,7 +222,7 @@ where
// `Segment::readv` handles conversion from absolute index to relative
// index and it returns the absolute offset.
// absolute cursor not to be confused with absolute offset
match curr_segment.readv(cursor.1, len, out)? {
match curr_segment.readv(cursor, len, out)? {
// an offset returned -> we didn't read till end -> len fulfilled -> return
SegmentPosition::Next(offset) => {
return Ok(Position::Next {
Expand Down Expand Up @@ -261,7 +262,7 @@ where
// segment's `readv` then we should return `None` as well as not possible to read further,
// whereas for older segments we simply jump onto the new one to read more.

match curr_segment.readv(cursor.1, len, out)? {
match curr_segment.readv(cursor, len, out)? {
SegmentPosition::Next(v) => {
// debug!("start: {:?}, end: ({}, {})", orig_cursor, cursor.0, cursor.1 + v - 1);
Ok(Position::Next {
Expand Down Expand Up @@ -290,10 +291,10 @@ mod tests {
Bytes::from(vec![id; size as usize])
}

fn verify(expected_id: usize, expected_size: u64, out: Bytes) {
fn verify(expected_id: usize, expected_size: u64, out: (Bytes, Offset)) {
let expected = Bytes::from(vec![expected_id as u8; expected_size as usize]);
// dbg!(expected_id, &expected);
assert_eq!(out, expected);
assert_eq!(out.0, expected);
}

#[test]
Expand Down
30 changes: 20 additions & 10 deletions rumqttd/src/segments/segment.rs
@@ -1,3 +1,5 @@
use crate::{Cursor, Offset};

use super::Storage;
use std::io;

Expand Down Expand Up @@ -57,13 +59,13 @@ where
/// relative offset to absolute offset and vice-versa.
pub(crate) fn readv(
&self,
absolute_index: u64,
cursor: Cursor,
len: u64,
out: &mut Vec<T>,
out: &mut Vec<(T, Offset)>,
) -> io::Result<SegmentPosition> {
// this substraction can never overflow as checking of offset happens at
// `CommitLog::readv`.
let idx = absolute_index - self.absolute_offset;
let idx = cursor.1 - self.absolute_offset;

let mut ret: Option<u64>;

Expand All @@ -78,7 +80,12 @@ where
ret = None;
limit = self.len();
}
out.extend(self.data[idx as usize..limit as usize].iter().cloned());
let offsets = std::iter::repeat(cursor.0).zip(cursor.1..cursor.1 + limit);
let o = self.data[idx as usize..limit as usize]
.iter()
.cloned()
.zip(offsets);
out.extend(o);
}

match ret {
Expand Down Expand Up @@ -135,11 +142,14 @@ mod tests {
segment.push(Bytes::from_static(b"test9"));
assert_eq!(segment.len(), 9);

let mut out: Vec<Bytes> = Vec::new();
let _ = segment.readv(0, 2, &mut out).unwrap();
h3nill marked this conversation as resolved.
Show resolved Hide resolved
let mut out: Vec<(Bytes, Offset)> = Vec::new();
let _ = segment.readv((0, 0), 2, &mut out).unwrap();
assert_eq!(
out,
vec![Bytes::from_static(b"test1"), Bytes::from_static(b"test2")]
vec![
(Bytes::from_static(b"test1"), (0, 0)),
(Bytes::from_static(b"test2"), (0, 1))
]
);
}

Expand All @@ -157,8 +167,8 @@ mod tests {
segment.push(vec![9u8]);
assert_eq!(segment.len(), 9);

let mut out: Vec<Vec<u8>> = Vec::new();
let _ = segment.readv(0, 2, &mut out).unwrap();
assert_eq!(out, vec![vec![1u8], vec![2u8]]);
let mut out: Vec<(Vec<u8>, Offset)> = Vec::new();
let _ = segment.readv((0, 0), 2, &mut out).unwrap();
assert_eq!(out, vec![(vec![1u8], (0, 0)), (vec![2u8], (0, 1))]);
}
}