Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(scale): introduce deterministic scaling tests #5657

Merged
merged 25 commits into from
Oct 8, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -57,3 +57,5 @@ src/tests/regress/output/*
src/tests/sync_point/slt/e2e_test
# generated e2e tests
e2e_test/generated
# generated scale tests
scale-test.tar.zst
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -30,6 +30,7 @@ members = [
"src/test_runner",
"src/tests/regress",
"src/tests/simulation",
"src/tests/simulation_scale",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer merging it into the existing simulation crate, maybe in a future PR.

"src/tests/sqlsmith",
"src/tests/sync_point",
"src/tracing",
Expand Down
19 changes: 17 additions & 2 deletions Makefile.toml
Expand Up @@ -495,11 +495,26 @@ cargo nextest run \
"$@"
"""

[tasks.sarchive-scale-test]
category = "RiseDev - Archive simulation scaling tests"
description = "Archive integration scaling tests in deterministic simulation mode"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't decide how to enable SIMD this way through environments. Thus, the results of JSON parsing might be different due to precision errors like #5487. 🥵

script = """
#!/bin/bash
set -e

cargo nextest archive \
-p risingwave_simulation_scale \
--archive-file scale-test.tar.zst \
"$@"
"""

[tasks.sslt]
category = "RiseDev - Deterministic Simulation End-to-end Test"
description = "Run e2e tests in deterministic simulation mode"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "--cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim" }
script = """
#!/bin/bash
set -e
Expand All @@ -511,7 +526,7 @@ cargo run -p risingwave_simulation "$@"
category = "RiseDev - Deterministic Simulation End-to-end Test"
description = "Run e2e tests in deterministic simulation mode and report code coverage"
dependencies = ["warn-on-missing-tools"]
env = { RUSTFLAGS = "--cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim-cov" }
env = { RUSTFLAGS = "-Ctarget-cpu=native --cfg tokio_unstable --cfg madsim", RUSTDOCFLAGS = "--cfg madsim", CARGO_TARGET_DIR = "target/sim-cov" }
script = """
#!/bin/bash
set -e
Expand Down
4 changes: 4 additions & 0 deletions ci/scripts/build-simulation.sh
Expand Up @@ -11,6 +11,10 @@ cp ci/risedev-components.ci.env risedev-components.user.env
echo "--- Build deterministic simulation e2e test runner"
cargo make sslt --profile ci-release -- --help

echo "--- Build and archive deterministic scaling imulation tests"
cargo make sarchive-scale-test --cargo-profile ci-release

echo "--- Upload artifacts"
cp target/sim/ci-release/risingwave_simulation ./risingwave_simulation
buildkite-agent artifact upload risingwave_simulation
buildkite-agent artifact upload scale-test.tar.zst
12 changes: 12 additions & 0 deletions ci/scripts/deterministic-scale-test.sh
@@ -0,0 +1,12 @@
#!/bin/bash

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.env.sh

echo "--- Download artifacts"
buildkite-agent artifact download scale-test.tar.zst .

echo "--- Run scaling tests in deterministic simulation mode"
MADSIM_TEST_NUM=5 cargo nextest run --archive-file scale-test.tar.zst --no-fail-fast
2 changes: 1 addition & 1 deletion ci/workflows/main.yml
Expand Up @@ -43,7 +43,7 @@ steps:
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry

- label: "docslt"
command: "ci/scripts/docslt.sh"
key: "docslt"
Expand Down
14 changes: 13 additions & 1 deletion ci/workflows/pull-request.yml
Expand Up @@ -52,7 +52,7 @@ steps:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
timeout_in_minutes: 15
retry: *auto-retry

- label: "docslt"
Expand Down Expand Up @@ -173,6 +173,18 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

- label: "scaling test (deterministic simulation)"
command: "ci/scripts/deterministic-scale-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
command: "timeout 8m ci/scripts/deterministic-e2e-test.sh"
depends_on: "build-simulation"
Expand Down
8 changes: 7 additions & 1 deletion src/connector/src/source/nexmark/source/generator.rs
Expand Up @@ -15,6 +15,7 @@
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{Ok, Result};
use futures::future::pending;
use futures_async_stream::try_stream;
use risingwave_common::bail;

Expand Down Expand Up @@ -43,7 +44,12 @@ impl NexmarkEventGenerator {
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
pub async fn into_stream(mut self) {
loop {
yield self.next().await?
let chunk = self.next().await?;
if chunk.is_empty() {
yield pending().await;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be CPU intensive if there are no remaining records to generate, which is problematic with madsim. 🤣

} else {
yield chunk;
}
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
Expand Up @@ -20,16 +20,21 @@ use risingwave_pb::meta::GetClusterInfoResponse;

use crate::common::MetaServiceOpts;

pub async fn cluster_info() -> anyhow::Result<()> {
pub async fn get_cluster_info() -> anyhow::Result<GetClusterInfoResponse> {
let meta_opts = MetaServiceOpts::from_env()?;
let meta_client = meta_opts.create_meta_client().await?;

let response = meta_client.get_cluster_info().await?;
Ok(response)
}

pub async fn cluster_info() -> anyhow::Result<()> {
let GetClusterInfoResponse {
worker_nodes,
table_fragments,
actor_splits: _,
stream_source_infos: _,
} = meta_client.get_cluster_info().await?;
} = get_cluster_info().await?;

// Fragment ID -> [Parallel Unit ID -> (Parallel Unit, Actor)]
let mut fragments = BTreeMap::new();
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/lib.rs
Expand Up @@ -18,7 +18,7 @@ use cmd_impl::bench::BenchCommands;

use crate::cmd_impl::hummock::{list_pinned_snapshots, list_pinned_versions};

mod cmd_impl;
pub mod cmd_impl;
pub(crate) mod common;

/// risectl provides internal access to the RisingWave cluster. Generally, you will need
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/rpc/server.rs
Expand Up @@ -120,7 +120,7 @@ pub async fn rpc_serve(
.await
}
MetaStoreBackend::Mem => {
let meta_store = Arc::new(MemStore::shared());
let meta_store = Arc::new(MemStore::new());
rpc_serve_with_store(
meta_store,
address_info,
Expand Down
9 changes: 4 additions & 5 deletions src/meta/src/storage/mem_meta_store.rs
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, LazyLock};
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::{OwnedRwLockReadGuard, RwLock};
Expand Down Expand Up @@ -68,10 +68,9 @@ impl Snapshot for MemSnapshot {
}

impl MemStore {
/// Get a global shared in-memory store.
pub fn shared() -> Self {
static STORE: LazyLock<MemStore> = LazyLock::new(MemStore::default);
STORE.clone()
/// Create a new in-memory store.
pub fn new() -> Self {
wangrunji0408 marked this conversation as resolved.
Show resolved Hide resolved
Self::default()
}
}

Expand Down
29 changes: 29 additions & 0 deletions src/tests/simulation_scale/Cargo.toml
@@ -0,0 +1,29 @@
[package]
name = "risingwave_simulation_scale"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
async-trait = "0.1"
clap = "3"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
glob = "0.3"
itertools = "0.10"
madsim = "0.2.7"
rand = "0.8"
risingwave_compactor = { path = "../../storage/compactor" }
risingwave_compute = { path = "../../compute" }
risingwave_ctl = { path = "../../ctl" }
risingwave_frontend = { path = "../../frontend" }
risingwave_meta = { path = "../../meta" }
risingwave_pb = { path = "../../prost" }
sqllogictest = "0.6.4"
tempfile = "3"
tokio = { version = "0.2", package = "madsim-tokio" }
tokio-postgres = "0.7.7"
tracing = "0.1"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { version = "0.1", path = "../../workspace-hack" }