Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(scale): introduce deterministic scaling tests #5657

Merged
merged 25 commits into from
Oct 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,5 @@ src/tests/regress/output/*
src/tests/sync_point/slt/e2e_test
# generated e2e tests
e2e_test/generated
# generated scale tests by `cargo nextest`
scale-test.tar.zst
74 changes: 50 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ members = [
"src/test_runner",
"src/tests/regress",
"src/tests/simulation",
"src/tests/simulation_scale",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer merging it into the existing simulation crate, maybe in a future PR.

"src/tests/sqlsmith",
"src/tests/sync_point",
"src/tracing",
Expand Down Expand Up @@ -74,10 +75,16 @@ overflow-checks = true
[profile.ci-dev]
inherits = "dev"
incremental = false
lto = 'off'
[profile.ci-dev.package."*"] # external dependencies
opt-level = 1

# The profile used for deterministic simulation tests in CI.
# The simulator can only run single-threaded, so optimization is required to make the running time
# reasonable. The optimization level is customized to speed up the build.
[profile.ci-sim]
inherits = "dev"
opt-level = 2

# Patch third-party crates for deterministic simulation.
[patch.crates-io]
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "a819877" }
Expand Down
19 changes: 17 additions & 2 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -495,11 +495,26 @@ cargo nextest run \
"$@"
"""

[tasks.sarchive-scale-test]
category = "RiseDev - Archive simulation scaling tests"
description = "Archive integration scaling tests in deterministic simulation mode"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't decide how to enable SIMD this way through environments. Thus, the results of JSON parsing might be different due to precision errors like #5487. 🥵

script = """
#!/bin/bash
set -e

cargo nextest archive \
-p risingwave_simulation_scale \
--archive-file scale-test.tar.zst \
"$@"
"""

[tasks.sslt]
category = "RiseDev - Deterministic Simulation End-to-end Test"
description = "Run e2e tests in deterministic simulation mode"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "--cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
script = """
#!/bin/bash
set -e
Expand All @@ -511,7 +526,7 @@ cargo run -p risingwave_simulation "$@"
category = "RiseDev - Deterministic Simulation End-to-end Test"
description = "Run e2e tests in deterministic simulation mode and report code coverage"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "--cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim-cov" }
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim-cov" }
script = """
#!/bin/bash
set -e
Expand Down
8 changes: 6 additions & 2 deletions ci/scripts/build-simulation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ echo "--- Generate RiseDev CI config"
cp ci/risedev-components.ci.env risedev-components.user.env

echo "--- Build deterministic simulation e2e test runner"
cargo make sslt --profile ci-release -- --help
cargo make sslt --profile ci-sim -- --help

echo "--- Build and archive deterministic scaling simulation tests"
cargo make sarchive-scale-test --cargo-profile ci-sim

echo "--- Upload artifacts"
cp target/sim/ci-release/risingwave_simulation ./risingwave_simulation
cp target/sim/ci-sim/risingwave_simulation ./risingwave_simulation
buildkite-agent artifact upload risingwave_simulation
buildkite-agent artifact upload scale-test.tar.zst
12 changes: 12 additions & 0 deletions ci/scripts/deterministic-scale-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash

# CI step: fetch the pre-built scaling-test archive and run it with cargo-nextest.
# Abort on the first failing command, on use of an unset variable, and on a
# failure anywhere in a pipeline.
set -o errexit -o nounset -o pipefail

source ci/scripts/common.env.sh

# Name of the nextest archive uploaded as a build artifact.
# Defined after sourcing the common env so it cannot be overwritten by it.
scale_test_archive="scale-test.tar.zst"

echo "--- Download artifacts"
buildkite-agent artifact download "${scale_test_archive}" .

echo "--- Run scaling tests in deterministic simulation mode"
# NOTE(review): MADSIM_TEST_NUM presumably controls how many times each simulation
# test is run — confirm against the madsim documentation.
# --no-fail-fast keeps running the remaining tests after a failure.
MADSIM_TEST_NUM=5 cargo nextest run --archive-file "${scale_test_archive}" --no-fail-fast
2 changes: 1 addition & 1 deletion ci/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ steps:
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry

- label: "docslt"
command: "ci/scripts/docslt.sh"
key: "docslt"
Expand Down
14 changes: 13 additions & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ steps:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
timeout_in_minutes: 15
retry: *auto-retry

- label: "docslt"
Expand Down Expand Up @@ -173,6 +173,18 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

- label: "scaling test (deterministic simulation)"
command: "ci/scripts/deterministic-scale-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
command: "timeout 8m ci/scripts/deterministic-e2e-test.sh"
depends_on: "build-simulation"
Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,9 @@ pub type ConnectorState = Option<Vec<SplitImpl>>;
/// [`NexmarkSplitReader`] in case that they are CPU intensive and may block the streaming actors.
pub fn spawn_data_generation_stream<T: Send + 'static>(
stream: impl Stream<Item = T> + Send + 'static,
buffer_size: usize,
) -> impl Stream<Item = T> + Send + 'static {
const GENERATION_BUFFER: usize = 1000;

let (generation_tx, generation_rx) = mpsc::channel(GENERATION_BUFFER);
let (generation_tx, generation_rx) = mpsc::channel(buffer_size);
RUNTIME.spawn(async move {
pin_mut!(stream);
while let Some(result) = stream.next().await {
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ impl SplitReader for DatagenSplitReader {
}

fn into_stream(self) -> BoxSourceStream {
spawn_data_generation_stream(self.generator.into_stream()).boxed()
// Will buffer at most 4 event chunks.
const BUFFER_SIZE: usize = 4;
spawn_data_generation_stream(self.generator.into_stream(), BUFFER_SIZE).boxed()
}
}

Expand Down
17 changes: 12 additions & 5 deletions src/connector/src/source/nexmark/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ impl NexmarkEventGenerator {
let mut last_event = None;
loop {
let mut msgs: Vec<SourceMessage> = vec![];
let mut num_event = 0;
let old_events_so_far = self.events_so_far;

// Get unix timestamp in milliseconds
Expand All @@ -60,15 +59,17 @@ impl NexmarkEventGenerator {
};

if let Some(event) = last_event.take() {
num_event += 1;
msgs.push(
NexmarkMessage::new(self.split_id.clone(), self.events_so_far as u64, event)
.into(),
);
}

while num_event < self.max_chunk_size {
let mut finished = false;

while (msgs.len() as u64) < self.max_chunk_size {
if self.event_num > 0 && self.events_so_far >= self.event_num as u64 {
finished = true;
break;
}

Expand Down Expand Up @@ -99,13 +100,17 @@ impl NexmarkEventGenerator {
break;
}

num_event += 1;
msgs.push(
NexmarkMessage::new(self.split_id.clone(), self.events_so_far as u64, event)
.into(),
);
}
yield msgs;

if finished && msgs.is_empty() {
break;
} else {
yield msgs;
}

if !self.use_real_time && self.min_event_gap_in_ns > 0 {
tokio::time::sleep(Duration::from_nanos(
Expand All @@ -114,5 +119,7 @@ impl NexmarkEventGenerator {
.await;
}
}

tracing::debug!(?self.event_type, "nexmark generator finished");
}
}
4 changes: 3 additions & 1 deletion src/connector/src/source/nexmark/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ impl SplitReader for NexmarkSplitReader {
}

fn into_stream(self) -> BoxSourceStream {
spawn_data_generation_stream(self.generator.into_stream()).boxed()
// Will buffer at most 4 event chunks.
const BUFFER_SIZE: usize = 4;
spawn_data_generation_stream(self.generator.into_stream(), BUFFER_SIZE).boxed()
}
}

Expand Down