Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(scale): introduce deterministic scaling tests #5657

Merged
merged 25 commits into from
Oct 8, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -57,3 +57,5 @@ src/tests/regress/output/*
src/tests/sync_point/slt/e2e_test
# generated e2e tests
e2e_test/generated
# generated scale tests by `cargo nextest`
scale-test.tar.zst
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Cargo.toml
Expand Up @@ -30,6 +30,7 @@ members = [
"src/test_runner",
"src/tests/regress",
"src/tests/simulation",
"src/tests/simulation_scale",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer merging it into the existing simulation crate, maybe in a future PR.

"src/tests/sqlsmith",
"src/tests/sync_point",
"src/tracing",
Expand Down Expand Up @@ -74,10 +75,16 @@ overflow-checks = true
[profile.ci-dev]
inherits = "dev"
incremental = false
lto = 'off'
[profile.ci-dev.package."*"] # external dependencies
opt-level = 1

# The profile used for deterministic simulation tests in CI.
# The simulator can only run single-threaded, so optimization is required to make the running time
# reasonable. The optimization level is set to 1 to speed up the build.
[profile.ci-sim]
inherits = "dev"
opt-level = 1

# Patch third-party crates for deterministic simulation.
[patch.crates-io]
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "a819877" }
Expand Down
19 changes: 17 additions & 2 deletions Makefile.toml
Expand Up @@ -495,11 +495,26 @@ cargo nextest run \
"$@"
"""

[tasks.sarchive-scale-test]
category = "RiseDev - Archive simulation scaling tests"
description = "Archive integration scaling tests in deterministic simulation mode"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't decide how to enable SIMD this way through environments. Thus, the results of JSON parsing might be different due to precision errors like #5487. 🥵

script = """
#!/bin/bash
set -e

cargo nextest archive \
-p risingwave_simulation_scale \
--archive-file scale-test.tar.zst \
"$@"
"""

[tasks.sslt]
category = "RiseDev - Deterministic Simulation End-to-end Test"
description = "Run e2e tests in deterministic simulation mode"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "--cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
script = """
#!/bin/bash
set -e
Expand All @@ -511,7 +526,7 @@ cargo run -p risingwave_simulation "$@"
category = "RiseDev - Deterministic Simulation End-to-end Test"
description = "Run e2e tests in deterministic simulation mode and report code coverage"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "--cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim-cov" }
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim-cov" }
script = """
#!/bin/bash
set -e
Expand Down
8 changes: 6 additions & 2 deletions ci/scripts/build-simulation.sh
Expand Up @@ -9,8 +9,12 @@ echo "--- Generate RiseDev CI config"
cp ci/risedev-components.ci.env risedev-components.user.env

echo "--- Build deterministic simulation e2e test runner"
cargo make sslt --profile ci-release -- --help
cargo make sslt --profile ci-sim -- --help

echo "--- Build and archive deterministic scaling imulation tests"
cargo make sarchive-scale-test --cargo-profile ci-sim

echo "--- Upload artifacts"
cp target/sim/ci-release/risingwave_simulation ./risingwave_simulation
cp target/sim/ci-sim/risingwave_simulation ./risingwave_simulation
buildkite-agent artifact upload risingwave_simulation
buildkite-agent artifact upload scale-test.tar.zst
12 changes: 12 additions & 0 deletions ci/scripts/deterministic-scale-test.sh
@@ -0,0 +1,12 @@
#!/bin/bash

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.env.sh

echo "--- Download artifacts"
buildkite-agent artifact download scale-test.tar.zst .

echo "--- Run scaling tests in deterministic simulation mode"
MADSIM_TEST_NUM=5 cargo nextest run --archive-file scale-test.tar.zst --no-fail-fast
2 changes: 1 addition & 1 deletion ci/workflows/main.yml
Expand Up @@ -43,7 +43,7 @@ steps:
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry

- label: "docslt"
command: "ci/scripts/docslt.sh"
key: "docslt"
Expand Down
14 changes: 13 additions & 1 deletion ci/workflows/pull-request.yml
Expand Up @@ -52,7 +52,7 @@ steps:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
timeout_in_minutes: 15
retry: *auto-retry

- label: "docslt"
Expand Down Expand Up @@ -173,6 +173,18 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

- label: "scaling test (deterministic simulation)"
command: "ci/scripts/deterministic-scale-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
command: "timeout 8m ci/scripts/deterministic-e2e-test.sh"
depends_on: "build-simulation"
Expand Down
21 changes: 16 additions & 5 deletions src/connector/src/source/nexmark/source/generator.rs
Expand Up @@ -14,6 +14,7 @@

use std::time::{Duration, SystemTime, UNIX_EPOCH};

use futures::future::pending;
use futures_async_stream::try_stream;
use risingwave_common::bail;

Expand Down Expand Up @@ -46,7 +47,6 @@ impl NexmarkEventGenerator {
let mut last_event = None;
loop {
let mut msgs: Vec<SourceMessage> = vec![];
let mut num_event = 0;
let old_events_so_far = self.events_so_far;

// Get unix timestamp in milliseconds
Expand All @@ -60,15 +60,17 @@ impl NexmarkEventGenerator {
};

if let Some(event) = last_event.take() {
num_event += 1;
msgs.push(
NexmarkMessage::new(self.split_id.clone(), self.events_so_far as u64, event)
.into(),
);
}

while num_event < self.max_chunk_size {
let mut finished = false;

while (msgs.len() as u64) < self.max_chunk_size {
if self.event_num > 0 && self.events_so_far >= self.event_num as u64 {
finished = true;
break;
}

Expand Down Expand Up @@ -99,13 +101,17 @@ impl NexmarkEventGenerator {
break;
}

num_event += 1;
msgs.push(
NexmarkMessage::new(self.split_id.clone(), self.events_so_far as u64, event)
.into(),
);
}
yield msgs;

if finished && msgs.is_empty() {
break;
} else {
yield msgs;
}

if !self.use_real_time && self.min_event_gap_in_ns > 0 {
tokio::time::sleep(Duration::from_nanos(
Expand All @@ -114,5 +120,10 @@ impl NexmarkEventGenerator {
.await;
}
}

// The connector assumes that the stream will never end, so if the `event_num` is hit, we
// pend the stream forever.
// TODO: should we allow the stream to finish?
let () = pending().await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the stream generator themselves should always end the stream gracefully, as long as they are not infinite streams. Even for those down-streams, they should end themselves as well when the upstream is closed. This way the errors can be propagated to the sink. The system won't be blocking implicitly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've discussed this issue and decided not to use the Stream terminate or TryStream error to represent the stream state in RisingWave as we don't want to propagate the error through the network. 🥵 The error yielded in actor internally should be collected by the Actor instance and find a way to report it to meta service if possible. cc @fuyufjh

For the stream reader, I find that the refactor just merged has removed the assumption (as we're all-in async stream), so maybe the workaround can be removed. Note that there's a select of source reader and barrier receiver, the source executor will work correctly after the source is gracefully terminated. On error, it will propagate to the actor of this source executor.

}
}
9 changes: 7 additions & 2 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
Expand Up @@ -20,16 +20,21 @@ use risingwave_pb::meta::GetClusterInfoResponse;

use crate::common::MetaServiceOpts;

pub async fn cluster_info() -> anyhow::Result<()> {
pub async fn get_cluster_info() -> anyhow::Result<GetClusterInfoResponse> {
let meta_opts = MetaServiceOpts::from_env()?;
let meta_client = meta_opts.create_meta_client().await?;

let response = meta_client.get_cluster_info().await?;
Ok(response)
}

pub async fn cluster_info() -> anyhow::Result<()> {
let GetClusterInfoResponse {
worker_nodes,
table_fragments,
actor_splits: _,
stream_source_infos: _,
} = meta_client.get_cluster_info().await?;
} = get_cluster_info().await?;

// Fragment ID -> [Parallel Unit ID -> (Parallel Unit, Actor)]
let mut fragments = BTreeMap::new();
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/lib.rs
Expand Up @@ -18,7 +18,7 @@ use cmd_impl::bench::BenchCommands;

use crate::cmd_impl::hummock::{list_pinned_snapshots, list_pinned_versions};

mod cmd_impl;
pub mod cmd_impl;
pub(crate) mod common;

/// risectl provides internal access to the RisingWave cluster. Generally, you will need
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/rpc/server.rs
Expand Up @@ -119,7 +119,7 @@ pub async fn rpc_serve(
.await
}
MetaStoreBackend::Mem => {
let meta_store = Arc::new(MemStore::shared());
let meta_store = Arc::new(MemStore::new());
rpc_serve_with_store(
meta_store,
address_info,
Expand Down
9 changes: 4 additions & 5 deletions src/meta/src/storage/mem_meta_store.rs
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, LazyLock};
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::{OwnedRwLockReadGuard, RwLock};
Expand Down Expand Up @@ -68,10 +68,9 @@ impl Snapshot for MemSnapshot {
}

impl MemStore {
/// Get a global shared in-memory store.
pub fn shared() -> Self {
static STORE: LazyLock<MemStore> = LazyLock::new(MemStore::default);
STORE.clone()
/// Create a new in-memory store.
pub fn new() -> Self {
wangrunji0408 marked this conversation as resolved.
Show resolved Hide resolved
Self::default()
}
}

Expand Down
29 changes: 29 additions & 0 deletions src/tests/simulation_scale/Cargo.toml
@@ -0,0 +1,29 @@
[package]
name = "risingwave_simulation_scale"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
async-trait = "0.1"
clap = "3"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
glob = "0.3"
itertools = "0.10"
madsim = "0.2.7"
rand = "0.8"
risingwave_compactor = { path = "../../storage/compactor" }
risingwave_compute = { path = "../../compute" }
risingwave_ctl = { path = "../../ctl" }
risingwave_frontend = { path = "../../frontend" }
risingwave_meta = { path = "../../meta" }
risingwave_pb = { path = "../../prost" }
sqllogictest = "0.6.4"
tempfile = "3"
tokio = { version = "0.2", package = "madsim-tokio" }
tokio-postgres = "0.7.7"
tracing = "0.1"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { version = "0.1", path = "../../workspace-hack" }