Skip to content

Commit

Permalink
Merge pull request #1426 from spacejam/tyler_simplify_before_rewrite
Browse files Browse the repository at this point in the history
Remove zstd in anticipation for it being re-added through marble
  • Loading branch information
spacejam committed Oct 2, 2022
2 parents 96b2170 + b19e34d commit 69294e5
Show file tree
Hide file tree
Showing 23 changed files with 65 additions and 176 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
default.sled
crash_*
*db
*conf
*snap.*
Expand Down
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ default = []
# test-only configurations that cause performance to drop significantly.
# It will cause your tests to take much more time, and possibly time out etc...
testing = ["event_log", "lock_free_delays", "light_testing"]
light_testing = ["compression", "failpoints", "backtrace", "memshred"]
compression = ["zstd"]
light_testing = ["failpoints", "backtrace", "memshred"]
lock_free_delays = []
failpoints = []
event_log = []
Expand All @@ -41,20 +40,20 @@ no_logs = ["log/max_level_off"]
no_inline = []
pretty_backtrace = ["color-backtrace"]
docs = []
no_zstd = []
miri_optimizations = []
mutex = []
memshred = []

[dependencies]
libc = "0.2.96"
zstd = { version = "0.11.2", optional = true }
crc32fast = "1.2.1"
log = "0.4.14"
parking_lot = "0.12.0"
parking_lot = "0.12.1"
color-backtrace = { version = "0.5.1", optional = true }
num-format = { version = "0.4.0", optional = true }
backtrace = { version = "0.3.60", optional = true }
im = "15.0.0"
im = "15.1.0"

[target.'cfg(any(target_os = "linux", target_os = "macos", target_os="windows"))'.dependencies]
fs2 = "0.4.3"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ extreme::run(async move {

# minimum supported Rust version (MSRV)

We support Rust 1.57.0 and up.
We support Rust 1.62 and up.

# architecture

Expand Down
1 change: 0 additions & 1 deletion benchmarks/stress2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ overflow-checks = true
default = []
lock_free_delays = ["sled/lock_free_delays"]
event_log = ["sled/event_log"]
compression = ["sled/compression"]
no_logs = ["sled/no_logs"]
metrics = ["sled/metrics"]
jemalloc = ["jemallocator"]
Expand Down
4 changes: 2 additions & 2 deletions scripts/cross_compile.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ rustup update --no-self-update

RUSTFLAGS="--cfg miri" cargo check

rustup toolchain install 1.57.0 --no-self-update
rustup toolchain install 1.62 --no-self-update
cargo clean
rm Cargo.lock
cargo +1.57.0 check
cargo +1.62 check

for target in $targets; do
echo "setting up $target..."
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ impl Config {
);
if self.use_compression {
supported!(
cfg!(feature = "compression"),
"the 'compression' feature must be enabled"
!cfg!(feature = "no_zstd"),
"the 'no_zstd' feature is set, but Config.use_compression is also set to true"
);
}
supported!(
Expand Down
3 changes: 1 addition & 2 deletions src/doc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
//! * forward and reverse iterators
//! * a monotonic ID generator capable of giving out 75-125+ million unique IDs
//! per second, never double allocating even in the presence of crashes
//! * [zstd](https://github.com/facebook/zstd) compression (use the zstd build
//! feature)
//! * [zstd](https://github.com/facebook/zstd) compression
//! * cpu-scalable lock-free implementation
//! * SSD-optimized log-structured storage
//!
Expand Down
4 changes: 2 additions & 2 deletions src/ebr/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl CompareAndSetOrdering for (Ordering, Ordering) {

/// Returns a bitmask containing the unused least significant bits of an aligned pointer to `T`.
#[inline]
fn low_bits<T: ?Sized + Pointable>() -> usize {
const fn low_bits<T: ?Sized + Pointable>() -> usize {
(1 << T::ALIGN.trailing_zeros()) - 1
}

Expand Down Expand Up @@ -720,7 +720,7 @@ impl<'g, T> Shared<'g, T> {

impl<'g, T: ?Sized + Pointable> Shared<'g, T> {
/// Returns a new null pointer.
pub(crate) fn null() -> Shared<'g, T> {
pub(crate) const fn null() -> Shared<'g, T> {
Shared { data: 0, _marker: PhantomData }
}

Expand Down
2 changes: 1 addition & 1 deletion src/ebr/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl Global {

for _ in 0..steps {
match self.queue.try_pop_if(
&|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
guard,
) {
None => break,
Expand Down
2 changes: 1 addition & 1 deletion src/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ fn multithreaded() {
}));
}

for t in threads.into_iter() {
for t in threads {
t.join().unwrap();
}

Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ mod compile_time_assertions {
use crate::*;

#[allow(unreachable_code)]
fn _assert_public_types_send_sync() {
const fn _assert_public_types_send_sync() {
_assert_send::<Subscriber>();

_assert_send_sync::<Iter>();
Expand All @@ -513,9 +513,9 @@ mod compile_time_assertions {
_assert_send_sync::<Mode>();
}

fn _assert_send<S: Send>() {}
const fn _assert_send<S: Send>() {}

fn _assert_send_sync<S: Send + Sync>() {}
const fn _assert_send_sync<S: Send + Sync>() {}
}

#[cfg(all(unix, not(miri)))]
Expand Down
6 changes: 1 addition & 5 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,10 @@ impl Metrics {
ret.push_str(&format!("hit ratio: {}%\n", hit_ratio));

ret.push_str(&format!("{}\n", "-".repeat(134)));
ret.push_str("serialization and compression:\n");
ret.push_str("serialization:\n");
ret.push_str(&p(vec![
lat("serialize", &self.serialize),
lat("deserialize", &self.deserialize),
#[cfg(feature = "compression")]
lat("compress", &self.compress),
#[cfg(feature = "compression")]
lat("decompress", &self.decompress),
]));

ret.push_str(&format!("{}\n", "-".repeat(134)));
Expand Down
2 changes: 1 addition & 1 deletion src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2629,7 +2629,7 @@ mod test {
}

let key_ref = KeyRef::Computed { base: &[2, 253], distance: 8 };
let mut buf = &mut [0, 0][..];
let buf = &mut [0, 0][..];
key_ref.write_into(buf);
assert_eq!(buf, &[3, 5]);
}
Expand Down
20 changes: 3 additions & 17 deletions src/pagecache/heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,18 +234,10 @@ impl Heap {
}
}

pub fn read(
&self,
heap_id: HeapId,
use_compression: bool,
) -> Result<(MessageKind, Vec<u8>)> {
pub fn read(&self, heap_id: HeapId) -> Result<(MessageKind, Vec<u8>)> {
log::trace!("Heap::read({:?})", heap_id);
let (slab_id, slab_idx, original_lsn) = heap_id.decompose();
self.slabs[slab_id as usize].read(
slab_idx,
original_lsn,
use_compression,
)
self.slabs[slab_id as usize].read(slab_idx, original_lsn)
}

pub fn free(&self, heap_id: HeapId) {
Expand Down Expand Up @@ -300,7 +292,6 @@ impl Slab {
&self,
slab_idx: SlabIdx,
original_lsn: Lsn,
use_compression: bool,
) -> Result<(MessageKind, Vec<u8>)> {
let bs = slab_id_to_size(self.slab_id);
let offset = u64::from(slab_idx) * bs;
Expand Down Expand Up @@ -332,12 +323,7 @@ impl Slab {
return Err(Error::corruption(None));
}
let buf = heap_buf[13..].to_vec();
let buf2 = if use_compression {
crate::pagecache::decompress(buf)
} else {
buf
};
Ok((MessageKind::from(heap_buf[0]), buf2))
Ok((MessageKind::from(heap_buf[0]), buf))
} else {
log::debug!(
"heap message CRC does not match contents. stored: {} actual: {}",
Expand Down
69 changes: 17 additions & 52 deletions src/pagecache/logger.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::fs::File;

use super::{
arr_to_lsn, arr_to_u32, assert_usize, decompress, header, iobuf,
lsn_to_arr, pread_exact, pread_exact_or_eof, roll_iobuf, u32_to_arr, Arc,
BasedBuf, DiskPtr, HeapId, IoBuf, IoBufs, LogKind, LogOffset, Lsn,
MessageKind, Reservation, Serialize, Snapshot, BATCH_MANIFEST_PID,
COUNTER_PID, MAX_MSG_HEADER_LEN, META_PID, SEG_HEADER_LEN,
arr_to_lsn, arr_to_u32, assert_usize, header, iobuf, lsn_to_arr,
pread_exact, pread_exact_or_eof, roll_iobuf, u32_to_arr, Arc, BasedBuf,
DiskPtr, HeapId, IoBuf, IoBufs, LogKind, LogOffset, Lsn, MessageKind,
Reservation, Serialize, Snapshot, BATCH_MANIFEST_PID, COUNTER_PID,
MAX_MSG_HEADER_LEN, META_PID, SEG_HEADER_LEN,
};

use crate::*;
Expand Down Expand Up @@ -70,18 +70,16 @@ impl Log {
// here because it might not still
// exist in the inline log.
let heap_id = ptr.heap_id().unwrap();
self.config.heap.read(heap_id, self.config.use_compression).map(
|(kind, buf)| {
let header = MessageHeader {
kind,
pid,
segment_number: expected_segment_number,
crc32: 0,
len: 0,
};
LogRead::Heap(header, buf, heap_id, 0)
},
)
self.config.heap.read(heap_id).map(|(kind, buf)| {
let header = MessageHeader {
kind,
pid,
segment_number: expected_segment_number,
crc32: 0,
len: 0,
};
LogRead::Heap(header, buf, heap_id, 0)
})
}
}

Expand Down Expand Up @@ -129,43 +127,13 @@ impl Log {
/// completed or aborted later. Useful for maintaining
/// linearizability across CAS operations that may need to
/// persist part of their operation.
#[allow(unused)]
pub fn reserve<T: Serialize + Debug>(
&self,
log_kind: LogKind,
pid: PageId,
item: &T,
guard: &Guard,
) -> Result<Reservation<'_>> {
#[cfg(feature = "compression")]
{
if self.config.use_compression && pid != BATCH_MANIFEST_PID {
use zstd::bulk::compress;

let buf = item.serialize();

#[cfg(feature = "metrics")]
let _measure = Measure::new(&M.compress);

let compressed_buf =
compress(&buf, self.config.compression_factor).unwrap();

let ret = self.reserve_inner(
log_kind,
pid,
&IVec::from(compressed_buf),
None,
guard,
);

if let Err(e) = &ret {
self.iobufs.set_global_error(*e);
}

return ret;
}
}

let ret = self.reserve_inner(log_kind, pid, item, None, guard);

if let Err(e) = &ret {
Expand Down Expand Up @@ -860,7 +828,7 @@ pub(crate) fn read_message<R: ReadAt>(
assert_eq!(buf.len(), 16);
let heap_id = HeapId::deserialize(&mut &buf[..]).unwrap();

match config.heap.read(heap_id, config.use_compression) {
match config.heap.read(heap_id) {
Ok((kind, buf2)) => {
assert_eq!(header.kind, kind);
trace!(
Expand All @@ -883,10 +851,7 @@ pub(crate) fn read_message<R: ReadAt>(
| MessageKind::Free
| MessageKind::Counter => {
trace!("read a successful inline message");
let buf2 =
if config.use_compression { decompress(buf) } else { buf };

Ok(LogRead::Inline(header, buf2, inline_len))
Ok(LogRead::Inline(header, buf, inline_len))
}
MessageKind::BatchManifest => {
assert_eq!(buf.len(), std::mem::size_of::<Lsn>());
Expand Down
26 changes: 0 additions & 26 deletions src/pagecache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,32 +230,6 @@ pub(crate) const fn u32_to_arr(number: u32) -> [u8; 4] {
number.to_le_bytes()
}

#[allow(clippy::needless_pass_by_value)]
#[allow(clippy::needless_return)]
pub(in crate::pagecache) fn decompress(in_buf: Vec<u8>) -> Vec<u8> {
#[cfg(feature = "compression")]
{
use zstd::stream::decode_all;

let scootable_in_buf = &mut &*in_buf;
let raw: IVec = IVec::deserialize(scootable_in_buf)
.expect("this had to be serialized with an extra length frame");
#[cfg(feature = "metrics")]
let _measure = Measure::new(&M.decompress);
let out_buf = decode_all(&raw[..]).expect(
"failed to decompress data. \
This is not expected, please open an issue on \
https://github.com/spacejam/sled so we can \
fix this critical issue ASAP. Thank you :)",
);

return out_buf;
}

#[cfg(not(feature = "compression"))]
in_buf
}

#[derive(Debug, Clone, Copy)]
pub struct NodeView<'g>(pub(crate) PageView<'g>);

Expand Down
3 changes: 1 addition & 2 deletions src/pagecache/reservation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ impl<'a> Reservation<'a> {

/// Refills the reservation buffer with new data.
/// Must supply a buffer of an identical length
/// as the one initially provided. Don't use this
/// on messages subject to compression etc...
/// as the one initially provided.
///
/// # Panics
///
Expand Down

0 comments on commit 69294e5

Please sign in to comment.