forked from bytebeamio/rumqtt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mod.rs
690 lines (620 loc) · 22 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
use std::usize;
use std::{collections::VecDeque, io};
mod segment;
pub mod utils;
use segment::{Segment, SegmentPosition};
use tracing::warn;
/// Outcome of a bulk read (`CommitLog::readv`). Both variants carry
/// `(segment index, absolute offset)` cursors: `start` is where reading
/// actually began (possibly clamped forward), `end` is where it stopped.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Position {
    /// The requested length was fulfilled before reaching the end of the log;
    /// `end` is the cursor to resume reading from.
    Next { start: (u64, u64), end: (u64, u64) },
    /// The end of the log was reached; there is nothing to read past `end`
    /// until more data is appended.
    Done { start: (u64, u64), end: (u64, u64) },
}
/// Minimal interface a log entry must provide so the commit log can enforce
/// its per-segment size limit.
pub trait Storage {
    /// Size of this entry in bytes.
    fn size(&self) -> usize;
}
/// An in-memory commit log made of contiguous, indexed segments.
///
/// There are 2 limits which are enforced:
/// - limit on the size of each segment created by this log, in bytes
/// - limit on the number of segments kept in memory
///
/// When the active segment is filled up, it is frozen and a new empty active
/// segment is started. When the limit on the number of in-memory segments is
/// reached, the oldest segment is dropped (see `apply_retention`).
///
/// This shifting of segments happens on append, whenever the active segment's
/// size has reached the limit. Note that a segment's size may go beyond the
/// limit: the last entry is accepted as long as the segment was under the
/// limit before that append, so a segment can overflow by up to one entry's
/// size. Only when another entry is appended is the active segment rotated.
///
/// ### Invariants
/// - The active segment always exists and has index `tail`.
/// - Segments are contiguous in their indices, from `head` to `tail`.
/// - Each segment's size in bytes does not exceed `max_segment_size` by more
///   than the overflowing bytes of its last entry.
pub struct CommitLog<T> {
    /// The index of the oldest segment still held in memory.
    head: u64,
    /// The index of the current active segment, which is also the last valid
    /// segment (and the last one in memory).
    tail: u64,
    /// Maximum size of any segment in bytes (soft limit — see overflow note
    /// on the struct docs).
    max_segment_size: usize,
    /// Maximum number of segments kept in memory, *including* the active one
    /// (retention keeps `segments.len() <= max_mem_segments`).
    max_mem_segments: usize,
    /// All in-memory segments, oldest at the front; the back is the active
    /// segment.
    segments: VecDeque<Segment<T>>,
}
impl<T> CommitLog<T>
where
    T: Storage + Clone,
{
    /// Create a new `CommitLog` with the given constraints. If
    /// `max_mem_segments` is 1, then only the active segment is maintained.
    ///
    /// # Panics
    /// - if `max_segment_size` is less than 1024 bytes
    /// - if `max_mem_segments` is 0
    ///
    /// NOTE(review): this never returns `Err`; the `io::Result` return type is
    /// presumably kept for API compatibility with a disk-backed variant —
    /// confirm before relying on error propagation here.
    pub fn new(max_segment_size: usize, max_mem_segments: usize) -> io::Result<Self> {
        if max_segment_size < 1024 {
            panic!("given max_segment_size {} bytes < 1KB", max_segment_size);
        }
        if max_mem_segments < 1 {
            panic!("atleast 1 segment needs to exist in memory else what's the point of log");
        }
        let mut segments = VecDeque::with_capacity(max_mem_segments);
        segments.push_back(Segment::new());
        Ok(Self {
            head: 0,
            tail: 0,
            max_segment_size,
            max_mem_segments,
            segments,
        })
    }

    /// `(segment index, absolute offset)` at which the next append will land.
    #[inline]
    pub fn next_offset(&self) -> (u64, u64) {
        // `active_segment` is guaranteed to exist (see `active_segment`'s docs).
        (self.tail, self.active_segment().next_offset())
    }

    /// Indices of the oldest (`head`) and active (`tail`) segments.
    #[inline]
    pub fn head_and_tail(&self) -> (u64, u64) {
        (self.head, self.tail)
    }

    /// Number of segments currently held in memory, including the active one.
    #[inline]
    pub fn memory_segments_count(&self) -> usize {
        self.segments.len()
    }

    /// Total size in bytes of the data in all the segments.
    #[allow(dead_code)]
    pub fn size(&self) -> u64 {
        let mut size = 0;
        for segment in self.segments.iter() {
            size += segment.size();
        }
        size
    }

    /// Number of segments (same as `memory_segments_count`).
    #[allow(dead_code)]
    #[inline]
    pub fn len(&self) -> usize {
        self.segments.len()
    }

    /// Number of packets appended over the log's lifetime (the active
    /// segment's next absolute offset).
    #[inline]
    #[allow(dead_code)]
    pub fn entries(&self) -> u64 {
        self.active_segment().next_offset()
    }

    /// The active segment. `unwrap` is fine: `new` pushes one segment and
    /// `apply_retention` never leaves `segments` empty.
    #[inline]
    fn active_segment(&self) -> &Segment<T> {
        self.segments.back().unwrap()
    }

    /// Mutable access to the active segment; same invariant as `active_segment`.
    #[inline]
    fn active_segment_mut(&mut self) -> &mut Segment<T> {
        self.segments.back_mut().unwrap()
    }

    /// Append a new [`T`] to the active segment, rotating segments first if
    /// the active one has reached `max_segment_size`. Returns the
    /// `(segment index, absolute offset)` cursor just past the appended entry.
    #[inline]
    pub fn append(&mut self, message: T) -> (u64, u64) {
        self.apply_retention();
        let active_segment = self.active_segment_mut();
        active_segment.push(message);
        let absolute_offset = self.active_segment().next_offset();
        (self.tail, absolute_offset)
    }

    /// If the active segment has reached the size limit, freeze it and start a
    /// new empty active one, dropping the oldest segment when the in-memory
    /// segment count has reached its limit.
    fn apply_retention(&mut self) {
        if self.active_segment().size() >= self.max_segment_size as u64 {
            // If active segment is full and segments are full, apply retention policy
            if self.memory_segments_count() >= self.max_mem_segments {
                self.segments.pop_front();
                self.head += 1;
            }
            // Pushing a new segment into segments and updating tail automatically changes active
            // segment to new empty one.
            let absolute_offset = self.active_segment().next_offset();
            self.segments
                .push_back(Segment::with_offset(absolute_offset));
            self.tail += 1;
        }
    }

    /// Last entry of the active segment, if any.
    #[inline]
    pub fn last(&self) -> Option<T> {
        self.active_segment().last()
    }

    /// Read `len` Ts at once. More efficient than reading 1 at a time. Returns
    /// the next offset to read data from. The `Position`'s `start` returned
    /// need not be a valid index if the start given is not valid either.
    pub fn readv(
        &self,
        mut start: (u64, u64),
        mut len: u64,
        out: &mut Vec<T>,
    ) -> io::Result<Position> {
        let mut cursor = start;
        let _orig_cursor = cursor;

        // Segment index beyond the active segment: nothing to read at all.
        if cursor.0 > self.tail {
            return Ok(Position::Done { start, end: start });
        }

        // Segment already dropped by retention: jump the cursor (and the
        // reported `start`) forward to the oldest segment still in memory.
        if cursor.0 < self.head {
            let head_absolute_offset = self.segments.front().unwrap().absolute_offset;
            warn!(
                "given index {} less than head {}, jumping to head",
                cursor.0, head_absolute_offset
            );
            cursor = (self.head, head_absolute_offset);
            start = cursor;
        }

        let mut idx = (cursor.0 - self.head) as usize;
        let mut curr_segment = &self.segments[idx];

        // Offset lies before the chosen segment's first entry: clamp forward.
        if curr_segment.absolute_offset > cursor.1 {
            warn!(
                "offset specified {} if less than actual {}, jumping",
                cursor.1, curr_segment.absolute_offset
            );
            start.1 = curr_segment.absolute_offset;
            cursor.1 = curr_segment.absolute_offset;
        }

        // Drain non-active segments first, advancing to the next segment
        // whenever the current one is exhausted with `len` still unfulfilled.
        while cursor.0 < self.tail {
            // `Segment::readv` handles conversion from absolute index to relative
            // index and it returns the absolute offset.
            // absolute cursor not to be confused with absolute offset
            match curr_segment.readv(cursor.1, len, out)? {
                // an offset returned -> we didn't read till end -> len fulfilled -> return
                SegmentPosition::Next(offset) => {
                    return Ok(Position::Next {
                        start,
                        end: (cursor.0, offset),
                    });
                }
                // no offset returned -> we reached end / invalid file
                // if len unfulfilled -> try next segment with remaining length
                SegmentPosition::Done(next_offset) => {
                    // this condition is needed in case cursor.1 > 0 (when user provides cursor.1
                    // beyond segment's last offset which can happen due to next readv offset
                    // being off by 1 before jumping to next segment or while manually reading
                    // from a particular cursor). In such case, no. of read data points is
                    // 0 and hence we don't decrement len.
                    if next_offset >= cursor.1 {
                        len -= next_offset - cursor.1;
                    }
                    cursor = (cursor.0 + 1, next_offset);
                }
            }
            if len == 0 {
                // debug!("start: {:?}, end: ({}, {})", orig_cursor, cursor.0, cursor.1 - 1);
                return Ok(Position::Next { start, end: cursor });
            }
            idx += 1;
            curr_segment = &self.segments[idx];
        }

        // Cursor is now on the active segment; if it is at or past the active
        // segment's end there is nothing more to read for now.
        if curr_segment.next_offset() <= cursor.1 {
            return Ok(Position::Done { start, end: cursor });
        }

        // we need to read separately from active segment because if `None` returned for active
        // segment's `readv` then we should return `None` as well as not possible to read further,
        // whereas for older segments we simply jump onto the new one to read more.
        match curr_segment.readv(cursor.1, len, out)? {
            SegmentPosition::Next(v) => {
                // debug!("start: {:?}, end: ({}, {})", orig_cursor, cursor.0, cursor.1 + v - 1);
                Ok(Position::Next {
                    start,
                    end: (cursor.0, v),
                })
            }
            SegmentPosition::Done(absolute_offset) => {
                // debug!("start: {:?}, end: ({}, {}) done", orig_cursor, cursor.0, absolute_offset);
                Ok(Position::Done {
                    start,
                    end: (cursor.0, absolute_offset),
                })
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::{Position::*, *};
    use bytes::Bytes;
    use pretty_assertions::assert_eq;

    /// A payload of `size` bytes all equal to `id`, so contents identify the packet.
    fn random_payload(id: u8, size: u64) -> Bytes {
        Bytes::from(vec![id; size as usize])
    }

    /// Assert `out` equals the payload `random_payload(expected_id, expected_size)` would build.
    fn verify(expected_id: usize, expected_size: u64, out: Bytes) {
        let expected = Bytes::from(vec![expected_id as u8; expected_size as usize]);
        // dbg!(expected_id, &expected);
        assert_eq!(out, expected);
    }

    #[test]
    fn reading_at_invalid_cursor_returns_none() {
        // 1 as active only
        let log: CommitLog<Bytes> = CommitLog::new(1024, 1).unwrap();
        let mut out = Vec::new();
        assert_eq!(log.head, 0);
        assert_eq!(log.tail, 0);
        // Offset past the (empty) active segment's end -> Done at the given cursor.
        assert_eq!(
            log.readv((0, 1), 2, &mut out).unwrap(),
            Done {
                start: (0, 1),
                end: (0, 1)
            }
        );
        // Segment index past tail -> Done at the given cursor.
        assert_eq!(
            log.readv((100, 1), 2, &mut out).unwrap(),
            Done {
                start: (100, 1),
                end: (100, 1)
            }
        );
    }

    #[test]
    fn inmemory_appends_and_retention_policy_works() {
        let max_segment_size = 1024 * 100; // 100K
        let packet_size = 1024;
        // 1 as active 1 as inactive but in mem
        let mut log: CommitLog<Bytes> = CommitLog::new(max_segment_size, 2).unwrap();
        // Fill the active segment
        for i in 0..100 {
            let offset = log.append(random_payload(i as u8, packet_size));
            assert_eq!(offset, (0, i as u64 + 1))
        }
        assert_eq!(log.size(), max_segment_size as u64);
        assert_eq!(log.head, 0);
        assert_eq!(log.tail, 0);
        assert_eq!(log.len(), 1);
        // Append more data to trigger new segment creation
        log.append(random_payload(100, packet_size));
        assert_eq!(log.head, 0);
        assert_eq!(log.tail, 1);
        assert_eq!(log.len(), 2);
        // Fill the rest of new active segment
        for (i, v) in (101..200).enumerate() {
            let offset = log.append(random_payload(v, packet_size));
            assert_eq!(offset, (1, i as u64 + 102))
        }
        assert_eq!(log.head, 0);
        assert_eq!(log.tail, 1);
        assert_eq!(log.len(), 2);
        // Append more data to trigger new segment creation and retention policy
        log.append(random_payload(200, packet_size));
        assert_eq!(log.head, 1);
        assert_eq!(log.tail, 2);
        assert_eq!(log.len(), 2);
    }

    #[test]
    fn active_segment_appends_and_reads_works() {
        let max_segment_size = 1024 * 100; // 100K
        let packet_size: u64 = 1024;
        // 1 as active only
        let mut log = CommitLog::new(max_segment_size, 1).unwrap();
        for i in 0..10 {
            log.append(random_payload(i, packet_size));
        }
        assert_eq!(log.active_segment().len(), 10);
        assert_eq!(log.active_segment().size(), packet_size * 10);
        // Read one by one
        let mut out = Vec::new();
        for i in 0..10 {
            let offset = i as u64;
            let next = log.readv((0, offset), 1, &mut out).unwrap();
            let data = out.pop().unwrap();
            verify(i, packet_size, data);
            // Reading the last entry exhausts the log -> Done instead of Next.
            if i == 9 {
                assert_eq!(
                    next,
                    Done {
                        start: (0, 9),
                        end: (0, 10)
                    }
                );
                continue;
            }
            assert_eq!(
                next,
                Next {
                    start: (0, i as u64),
                    end: (0, i as u64 + 1)
                }
            );
        }
        // Read in bulk 1. Trying to read less than appended
        let mut out = Vec::new();
        let next = log.readv((0, 0), 5, &mut out).unwrap();
        assert_eq!(out.len(), 5);
        out.into_iter()
            .enumerate()
            .for_each(|(i, v)| verify(i, packet_size, v));
        assert_eq!(
            next,
            Next {
                start: (0, 0),
                end: (0, 5)
            }
        );
        // Read in bulk 2. Trying to read exactly appended elements
        let mut out = Vec::new();
        let next = log.readv((0, 0), 10, &mut out).unwrap();
        assert_eq!(out.len(), 10);
        out.into_iter()
            .enumerate()
            .for_each(|(i, v)| verify(i, packet_size, v));
        assert_eq!(
            next,
            Done {
                start: (0, 0),
                end: (0, 10)
            }
        );
        // Read in bulk 3. Trying to read greater than appended
        let mut out = Vec::new();
        let next = log.readv((0, 0), 20, &mut out).unwrap();
        assert_eq!(out.len(), 10);
        out.into_iter()
            .enumerate()
            .for_each(|(i, v)| verify(i, packet_size, v));
        assert_eq!(
            next,
            Done {
                start: (0, 0),
                end: (0, 10)
            }
        );
        // Read in bulk 4. Trying to read greater than appended but from middle of the segment
        let mut out = Vec::new();
        let next = log.readv((0, 5), 20, &mut out).unwrap();
        assert_eq!(out.len(), 5);
        out.into_iter()
            .enumerate()
            .for_each(|(i, v)| verify(i + 5, packet_size, v));
        assert_eq!(
            next,
            Done {
                start: (0, 5),
                end: (0, 10)
            }
        );
        // Read again after appending again
        let mut out = Vec::new();
        let next = log.readv((0, 10), 20, &mut out).unwrap();
        assert_eq!(
            next,
            Done {
                start: (0, 10),
                end: (0, 10)
            }
        );
        for i in 10..20 {
            log.append(random_payload(i, packet_size));
        }
        let next = log.readv((0, 10), 20, &mut out).unwrap();
        assert_eq!(out.len(), 10);
        out.into_iter()
            .enumerate()
            .for_each(|(i, v)| verify(i + 10, packet_size, v));
        assert_eq!(
            next,
            Done {
                start: (0, 10),
                end: (0, 20)
            }
        );
    }

    #[test]
    fn read_switch_from_active_to_inactive_to_active_segment_works() {
        let max_segment_size = 1024 * 100; // 100K
        let packet_size: u64 = 1024;
        // 1 as active, 3 as inactive but in mem
        let mut log = CommitLog::new(max_segment_size, 4).unwrap();
        // Fill active segment
        for i in 0..100 {
            log.append(random_payload(i, packet_size));
        }
        assert_eq!(log.head, 0);
        assert_eq!(log.tail, 0);
        assert_eq!(log.active_segment().len(), 100);
        assert_eq!(log.active_segment().size(), packet_size * 100);
        // Read partially from active segment
        let mut out = Vec::new();
        let next = log.readv((0, 0), 50, &mut out).unwrap();
        assert_eq!(out.len(), 50);
        assert_eq!(
            next,
            Next {
                start: (0, 0),
                end: (0, 50)
            }
        );
        // Fill with data worth 2 more segments. Active segment will change
        let mut out = Vec::new();
        for i in 0..200 {
            log.append(random_payload(i, packet_size));
        }
        assert_eq!(log.head, 0);
        assert_eq!(log.tail, 2);
        // Read from previous next; spans segment 0 into segment 1.
        let next = log.readv((0, 50), 50, &mut out).unwrap();
        assert_eq!(
            next,
            Next {
                start: (0, 50),
                end: (1, 100)
            }
        );
        let next = log.readv((1, 100), 100, &mut out).unwrap();
        assert_eq!(
            next,
            Next {
                start: (1, 100),
                end: (2, 200)
            }
        );
        let next = log.readv((2, 200), 100, &mut out).unwrap();
        assert_eq!(
            next,
            Done {
                start: (2, 200),
                end: (2, 300)
            }
        );
    }

    #[test]
    fn read_with_jumps_works() {
        let max_segment_size = 1024 * 10; // 10K
        let packet_size: u64 = 1024;
        // 1 as active, 4 as inactive but in mem
        let mut log = CommitLog::new(max_segment_size, 5).unwrap();
        // Fill active segment + 3 more memory segments
        for i in 0..40 {
            log.append(random_payload(i, packet_size));
        }
        // One big jump
        let mut out = Vec::new();
        let next = log.readv((0, 0), 35, &mut out).unwrap();
        assert_eq!(
            next,
            Next {
                start: (0, 0),
                end: (3, 35)
            }
        );
        // Each readv less than segment count. Segment count = 10. Readv = 5
        let mut out = Vec::new();
        let next = log.readv((0, 0), 5, &mut out).unwrap();
        assert_eq!(
            next,
            Next {
                start: (0, 0),
                end: (0, 5)
            }
        );
        // Offset 5 is below segment 3's first offset (30): cursor gets clamped
        // forward, reflected in the returned `start`.
        let next = log.readv((3, 5), 5, &mut out).unwrap();
        assert_eq!(
            next,
            Next {
                start: (3, 30),
                end: (3, 35)
            }
        );
        let next = log.readv((3, 40), 5, &mut out).unwrap();
        assert_eq!(
            next,
            Done {
                start: (3, 40),
                end: (3, 40)
            }
        );
        let next = log.readv((4, 40), 5, &mut out).unwrap();
        assert_eq!(
            next,
            Done {
                start: (4, 40),
                end: (4, 40)
            }
        );
        let next = log.readv((4, 41), 5, &mut out).unwrap();
        assert_eq!(
            next,
            Done {
                start: (4, 41),
                end: (4, 41)
            }
        );
        // Each readv greater than segment count. Segment count = 10. Readv = 5
        let mut out = Vec::new();
        let next = log.readv((0, 0), 15, &mut out).unwrap();
        assert_eq!(
            next,
            Next {
                start: (0, 0),
                end: (1, 15)
            }
        );
        let next = log.readv((1, 15), 15, &mut out).unwrap();
        assert_eq!(
            next,
            Next {
                start: (1, 15),
                end: (3, 30)
            }
        );
        let next = log.readv((3, 30), 10, &mut out).unwrap();
        assert_eq!(
            next,
            Done {
                start: (3, 30),
                end: (3, 40)
            }
        );
    }

    #[test]
    fn read_jump_from_deleted_segment_works() {
        let max_segment_size = 1024 * 10; // 10K
        let packet_size: u64 = 1024;
        // 1 as active, 9 as inactive but in mem
        let mut log = CommitLog::new(max_segment_size, 10).unwrap();
        // Fill all 10 in memory segments
        for i in 0..100 {
            log.append(random_payload(i, packet_size));
        }
        assert_eq!(log.head, 0);
        assert_eq!(log.tail, 9);
        let mut out = Vec::new();
        let next = log.readv((0, 0), 5, &mut out).unwrap();
        assert_eq!(
            next,
            Next {
                start: (0, 0),
                end: (0, 5)
            }
        );
        // Fill 10 more inmemory segment pushing previous into retention policy
        for i in 0..100 {
            log.append(random_payload(i, packet_size));
        }
        assert_eq!(log.head, 10);
        assert_eq!(log.tail, 19);
        // Segment 0 was dropped by retention: readv jumps to head (10, 100).
        let next = log.readv((0, 0), 5, &mut out).unwrap();
        assert_eq!(
            next,
            Next {
                start: (10, 100),
                end: (10, 105)
            }
        );
    }
}