Skip to content

Commit

Permalink
test(scale): introduce deterministic scaling tests (#5657)
Browse files Browse the repository at this point in the history
* initial scale sim

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* test q4

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* locate fragment

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix predicate & add cascade

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* add docs

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* add workflows

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* add license header

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix workflow

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix ci & add cfg madsim

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* try use ci-sim

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* align cfg with sslt

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* align with sslt

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* correct result & soft fail

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* do not use shared mem store

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* avoid hard coded thoughput

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* use ci-sim profile

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* remove comments

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* increase opt level for ci-sim

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* remove stream workaround

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix clippy

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* minor fixes

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* bump futures to 0.3.24

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
BugenZhao and mergify[bot] committed Oct 8, 2022
1 parent e14255a commit cb0d309
Show file tree
Hide file tree
Showing 29 changed files with 1,046 additions and 53 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -57,3 +57,5 @@ src/tests/regress/output/*
src/tests/sync_point/slt/e2e_test
# generated e2e tests
e2e_test/generated
# generated scale tests by `cargo nextest`
scale-test.tar.zst
74 changes: 50 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Cargo.toml
Expand Up @@ -30,6 +30,7 @@ members = [
"src/test_runner",
"src/tests/regress",
"src/tests/simulation",
"src/tests/simulation_scale",
"src/tests/sqlsmith",
"src/tests/sync_point",
"src/tracing",
Expand Down Expand Up @@ -74,10 +75,16 @@ overflow-checks = true
[profile.ci-dev]
inherits = "dev"
incremental = false
lto = 'off'
[profile.ci-dev.package."*"] # external dependencies
opt-level = 1

# The profile used for deterministic simulation tests in CI.
# The simulator can only run single-threaded, so optimization is required to make the running time
# reasonable. The optimization level is customized to speed up the build.
[profile.ci-sim]
inherits = "dev"
opt-level = 2

# Patch third-party crates for deterministic simulation.
[patch.crates-io]
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "a819877" }
Expand Down
19 changes: 17 additions & 2 deletions Makefile.toml
Expand Up @@ -495,11 +495,26 @@ cargo nextest run \
"$@"
"""

[tasks.sarchive-scale-test]
category = "RiseDev - Archive simulation scaling tests"
description = "Archive integration scaling tests in deterministic simulation mode"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
script = """
#!/bin/bash
set -e
cargo nextest archive \
-p risingwave_simulation_scale \
--archive-file scale-test.tar.zst \
"$@"
"""

[tasks.sslt]
category = "RiseDev - Deterministic Simulation End-to-end Test"
description = "Run e2e tests in deterministic simulation mode"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "--cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
script = """
#!/bin/bash
set -e
Expand All @@ -511,7 +526,7 @@ cargo run -p risingwave_simulation "$@"
category = "RiseDev - Deterministic Simulation End-to-end Test"
description = "Run e2e tests in deterministic simulation mode and report code coverage"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "--cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim-cov" }
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim-cov" }
script = """
#!/bin/bash
set -e
Expand Down
8 changes: 6 additions & 2 deletions ci/scripts/build-simulation.sh
Expand Up @@ -9,8 +9,12 @@ echo "--- Generate RiseDev CI config"
cp ci/risedev-components.ci.env risedev-components.user.env

echo "--- Build deterministic simulation e2e test runner"
cargo make sslt --profile ci-release -- --help
cargo make sslt --profile ci-sim -- --help

echo "--- Build and archive deterministic scaling imulation tests"
cargo make sarchive-scale-test --cargo-profile ci-sim

echo "--- Upload artifacts"
cp target/sim/ci-release/risingwave_simulation ./risingwave_simulation
cp target/sim/ci-sim/risingwave_simulation ./risingwave_simulation
buildkite-agent artifact upload risingwave_simulation
buildkite-agent artifact upload scale-test.tar.zst
12 changes: 12 additions & 0 deletions ci/scripts/deterministic-scale-test.sh
@@ -0,0 +1,12 @@
#!/bin/bash

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.env.sh

echo "--- Download artifacts"
buildkite-agent artifact download scale-test.tar.zst .

echo "--- Run scaling tests in deterministic simulation mode"
MADSIM_TEST_NUM=5 cargo nextest run --archive-file scale-test.tar.zst --no-fail-fast
2 changes: 1 addition & 1 deletion ci/workflows/main.yml
Expand Up @@ -43,7 +43,7 @@ steps:
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry

- label: "docslt"
command: "ci/scripts/docslt.sh"
key: "docslt"
Expand Down
14 changes: 13 additions & 1 deletion ci/workflows/pull-request.yml
Expand Up @@ -52,7 +52,7 @@ steps:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
timeout_in_minutes: 15
retry: *auto-retry

- label: "docslt"
Expand Down Expand Up @@ -173,6 +173,18 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

- label: "scaling test (deterministic simulation)"
command: "ci/scripts/deterministic-scale-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
command: "timeout 8m ci/scripts/deterministic-e2e-test.sh"
depends_on: "build-simulation"
Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/source/base.rs
Expand Up @@ -193,10 +193,9 @@ pub type ConnectorState = Option<Vec<SplitImpl>>;
/// [`NexmarkSplitReader`] in case that they are CPU intensive and may block the streaming actors.
pub fn spawn_data_generation_stream<T: Send + 'static>(
stream: impl Stream<Item = T> + Send + 'static,
buffer_size: usize,
) -> impl Stream<Item = T> + Send + 'static {
const GENERATION_BUFFER: usize = 1000;

let (generation_tx, generation_rx) = mpsc::channel(GENERATION_BUFFER);
let (generation_tx, generation_rx) = mpsc::channel(buffer_size);
RUNTIME.spawn(async move {
pin_mut!(stream);
while let Some(result) = stream.next().await {
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/source/datagen/source/reader.rs
Expand Up @@ -119,7 +119,9 @@ impl SplitReader for DatagenSplitReader {
}

fn into_stream(self) -> BoxSourceStream {
spawn_data_generation_stream(self.generator.into_stream()).boxed()
// Will buffer at most 4 event chunks.
const BUFFER_SIZE: usize = 4;
spawn_data_generation_stream(self.generator.into_stream(), BUFFER_SIZE).boxed()
}
}

Expand Down
17 changes: 12 additions & 5 deletions src/connector/src/source/nexmark/source/generator.rs
Expand Up @@ -46,7 +46,6 @@ impl NexmarkEventGenerator {
let mut last_event = None;
loop {
let mut msgs: Vec<SourceMessage> = vec![];
let mut num_event = 0;
let old_events_so_far = self.events_so_far;

// Get unix timestamp in milliseconds
Expand All @@ -60,15 +59,17 @@ impl NexmarkEventGenerator {
};

if let Some(event) = last_event.take() {
num_event += 1;
msgs.push(
NexmarkMessage::new(self.split_id.clone(), self.events_so_far as u64, event)
.into(),
);
}

while num_event < self.max_chunk_size {
let mut finished = false;

while (msgs.len() as u64) < self.max_chunk_size {
if self.event_num > 0 && self.events_so_far >= self.event_num as u64 {
finished = true;
break;
}

Expand Down Expand Up @@ -99,13 +100,17 @@ impl NexmarkEventGenerator {
break;
}

num_event += 1;
msgs.push(
NexmarkMessage::new(self.split_id.clone(), self.events_so_far as u64, event)
.into(),
);
}
yield msgs;

if finished && msgs.is_empty() {
break;
} else {
yield msgs;
}

if !self.use_real_time && self.min_event_gap_in_ns > 0 {
tokio::time::sleep(Duration::from_nanos(
Expand All @@ -114,5 +119,7 @@ impl NexmarkEventGenerator {
.await;
}
}

tracing::debug!(?self.event_type, "nexmark generator finished");
}
}
4 changes: 3 additions & 1 deletion src/connector/src/source/nexmark/source/reader.rs
Expand Up @@ -106,7 +106,9 @@ impl SplitReader for NexmarkSplitReader {
}

fn into_stream(self) -> BoxSourceStream {
spawn_data_generation_stream(self.generator.into_stream()).boxed()
// Will buffer at most 4 event chunks.
const BUFFER_SIZE: usize = 4;
spawn_data_generation_stream(self.generator.into_stream(), BUFFER_SIZE).boxed()
}
}

Expand Down

0 comments on commit cb0d309

Please sign in to comment.