Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset cursor of unacked packets in DataRequest #534

Merged
merged 5 commits into from Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 48 additions & 2 deletions rumqttd/src/router/iobufs.rs
@@ -1,4 +1,7 @@
use std::{collections::VecDeque, sync::Arc};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};

use flume::{Receiver, Sender};
use parking_lot::Mutex;
Expand Down Expand Up @@ -68,7 +71,7 @@ impl Outgoing {
pub(crate) fn new(client_id: String) -> (Self, Receiver<()>) {
let (handle, rx) = flume::bounded(MAX_CHANNEL_CAPACITY);
let data_buffer = VecDeque::with_capacity(MAX_CHANNEL_CAPACITY);
let inflight_buffer = VecDeque::with_capacity(MAX_INFLIGHT as usize);
let inflight_buffer = VecDeque::with_capacity(MAX_INFLIGHT);

// Ensure that there won't be any new allocations
assert!(MAX_INFLIGHT <= inflight_buffer.capacity());
Expand Down Expand Up @@ -171,10 +174,53 @@ impl Outgoing {

Some(())
}

// Here we are assuming that the first unique filter_idx we find while iterating will have the
// least corresponding cursor because of the way we insert into the inflight_buffer
pub fn retransmission_map(&self) -> HashMap<FilterIdx, Cursor> {
let mut o = HashMap::new();
for (_, filter_idx, cursor) in self.inflight_buffer.iter() {
if !o.contains_key(filter_idx) {
o.insert(*filter_idx, *cursor);
}
}

o
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn retransmission_map_is_calculated_accurately() {
let (mut outgoing, _) = Outgoing::new("retransmission-test".to_string());
let mut result = HashMap::new();

result.insert(0, (0, 8));
result.insert(1, (0, 1));
result.insert(2, (1, 1));
result.insert(3, (1, 0));

let buf = vec![
(1, 0, (0, 8)),
(1, 0, (0, 10)),
(1, 1, (0, 1)),
(3, 1, (0, 4)),
(2, 2, (1, 1)),
(1, 2, (2, 6)),
(1, 2, (2, 1)),
(1, 3, (1, 0)),
(1, 3, (1, 1)),
(1, 3, (1, 3)),
(1, 3, (1, 3)),
];

outgoing.inflight_buffer.extend(buf);
assert_eq!(outgoing.retransmission_map(), result);
}

// use super::{Outgoing, MAX_INFLIGHT};
// use crate::protocol::{Publish, QoS};
// use crate::router::Forward;
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/router/logs.rs
Expand Up @@ -152,7 +152,7 @@ impl DataLog {
filter_idx: FilterIdx,
offset: Offset,
len: u64,
) -> io::Result<(Position, Vec<Publish>)> {
) -> io::Result<(Position, Vec<(Publish, Offset)>)> {
// unwrap to get index of `self.native` is fine here, because when a new subscribe packet
// arrives in `Router::handle_device_payload`, it first calls the function
// `next_native_offset` which creates a new commitlog if one doesn't exist. So any new
Expand Down
13 changes: 10 additions & 3 deletions rumqttd/src/router/routing.rs
Expand Up @@ -328,7 +328,7 @@ impl Router {
// Remove connection from router
let mut connection = self.connections.remove(id);
let _incoming = self.ibufs.remove(id);
let _outgoing = self.obufs.remove(id);
let outgoing = self.obufs.remove(id);
let mut tracker = self.scheduler.remove(id);
self.connection_map.remove(&client_id);
self.ackslog.remove(id);
Expand All @@ -339,6 +339,7 @@ impl Router {
// self.readyqueue.remove(id);

let inflight_data_requests = self.datalog.clean(id);
let retransmissions = outgoing.retransmission_map();

// Remove this connection from subscriptions
for filter in connection.subscriptions.iter() {
Expand Down Expand Up @@ -367,6 +368,12 @@ impl Router {
.into_iter()
.for_each(|r| tracker.register_data_request(r));

for request in tracker.data_requests.iter_mut() {
if let Some(cursor) = retransmissions.get(&request.filter_idx) {
request.cursor = *cursor;
}
}

self.graveyard
.save(tracker, connection.subscriptions, connection.events);
} else {
Expand Down Expand Up @@ -1058,10 +1065,10 @@ fn forward_device_data(
}

// Fill and notify device data
let forwards = publishes.into_iter().map(|mut publish| {
let forwards = publishes.into_iter().map(|(mut publish, offset)| {
publish.qos = protocol::qos(qos).unwrap();
Forward {
cursor: next,
cursor: offset,
size: 0,
publish,
}
Expand Down
13 changes: 7 additions & 6 deletions rumqttd/src/segments/mod.rs
@@ -1,3 +1,4 @@
use crate::Offset;
use std::usize;
use std::{collections::VecDeque, io};

Expand Down Expand Up @@ -127,7 +128,7 @@ where
#[allow(dead_code)]
#[inline]
pub fn len(&self) -> usize {
self.segments.len() as usize
self.segments.len()
}

/// Number of packets
Expand Down Expand Up @@ -186,7 +187,7 @@ where
&self,
mut start: (u64, u64),
mut len: u64,
out: &mut Vec<T>,
out: &mut Vec<(T, Offset)>,
) -> io::Result<Position> {
let mut cursor = start;
let _orig_cursor = cursor;
Expand Down Expand Up @@ -221,7 +222,7 @@ where
// `Segment::readv` handles conversion from absolute index to relative
// index and it returns the absolute offset.
// absolute cursor not to be confused with absolute offset
match curr_segment.readv(cursor.1, len, out)? {
match curr_segment.readv(cursor, len, out)? {
// an offset returned -> we didn't read till end -> len fulfilled -> return
SegmentPosition::Next(offset) => {
return Ok(Position::Next {
Expand Down Expand Up @@ -261,7 +262,7 @@ where
// segment's `readv` then we should return `None` as well as not possible to read further,
// whereas for older segments we simply jump onto the new one to read more.

match curr_segment.readv(cursor.1, len, out)? {
match curr_segment.readv(cursor, len, out)? {
SegmentPosition::Next(v) => {
// debug!("start: {:?}, end: ({}, {})", orig_cursor, cursor.0, cursor.1 + v - 1);
Ok(Position::Next {
Expand Down Expand Up @@ -290,10 +291,10 @@ mod tests {
Bytes::from(vec![id; size as usize])
}

fn verify(expected_id: usize, expected_size: u64, out: Bytes) {
fn verify(expected_id: usize, expected_size: u64, out: (Bytes, Offset)) {
let expected = Bytes::from(vec![expected_id as u8; expected_size as usize]);
// dbg!(expected_id, &expected);
assert_eq!(out, expected);
assert_eq!(out.0, expected);
}

#[test]
Expand Down
30 changes: 20 additions & 10 deletions rumqttd/src/segments/segment.rs
@@ -1,3 +1,5 @@
use crate::{Cursor, Offset};

use super::Storage;
use std::io;

Expand Down Expand Up @@ -57,13 +59,13 @@ where
/// relative offset to absolute offset and vice-versa.
pub(crate) fn readv(
&self,
absolute_index: u64,
cursor: Cursor,
len: u64,
out: &mut Vec<T>,
out: &mut Vec<(T, Offset)>,
) -> io::Result<SegmentPosition> {
// this substraction can never overflow as checking of offset happens at
// `CommitLog::readv`.
let idx = absolute_index - self.absolute_offset;
let idx = cursor.1 - self.absolute_offset;

let mut ret: Option<u64>;

Expand All @@ -78,7 +80,12 @@ where
ret = None;
limit = self.len();
}
out.extend(self.data[idx as usize..limit as usize].iter().cloned());
let offsets = std::iter::repeat(cursor.0).zip(cursor.1..cursor.1 + limit);
let o = self.data[idx as usize..limit as usize]
.iter()
.cloned()
.zip(offsets);
out.extend(o);
}

match ret {
Expand Down Expand Up @@ -135,11 +142,14 @@ mod tests {
segment.push(Bytes::from_static(b"test9"));
assert_eq!(segment.len(), 9);

let mut out: Vec<Bytes> = Vec::new();
let _ = segment.readv(0, 2, &mut out).unwrap();
h3nill marked this conversation as resolved.
Show resolved Hide resolved
let mut out: Vec<(Bytes, Offset)> = Vec::new();
let _ = segment.readv((0, 0), 2, &mut out).unwrap();
assert_eq!(
out,
vec![Bytes::from_static(b"test1"), Bytes::from_static(b"test2")]
vec![
(Bytes::from_static(b"test1"), (0, 0)),
(Bytes::from_static(b"test2"), (0, 1))
]
);
}

Expand All @@ -157,8 +167,8 @@ mod tests {
segment.push(vec![9u8]);
assert_eq!(segment.len(), 9);

let mut out: Vec<Vec<u8>> = Vec::new();
let _ = segment.readv(0, 2, &mut out).unwrap();
assert_eq!(out, vec![vec![1u8], vec![2u8]]);
let mut out: Vec<(Vec<u8>, Offset)> = Vec::new();
let _ = segment.readv((0, 0), 2, &mut out).unwrap();
assert_eq!(out, vec![(vec![1u8], (0, 0)), (vec![2u8], (0, 1))]);
}
}