From 3fe678766fc989f31180fccb5e9fe36833f431e2 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 6 Sep 2022 01:23:53 +0200 Subject: [PATCH 01/12] build(toolchain): bump to 2022-09-05 --- Cargo.lock | 8 ++++---- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 4 ++-- rust-toolchain | 2 +- src/common/src/error.rs | 4 +++- src/common/src/lib.rs | 4 +++- src/expr/src/lib.rs | 1 - src/object_store/src/lib.rs | 4 +++- src/object_store/src/object/error.rs | 2 +- src/risedevtool/src/bin/risedev-config.rs | 2 +- src/risedevtool/src/lib.rs | 1 + src/sqlparser/src/tokenizer.rs | 6 +++--- src/storage/src/error.rs | 3 ++- src/storage/src/hummock/error.rs | 2 +- src/storage/src/lib.rs | 4 +++- src/utils/async_stack_trace/src/context.rs | 5 ----- 16 files changed, 29 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 477822914c1c..5ef859169a00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5779,18 +5779,18 @@ checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "thiserror" -version = "1.0.32" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5f6586b7f764adc0231f4c79be7b920e766bb2f3e51b3661cdb263828f19994" +checksum = "8c1b05ca9d106ba7d2e31a9dab4a64e7be2cce415321966ea3132c49a656e252" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.32" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12bafc5b54507e0149cdf1b145a5d80ab80a90bcd9275df43d4fff68460f6c21" +checksum = "e8f2591983642de85c921015f3f070c665a197ed69e417af436115e3a1407487" dependencies = [ "proc-macro2", "quote", diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 4ba2ebf84f20..350585b3b4a4 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -14,7 +14,7 @@ export RUST_TOOLCHAIN=$(cat ../rust-toolchain) # !!! CHANGE THIS WHEN YOU WANT TO BUMP CI IMAGE !!! # # AND ALSO docker-compose.yml # ###################################################### -export BUILD_ENV_VERSION=v20220826 +export BUILD_ENV_VERSION=v20220905 export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index f8aa1d78bdae..046f83b4b881 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -42,12 +42,12 @@ services: retries: 5 rw-build-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220826 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220905 volumes: - ..:/risingwave regress-test-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220826 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220905 depends_on: db: condition: service_healthy diff --git a/rust-toolchain b/rust-toolchain index 185be365a452..8bdeb1d5b4a7 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2022-07-29 +nightly-2022-09-05 diff --git a/src/common/src/error.rs b/src/common/src/error.rs index 55928629357e..f2cf1a7bee92 100644 --- a/src/common/src/error.rs +++ b/src/common/src/error.rs @@ -230,7 +230,9 @@ impl Debug for RwError { "{}\n{}", self.inner, // Use inner error's backtrace by default, otherwise use the generated one in `From`. - self.inner.backtrace().unwrap_or(&*self.backtrace) + (&self.inner as &dyn std::error::Error) + .request_ref::() + .unwrap_or(&*self.backtrace) ) } } diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index a79177145dcc..44c5543032ed 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -30,7 +30,6 @@ #![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(is_sorted)] -#![feature(backtrace)] #![feature(fn_traits)] #![feature(type_alias_impl_trait)] #![feature(test)] @@ -39,6 +38,9 @@ #![feature(lint_reasons)] #![feature(generators)] #![feature(map_try_insert)] +#![feature(let_chains)] +#![feature(error_generic_member_access)] +#![feature(provide_any)] #[macro_use] pub mod error; diff --git a/src/expr/src/lib.rs b/src/expr/src/lib.rs index 7b762ce4d65c..818a6c62b4ad 100644 --- a/src/expr/src/lib.rs +++ b/src/expr/src/lib.rs @@ -30,7 +30,6 @@ #![feature(binary_heap_drain_sorted)] #![feature(binary_heap_into_iter_sorted)] #![feature(is_sorted)] -#![feature(backtrace)] #![feature(fn_traits)] #![feature(assert_matches)] #![feature(let_else)] diff --git a/src/object_store/src/lib.rs b/src/object_store/src/lib.rs index bb8552786acb..2cae06fcfd42 100644 --- a/src/object_store/src/lib.rs +++ b/src/object_store/src/lib.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(backtrace)] #![feature(generic_associated_types)] #![feature(trait_alias)] #![feature(type_alias_impl_trait)] +#![feature(let_chains)] +#![feature(error_generic_member_access)] +#![feature(provide_any)] extern crate core; diff --git a/src/object_store/src/object/error.rs b/src/object_store/src/object/error.rs index cac98cb39602..080a43a5d99d 100644 --- a/src/object_store/src/object/error.rs +++ b/src/object_store/src/object/error.rs @@ -50,7 +50,7 @@ impl std::fmt::Debug for ObjectError { write!(f, "{}", self.inner)?; writeln!(f)?; - if let Some(backtrace) = self.inner.backtrace() { + if let Some(backtrace) = (&self.inner as &dyn Error).request_ref::() { write!(f, " backtrace of inner error:\n{}", backtrace)?; } else { write!(f, " backtrace of `ObjectError`:\n{}", self.backtrace)?; diff --git a/src/risedevtool/src/bin/risedev-config.rs b/src/risedevtool/src/bin/risedev-config.rs index 4dc65551633d..4ee8ad3d2d5b 100644 --- a/src/risedevtool/src/bin/risedev-config.rs +++ b/src/risedevtool/src/bin/risedev-config.rs @@ -270,7 +270,7 @@ fn main() -> Result<()> { if component == "RISEDEV_CONFIGURED" { continue; } - match Components::from_env(&component) { + match Components::from_env(component) { Some(component) => { if val == "true" { enabled.push(component); diff --git a/src/risedevtool/src/lib.rs b/src/risedevtool/src/lib.rs index ce8268fb6c8b..5005689e2105 100644 --- a/src/risedevtool/src/lib.rs +++ b/src/risedevtool/src/lib.rs @@ -15,6 +15,7 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![feature(exit_status_error)] #![feature(let_else)] +#![feature(let_chains)] #![feature(lint_reasons)] mod config; diff --git a/src/sqlparser/src/tokenizer.rs b/src/sqlparser/src/tokenizer.rs index 5fce9228a5c5..6647b0b0649e 100644 --- a/src/sqlparser/src/tokenizer.rs +++ b/src/sqlparser/src/tokenizer.rs @@ -207,7 +207,7 @@ impl Token { Token::Word(Word { value: word.to_string(), quote_style, - keyword: if quote_style == None { + keyword: if quote_style.is_none() { let keyword = ALL_KEYWORDS.binary_search(&word_uppercase.as_str()); keyword.map_or(Keyword::NoKeyword, |x| ALL_KEYWORDS_INDEX[x]) } else { @@ -330,8 +330,8 @@ impl<'a> Tokenizer<'a> { } Token::Whitespace(Whitespace::Tab) => self.col += 4, - Token::Word(w) if w.quote_style == None => self.col += w.value.len() as u64, - Token::Word(w) if w.quote_style != None => self.col += w.value.len() as u64 + 2, + Token::Word(w) if w.quote_style.is_none() => self.col += w.value.len() as u64, + Token::Word(w) if w.quote_style.is_some() => self.col += w.value.len() as u64 + 2, Token::Number(s) => self.col += s.len() as u64, Token::SingleQuotedString(s) => self.col += s.len() as u64, _ => self.col += 1, diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 4386f1fd170a..a827d4a534be 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -55,11 +55,12 @@ impl From for RwError { impl std::fmt::Debug for StorageError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use std::backtrace::Backtrace; use std::error::Error; write!(f, "{}", self)?; writeln!(f)?; - if let Some(backtrace) = self.backtrace() { + if let Some(backtrace) = (&self as &dyn Error).request_ref::() { // Since we forward all backtraces from source, `self.backtrace()` is the backtrace of // inner error. write!(f, " backtrace of inner error:\n{}", backtrace)?; diff --git a/src/storage/src/hummock/error.rs b/src/storage/src/hummock/error.rs index 92cfc577ca62..2c6c27b7a39f 100644 --- a/src/storage/src/hummock/error.rs +++ b/src/storage/src/hummock/error.rs @@ -154,7 +154,7 @@ impl std::fmt::Debug for HummockError { write!(f, "{}", self.inner)?; writeln!(f)?; - if let Some(backtrace) = self.inner.backtrace() { + if let Some(backtrace) = (&self.inner as &dyn Error).request_ref::() { write!(f, " backtrace of inner error:\n{}", backtrace)?; } else { write!( diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index eff3e0aee9d8..ee45e0d4b9d9 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -25,7 +25,6 @@ #![deny(unused_must_use)] #![deny(rustdoc::broken_intra_doc_links)] #![feature(allocator_api)] -#![feature(backtrace)] #![feature(binary_heap_drain_sorted)] #![feature(bound_as_ref)] #![feature(bound_map)] @@ -36,6 +35,7 @@ #![feature(generic_associated_types)] #![feature(hash_drain_filter)] #![feature(let_else)] +#![feature(let_chains)] #![feature(lint_reasons)] #![feature(map_first_last)] #![feature(proc_macro_hygiene)] @@ -50,6 +50,8 @@ #![feature(assert_matches)] #![feature(is_sorted)] #![feature(btree_drain_filter)] +#![feature(error_generic_member_access)] +#![feature(provide_any)] pub mod hummock; pub mod keyspace; diff --git a/src/utils/async_stack_trace/src/context.rs b/src/utils/async_stack_trace/src/context.rs index abdb238d3c38..0178bf13e469 100644 --- a/src/utils/async_stack_trace/src/context.rs +++ b/src/utils/async_stack_trace/src/context.rs @@ -12,11 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// FIXME: This is a false-positive clippy test, remove this while bumping toolchain. -// https://github.com/tokio-rs/tokio/issues/4836 -// https://github.com/rust-lang/rust-clippy/issues/8493 -#![expect(clippy::declare_interior_mutable_const)] - use std::cell::RefCell; use std::fmt::{Debug, Write}; use std::sync::atomic::{AtomicU64, Ordering}; From 4cbcd7a6cdae6c2b459c1013ff6d3f9d2d8b3c16 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 13 Sep 2022 10:54:00 +0200 Subject: [PATCH 02/12] 0913 --- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 4 ++-- rust-toolchain | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 350585b3b4a4..155b042bc0f1 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -14,7 +14,7 @@ export RUST_TOOLCHAIN=$(cat ../rust-toolchain) # !!! CHANGE THIS WHEN YOU WANT TO BUMP CI IMAGE !!! # # AND ALSO docker-compose.yml # ###################################################### -export BUILD_ENV_VERSION=v20220905 +export BUILD_ENV_VERSION=v20220913 export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 046f83b4b881..925bc4c77b9c 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -42,12 +42,12 @@ services: retries: 5 rw-build-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220905 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220913 volumes: - ..:/risingwave regress-test-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220905 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220913 depends_on: db: condition: service_healthy diff --git a/rust-toolchain b/rust-toolchain index 8bdeb1d5b4a7..f5d159db30ae 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2022-09-05 +nightly-2022-09-13 From ffcf4b80797b35baadf03579e94deb6151d41272 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 22 Sep 2022 10:08:22 +0200 Subject: [PATCH 03/12] #![feature(let_chains)] --- src/batch/src/lib.rs | 1 + src/connector/src/lib.rs | 1 + src/expr/src/lib.rs | 1 + src/meta/src/lib.rs | 1 + src/stream/src/lib.rs | 1 + 5 files changed, 5 insertions(+) diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index 2d3cb101b3e3..decd2775cdfc 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -36,6 +36,7 @@ #![feature(proc_macro_hygiene, stmt_expr_attributes)] #![feature(iterator_try_collect)] #![feature(lint_reasons)] +#![feature(let_chains)] mod error; pub mod exchange_source; diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 4ab855fa2c16..c7061064db5c 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -33,6 +33,7 @@ #![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(lint_reasons)] +#![feature(let_chains)] pub mod aws_utils; pub mod error; diff --git a/src/expr/src/lib.rs b/src/expr/src/lib.rs index 818a6c62b4ad..a5226f4b7d9e 100644 --- a/src/expr/src/lib.rs +++ b/src/expr/src/lib.rs @@ -33,6 +33,7 @@ #![feature(fn_traits)] #![feature(assert_matches)] #![feature(let_else)] +#![feature(let_chains)] #![feature(lint_reasons)] #![feature(type_alias_impl_trait)] #![feature(generators)] diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 7b4b691331a1..f14284054573 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -39,6 +39,7 @@ #![feature(map_try_insert)] #![feature(hash_drain_filter)] #![feature(is_some_with)] +#![feature(let_chains)] #![cfg_attr(coverage, feature(no_coverage))] #![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)] diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index f6a5609442e9..aa307518a8c7 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -36,6 +36,7 @@ #![feature(binary_heap_drain_sorted)] #![feature(map_first_last)] #![feature(let_else)] +#![feature(let_chains)] #![feature(hash_drain_filter)] #![feature(drain_filter)] #![feature(generators)] From a06b2937c2597f6228a229105002d61804112d75 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 22 Sep 2022 10:13:33 +0200 Subject: [PATCH 04/12] backtrace --- src/meta/src/error.rs | 2 +- src/meta/src/lib.rs | 3 ++- src/stream/src/executor/error.rs | 2 +- src/stream/src/lib.rs | 3 ++- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index f4576f43fa2f..f479fc53f916 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -71,7 +71,7 @@ impl std::fmt::Debug for MetaError { write!(f, "{}", self.inner)?; writeln!(f)?; - if let Some(backtrace) = self.inner.backtrace() { + if let Some(backtrace) = (&self.inner as &dyn Error).request_ref::() { write!(f, " backtrace of inner error:\n{}", backtrace)?; } else { write!(f, " backtrace of `MetaError`:\n{}", self.backtrace)?; diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index f14284054573..074ad6641884 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(backtrace)] #![allow(clippy::derive_partial_eq_without_eq)] #![warn(clippy::dbg_macro)] #![warn(clippy::disallowed_methods)] @@ -40,6 +39,8 @@ #![feature(hash_drain_filter)] #![feature(is_some_with)] #![feature(let_chains)] +#![feature(error_generic_member_access)] +#![feature(provide_any)] #![cfg_attr(coverage, feature(no_coverage))] #![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)] diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index 251a141e6ddf..9c1ad7a84a1d 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -100,7 +100,7 @@ impl std::fmt::Debug for StreamExecutorError { write!(f, "{}", self.inner)?; writeln!(f)?; - if let Some(backtrace) = self.inner.backtrace() { + if let Some(backtrace) = (&self.inner as &dyn Error).request_ref::() { write!(f, " backtrace of inner error:\n{}", backtrace)?; } else { write!( diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index aa307518a8c7..2a97e1cc1dda 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -14,7 +14,6 @@ #![allow(rustdoc::private_intra_doc_links)] #![allow(clippy::derive_partial_eq_without_eq)] -#![feature(backtrace)] #![warn(clippy::dbg_macro)] #![warn(clippy::disallowed_methods)] #![warn(clippy::doc_markdown)] @@ -48,6 +47,8 @@ #![feature(result_option_inspect)] #![feature(never_type)] #![feature(btreemap_alloc)] +#![feature(error_generic_member_access)] +#![feature(provide_any)] #[macro_use] extern crate tracing; From e6fb0e4299dd6d9d962e3e244623a32ddc6a8457 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 22 Sep 2022 10:40:11 +0200 Subject: [PATCH 05/12] 20220922 --- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 4 ++-- rust-toolchain | 2 +- src/batch/src/lib.rs | 1 - src/common/src/lib.rs | 1 - src/compute/src/lib.rs | 2 -- src/connector/src/lib.rs | 1 - src/expr/src/lib.rs | 2 -- src/frontend/src/lib.rs | 1 - src/meta/src/lib.rs | 2 -- src/object_store/src/lib.rs | 1 - src/risedevtool/src/bin/risedev-config.rs | 1 - src/risedevtool/src/lib.rs | 1 - src/rpc_client/src/lib.rs | 1 - src/source/src/lib.rs | 1 - src/storage/compactor/src/lib.rs | 1 - src/storage/src/lib.rs | 2 -- src/stream/src/lib.rs | 2 -- 18 files changed, 4 insertions(+), 24 deletions(-) diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 155b042bc0f1..aecd45800e75 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -14,7 +14,7 @@ export RUST_TOOLCHAIN=$(cat ../rust-toolchain) # !!! CHANGE THIS WHEN YOU WANT TO BUMP CI IMAGE !!! # # AND ALSO docker-compose.yml # ###################################################### -export BUILD_ENV_VERSION=v20220913 +export BUILD_ENV_VERSION=v20220922 export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 8bbdd4aaaeb3..768dcb913739 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -42,12 +42,12 @@ services: retries: 5 rw-build-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220913 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220922 volumes: - ..:/risingwave regress-test-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220913 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220922 depends_on: db: condition: service_healthy diff --git a/rust-toolchain b/rust-toolchain index f5d159db30ae..e440ea607181 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2022-09-13 +nightly-2022-09-22 diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index decd2775cdfc..d97a70da241f 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -27,7 +27,6 @@ #![deny(unused_must_use)] #![deny(rustdoc::broken_intra_doc_links)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(exact_size_is_empty)] #![feature(type_alias_impl_trait)] diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 6dfa3ba81ac9..ac6062cb2511 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -27,7 +27,6 @@ #![deny(unused_must_use)] #![deny(rustdoc::broken_intra_doc_links)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(is_sorted)] #![feature(fn_traits)] diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 23f8fcf4ba0a..f65a2e615c2b 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -26,8 +26,6 @@ #![deny(rustdoc::broken_intra_doc_links)] #![feature(trait_alias)] #![feature(binary_heap_drain_sorted)] -#![feature(generic_associated_types)] -#![feature(let_else)] #![feature(generators)] #![feature(type_alias_impl_trait)] #![cfg_attr(coverage, feature(no_coverage))] diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 60bda8a3ede3..c61aa83e8f5c 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -30,7 +30,6 @@ #![deny(unused_must_use)] #![deny(rustdoc::broken_intra_doc_links)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(lint_reasons)] #![feature(once_cell)] diff --git a/src/expr/src/lib.rs b/src/expr/src/lib.rs index a5226f4b7d9e..a4eb902bd625 100644 --- a/src/expr/src/lib.rs +++ b/src/expr/src/lib.rs @@ -26,13 +26,11 @@ #![deny(unused_must_use)] #![deny(rustdoc::broken_intra_doc_links)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(binary_heap_into_iter_sorted)] #![feature(is_sorted)] #![feature(fn_traits)] #![feature(assert_matches)] -#![feature(let_else)] #![feature(let_chains)] #![feature(lint_reasons)] #![feature(type_alias_impl_trait)] diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 42cf11408138..110db4f74cea 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -30,7 +30,6 @@ #![feature(negative_impls)] #![feature(generators)] #![feature(proc_macro_hygiene, stmt_expr_attributes)] -#![feature(let_else)] #![feature(trait_alias)] #![feature(drain_filter)] #![feature(if_let_guard)] diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index e4f250edf470..013f5b23cd08 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -26,10 +26,8 @@ #![deny(unused_must_use)] #![deny(rustdoc::broken_intra_doc_links)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(option_result_contains)] -#![feature(let_else)] #![feature(type_alias_impl_trait)] #![feature(map_first_last)] #![feature(drain_filter)] diff --git a/src/object_store/src/lib.rs b/src/object_store/src/lib.rs index 065d3110d0d5..76b7d87bb836 100644 --- a/src/object_store/src/lib.rs +++ b/src/object_store/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(generic_associated_types)] #![feature(trait_alias)] #![feature(type_alias_impl_trait)] #![feature(once_cell)] diff --git a/src/risedevtool/src/bin/risedev-config.rs b/src/risedevtool/src/bin/risedev-config.rs index 4ee8ad3d2d5b..b81ae687768b 100644 --- a/src/risedevtool/src/bin/risedev-config.rs +++ b/src/risedevtool/src/bin/risedev-config.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(let_else)] #![allow(clippy::needless_question_mark)] use std::fs::OpenOptions; diff --git a/src/risedevtool/src/lib.rs b/src/risedevtool/src/lib.rs index 5005689e2105..6cd62695c9c6 100644 --- a/src/risedevtool/src/lib.rs +++ b/src/risedevtool/src/lib.rs @@ -14,7 +14,6 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![feature(exit_status_error)] -#![feature(let_else)] #![feature(let_chains)] #![feature(lint_reasons)] diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 679e61f991d4..ce0062213f49 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -25,7 +25,6 @@ #![deny(unused_must_use)] #![deny(rustdoc::broken_intra_doc_links)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(result_option_inspect)] #![feature(type_alias_impl_trait)] diff --git a/src/source/src/lib.rs b/src/source/src/lib.rs index 951fe9dbd55a..6d2c84713c34 100644 --- a/src/source/src/lib.rs +++ b/src/source/src/lib.rs @@ -27,7 +27,6 @@ #![deny(unused_must_use)] #![deny(rustdoc::broken_intra_doc_links)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(lint_reasons)] #![feature(result_option_inspect)] diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 9fcf1f9b29ba..3bfcd8d2d830 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -24,7 +24,6 @@ #![warn(clippy::await_holding_lock)] #![deny(unused_must_use)] #![deny(rustdoc::broken_intra_doc_links)] -#![feature(let_else)] mod compactor_observer; mod rpc; diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 46a8cce6b336..bc80b49db036 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -32,9 +32,7 @@ #![feature(custom_test_frameworks)] #![feature(drain_filter)] #![feature(generators)] -#![feature(generic_associated_types)] #![feature(hash_drain_filter)] -#![feature(let_else)] #![feature(let_chains)] #![feature(lint_reasons)] #![feature(map_first_last)] diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 35a6cf965f5b..5106624ae9f0 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -30,12 +30,10 @@ #![deny(rustdoc::broken_intra_doc_links)] #![feature(trait_alias)] #![feature(type_alias_impl_trait)] -#![feature(generic_associated_types)] #![feature(more_qualified_paths)] #![feature(lint_reasons)] #![feature(binary_heap_drain_sorted)] #![feature(map_first_last)] -#![feature(let_else)] #![feature(let_chains)] #![feature(hash_drain_filter)] #![feature(drain_filter)] From 124636d135c7b614b43608f5aaa47b3b746581ee Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 22 Sep 2022 10:52:48 +0200 Subject: [PATCH 06/12] clippy --- src/common/src/buffer/bitmap.rs | 2 +- src/common/src/types/ordered_float.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/common/src/buffer/bitmap.rs b/src/common/src/buffer/bitmap.rs index 7672a6956414..8c34ecdf387d 100644 --- a/src/common/src/buffer/bitmap.rs +++ b/src/common/src/buffer/bitmap.rs @@ -213,7 +213,7 @@ impl Bitmap { } fn num_bytes(num_bits: usize) -> usize { - num_bits / 8 + if num_bits % 8 > 0 { 1 } else { 0 } + num_bits / 8 + usize::from(num_bits % 8 > 0) } /// Returns the number of valid bits in the bitmap, diff --git a/src/common/src/types/ordered_float.rs b/src/common/src/types/ordered_float.rs index 51d2b4c83b26..5bd530f58177 100644 --- a/src/common/src/types/ordered_float.rs +++ b/src/common/src/types/ordered_float.rs @@ -55,7 +55,7 @@ use core::str::FromStr; pub use num_traits::Float; use num_traits::{ AsPrimitive, Bounded, CheckedAdd, CheckedDiv, CheckedMul, CheckedNeg, CheckedRem, CheckedSub, - FromPrimitive, Num, NumCast, One, Signed, ToPrimitive, Zero, + FromPrimitive, Num, One, Signed, ToPrimitive, Zero, }; // masks for the parts of the IEEE 754 float @@ -566,7 +566,7 @@ impl One for OrderedFloat { } } -impl NumCast for OrderedFloat { +impl num_traits::NumCast for OrderedFloat { #[inline] fn from(n: F) -> Option { T::from(n).map(OrderedFloat) @@ -944,7 +944,7 @@ fn raw_double_bits(f: &F) -> u64 { } let exp_u64 = exp as u16 as u64; - let sign_u64 = if sign > 0 { 1u64 } else { 0u64 }; + let sign_u64 = u64::from(sign > 0); (man & MAN_MASK) | ((exp_u64 << 52) & EXP_MASK) | ((sign_u64 << 63) & SIGN_MASK) } From e063818086eddf2d6fd0dde63a2d284ce3b8b002 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 17 Oct 2022 00:03:22 +0200 Subject: [PATCH 07/12] 20221016 -- it works! --- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 4 +- rust-toolchain | 2 +- src/batch/src/executor/join/lookup_join.rs | 4 +- src/batch/src/task/env.rs | 1 - src/cmd_all/src/bin/risingwave.rs | 1 + src/common/src/array/mod.rs | 4 +- src/common/src/cache.rs | 4 +- src/common/src/types/chrono_wrapper.rs | 9 ++--- src/common/src/types/interval.rs | 16 ++++---- .../src/source/nexmark/source/generator.rs | 10 ++--- src/expr/src/expr/expr_unary.rs | 2 +- src/expr/src/vector_op/cast.rs | 4 +- src/frontend/planner_test/src/lib.rs | 2 +- src/frontend/src/catalog/schema_catalog.rs | 6 +-- src/frontend/src/catalog/table_catalog.rs | 2 +- src/frontend/src/handler/drop_schema.rs | 2 +- src/frontend/src/lib.rs | 2 +- .../src/optimizer/plan_node/batch_sort_agg.rs | 4 +- .../src/optimizer/plan_node/logical_join.rs | 8 ++-- .../optimizer/plan_node/logical_project.rs | 5 ++- .../plan_node/logical_project_set.rs | 5 ++- src/frontend/src/optimizer/rule/apply_join.rs | 40 +++++-------------- src/frontend/src/scheduler/plan_fragmenter.rs | 2 +- src/frontend/src/session.rs | 6 +-- src/frontend/src/utils/condition.rs | 6 +-- src/meta/src/barrier/command.rs | 2 +- .../base_level_compaction_picker.rs | 4 +- .../hummock/compaction/overlap_strategy.rs | 2 +- src/meta/src/lib.rs | 12 ++++-- src/meta/src/stream/scale.rs | 2 +- src/meta/src/stream/scheduler.rs | 4 +- src/meta/src/stream/source_manager.rs | 5 +-- src/meta/src/stream/stream_graph.rs | 8 +--- src/meta/src/stream/test_scale.rs | 4 +- src/object_store/src/object/disk.rs | 2 +- src/source/src/lib.rs | 1 - src/source/src/parser/common.rs | 4 +- src/storage/benches/bench_compactor.rs | 2 +- src/storage/compactor/src/lib.rs | 2 - .../compactor/shared_buffer_compact.rs | 2 +- src/storage/src/hummock/sstable/bloom.rs | 2 +- src/storage/src/hummock/sstable/mod.rs | 4 +- .../src/hummock/sstable/multi_builder.rs | 2 +- src/storage/src/hummock/sstable/utils.rs | 2 +- src/storage/src/hummock/sstable_store.rs | 2 +- src/storage/src/lib.rs | 2 +- src/stream/src/error.rs | 2 +- .../src/executor/aggregation/foldable.rs | 2 +- .../src/executor/source/source_executor.rs | 8 ++-- .../src/executor/wrapper/update_check.rs | 2 +- src/stream/src/lib.rs | 1 - .../src/task/barrier_manager/managed_state.rs | 2 +- src/stream/src/task/env.rs | 1 - src/test_runner/src/test_runner.rs | 4 +- src/tests/sqlsmith/src/lib.rs | 1 + src/tests/sqlsmith/tests/test_runner.rs | 2 + src/utils/memcomparable/src/ser.rs | 2 +- 58 files changed, 109 insertions(+), 141 deletions(-) diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index aecd45800e75..bb23abf1b8da 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -14,7 +14,7 @@ export RUST_TOOLCHAIN=$(cat ../rust-toolchain) # !!! CHANGE THIS WHEN YOU WANT TO BUMP CI IMAGE !!! # # AND ALSO docker-compose.yml # ###################################################### -export BUILD_ENV_VERSION=v20220922 +export BUILD_ENV_VERSION=v20221016 export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 768dcb913739..c48b11329cf6 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -42,12 +42,12 @@ services: retries: 5 rw-build-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220922 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221016 volumes: - ..:/risingwave regress-test-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20220922 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221016 depends_on: db: condition: service_healthy diff --git a/rust-toolchain b/rust-toolchain index e440ea607181..3a67502bb5cd 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2022-09-22 +nightly-2022-10-16 diff --git a/src/batch/src/executor/join/lookup_join.rs b/src/batch/src/executor/join/lookup_join.rs index bf42ae72fddb..dc74936e2e96 100644 --- a/src/batch/src/executor/join/lookup_join.rs +++ b/src/batch/src/executor/join/lookup_join.rs @@ -223,7 +223,7 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder let list = self .pu_to_scan_range_mapping .entry(parallel_unit_id) - .or_insert(vec![]); + .or_default(); list.push((scan_range, vnode)); Ok(()) @@ -580,7 +580,7 @@ impl BoxedExecutorBuilder for LookupJoinExecutorBuilder { let inner_side_key_types = inner_side_key_idxs .iter() - .map(|&i| inner_side_schema.fields[i as usize].data_type.clone()) + .map(|&i| inner_side_schema.fields[i].data_type.clone()) .collect_vec(); let null_safe = lookup_join_node.get_null_safe().to_vec(); diff --git a/src/batch/src/task/env.rs b/src/batch/src/task/env.rs index 79c221597cf0..4c67d4da691f 100644 --- a/src/batch/src/task/env.rs +++ b/src/batch/src/task/env.rs @@ -115,7 +115,6 @@ impl BatchEnvironment { self.task_manager.clone() } - #[expect(clippy::explicit_auto_deref)] pub fn source_manager(&self) -> &dyn SourceManager { &*self.source_manager } diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index eec455e7f8e4..a28ba01de3fd 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -13,6 +13,7 @@ // limitations under the License. #![cfg_attr(coverage, feature(no_coverage))] +#![feature(let_chains)] use global_stats_alloc::INSTRUMENTED_JEMALLOC; use stats_alloc::StatsAlloc; diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 57c48ca8aac6..13991b4bad3b 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -659,7 +659,7 @@ mod tests { fn test_filter() { let mut builder = PrimitiveArrayBuilder::::new(0); for i in 0..=60 { - builder.append(Some(i as i32)); + builder.append(Some(i)); } let array = filter(&builder.finish(), |x| x.unwrap_or(0) >= 60).unwrap(); assert_eq!(array.iter().collect::>>(), vec![Some(60)]); @@ -692,7 +692,7 @@ mod tests { fn test_vectorized_add() { let mut builder = PrimitiveArrayBuilder::::new(0); for i in 0..=60 { - builder.append(Some(i as i32)); + builder.append(Some(i)); } let array1 = builder.finish(); diff --git a/src/common/src/cache.rs b/src/common/src/cache.rs index 5745dfcb550b..d79ae230de5d 100644 --- a/src/common/src/cache.rs +++ b/src/common/src/cache.rs @@ -338,12 +338,12 @@ unsafe impl Send for LruCacheShard {} impl LruCacheShard { fn new(capacity: usize, object_capacity: usize) -> Self { - let mut lru = Box::new(LruHandle::default()); + let mut lru = Box::>::default(); lru.prev = lru.as_mut(); lru.next = lru.as_mut(); let mut object_pool = Vec::with_capacity(object_capacity); for _ in 0..object_capacity { - object_pool.push(Box::new(LruHandle::default())); + object_pool.push(Box::default()); } Self { capacity, diff --git a/src/common/src/types/chrono_wrapper.rs b/src/common/src/types/chrono_wrapper.rs index 8b15766251f9..e54b81328db6 100644 --- a/src/common/src/types/chrono_wrapper.rs +++ b/src/common/src/types/chrono_wrapper.rs @@ -78,7 +78,7 @@ impl NaiveDateWrapper { pub fn with_days_value(days: i32) -> value_encoding::Result { Ok(NaiveDateWrapper::new( NaiveDate::from_num_days_from_ce_opt(days) - .ok_or(ValueEncodingError::InvalidNaiveDateEncoding(days))?, + .ok_or_else(|| ValueEncodingError::InvalidNaiveDateEncoding(days))?, )) } @@ -116,7 +116,7 @@ impl NaiveTimeWrapper { pub fn with_secs_nano_value(secs: u32, nano: u32) -> value_encoding::Result { Ok(NaiveTimeWrapper::new( NaiveTime::from_num_seconds_from_midnight_opt(secs, nano) - .ok_or(ValueEncodingError::InvalidNaiveTimeEncoding(secs, nano))?, + .ok_or_else(|| ValueEncodingError::InvalidNaiveTimeEncoding(secs, nano))?, )) } @@ -163,9 +163,8 @@ impl NaiveDateTimeWrapper { pub fn with_secs_nsecs_value(secs: i64, nsecs: u32) -> value_encoding::Result { Ok(NaiveDateTimeWrapper::new({ - NaiveDateTime::from_timestamp_opt(secs, nsecs).ok_or( - ValueEncodingError::InvalidNaiveDateTimeEncoding(secs, nsecs), - )? + NaiveDateTime::from_timestamp_opt(secs, nsecs) + .ok_or_else(|| ValueEncodingError::InvalidNaiveDateTimeEncoding(secs, nsecs))? })) } diff --git a/src/common/src/types/interval.rs b/src/common/src/types/interval.rs index d013f7a04bc2..665ae37063c8 100644 --- a/src/common/src/types/interval.rs +++ b/src/common/src/types/interval.rs @@ -138,7 +138,7 @@ impl IntervalUnit { IntervalUnit { months: (months as i32), days: (days as i32), - ms: (remaining_ms as i64), + ms: remaining_ms, } } @@ -658,8 +658,8 @@ mod tests { ]; for (lhs, rhs, expected) in cases { - let lhs = IntervalUnit::new(lhs.0 as i32, lhs.1 as i32, lhs.2 as i64); - let rhs = IntervalUnit::new(rhs.0 as i32, rhs.1 as i32, rhs.2 as i64); + let lhs = IntervalUnit::new(lhs.0, lhs.1, lhs.2 as i64); + let rhs = IntervalUnit::new(rhs.0, rhs.1, rhs.2 as i64); let result = std::panic::catch_unwind(|| { let actual = lhs.exact_div(&rhs); assert_eq!(actual, expected); @@ -688,13 +688,13 @@ mod tests { ]; for (lhs, rhs, expected) in cases_int { - let lhs = IntervalUnit::new(lhs.0 as i32, lhs.1 as i32, lhs.2 as i64); - let expected = expected.map(|x| IntervalUnit::new(x.0 as i32, x.1 as i32, x.2 as i64)); + let lhs = IntervalUnit::new(lhs.0, lhs.1, lhs.2 as i64); + let expected = expected.map(|x| IntervalUnit::new(x.0, x.1, x.2 as i64)); let actual = lhs.div_float(rhs as i16); assert_eq!(actual, expected); - let actual = lhs.div_float(rhs as i32); + let actual = lhs.div_float(rhs); assert_eq!(actual, expected); let actual = lhs.div_float(rhs as i64); @@ -702,8 +702,8 @@ mod tests { } for (lhs, rhs, expected) in cases_float { - let lhs = IntervalUnit::new(lhs.0 as i32, lhs.1 as i32, lhs.2 as i64); - let expected = expected.map(|x| IntervalUnit::new(x.0 as i32, x.1 as i32, x.2 as i64)); + let lhs = IntervalUnit::new(lhs.0, lhs.1, lhs.2 as i64); + let expected = expected.map(|x| IntervalUnit::new(x.0, x.1, x.2 as i64)); let actual = lhs.div_float(OrderedFloat::(rhs)); assert_eq!(actual, expected); diff --git a/src/connector/src/source/nexmark/source/generator.rs b/src/connector/src/source/nexmark/source/generator.rs index 65823b447174..817300a4ccdf 100644 --- a/src/connector/src/source/nexmark/source/generator.rs +++ b/src/connector/src/source/nexmark/source/generator.rs @@ -67,9 +67,7 @@ impl NexmarkEventGenerator { if let Some(event) = self.last_event.take() { num_event += 1; - res.push( - NexmarkMessage::new(self.split_id.clone(), self.events_so_far as u64, event).into(), - ); + res.push(NexmarkMessage::new(self.split_id.clone(), self.events_so_far, event).into()); } while num_event < self.max_chunk_size { @@ -105,14 +103,12 @@ impl NexmarkEventGenerator { } num_event += 1; - res.push( - NexmarkMessage::new(self.split_id.clone(), self.events_so_far as u64, event).into(), - ); + res.push(NexmarkMessage::new(self.split_id.clone(), self.events_so_far, event).into()); } if !self.use_real_time && self.min_event_gap_in_ns > 0 { tokio::time::sleep(std::time::Duration::from_nanos( - (self.events_so_far - old_events_so_far) as u64 * self.min_event_gap_in_ns, + (self.events_so_far - old_events_so_far) * self.min_event_gap_in_ns, )) .await; } diff --git a/src/expr/src/expr/expr_unary.rs b/src/expr/src/expr/expr_unary.rs index 80552de61db8..5239b28c2b52 100644 --- a/src/expr/src/expr/expr_unary.rs +++ b/src/expr/src/expr/expr_unary.rs @@ -343,7 +343,7 @@ mod tests { for i in 0..100i16 { if i % 2 == 0 { target.push(Some(i as i32)); - input.push(Some(i as i16)); + input.push(Some(i)); } else { input.push(None); target.push(None); diff --git a/src/expr/src/vector_op/cast.rs b/src/expr/src/vector_op/cast.rs index 48a79a69bfd6..20d486831f44 100644 --- a/src/expr/src/vector_op/cast.rs +++ b/src/expr/src/vector_op/cast.rs @@ -65,7 +65,7 @@ fn parse_naive_datetime(s: &str) -> Result { res.time.hour as u32, res.time.minute as u32, res.time.second as u32, - res.time.microsecond as u32, + res.time.microsecond, ); Ok(NaiveDateTime::new(date, time)) } else { @@ -94,7 +94,7 @@ fn parse_naive_time(s: &str) -> Result { res.hour as u32, res.minute as u32, res.second as u32, - res.microsecond as u32, + res.microsecond, )) } diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 9939d933e8ab..214ffce8170c 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(label_break_value)] +#![feature(let_chains)] #![allow(clippy::derive_partial_eq_without_eq)] //! Data-driven tests. diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index cecea8ad7fbf..dec7510ddc12 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -151,7 +151,7 @@ impl SchemaCatalog { /// Iterate all indices pub fn iter_index(&self) -> impl Iterator { - self.index_by_name.iter().map(|(_, v)| v) + self.index_by_name.values() } /// Iterate all sources, including the materialized sources. @@ -174,11 +174,11 @@ impl SchemaCatalog { } pub fn iter_sink(&self) -> impl Iterator { - self.sink_by_name.iter().map(|(_, v)| v) + self.sink_by_name.values() } pub fn iter_system_tables(&self) -> impl Iterator { - self.system_table_by_name.iter().map(|(_, v)| v) + self.system_table_by_name.values() } pub fn get_table_by_name(&self, table_name: &str) -> Option<&TableCatalog> { diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index a4634b684e34..16f5e9a99419 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -164,7 +164,7 @@ impl TableCatalog { pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> ProstTable { ProstTable { - id: self.id.table_id as u32, + id: self.id.table_id, schema_id, database_id, name: self.name.clone(), diff --git a/src/frontend/src/handler/drop_schema.rs b/src/frontend/src/handler/drop_schema.rs index b5bfc561be98..7ee524cfd5d5 100644 --- a/src/frontend/src/handler/drop_schema.rs +++ b/src/frontend/src/handler/drop_schema.rs @@ -59,7 +59,7 @@ pub async fn handle_drop_schema( }; let schema_id = { // If the mode is `Restrict` or `None`, the `schema` need to be empty. - if Some(DropMode::Restrict) == mode || None == mode { + if Some(DropMode::Restrict) == mode || mode.is_none() { if let Some(table) = schema.iter_table().next() { return Err(CatalogError::NotEmpty( "schema", diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index bdc33ccb43ba..5dd665a3d19c 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -21,8 +21,8 @@ #![feature(trait_alias)] #![feature(drain_filter)] #![feature(if_let_guard)] +#![feature(let_chains)] #![feature(assert_matches)] -#![feature(map_first_last)] #![feature(lint_reasons)] #![feature(box_patterns)] #![feature(once_cell)] diff --git a/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs b/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs index 4840f264e3b5..f8d54051aab1 100644 --- a/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs @@ -122,9 +122,7 @@ impl ToBatchProst for BatchSortAgg { .group_key() .iter() .clone() - .map(|idx| { - ExprImpl::InputRef(Box::new(InputRef::new(*idx as usize, DataType::Int32))) - }) + .map(|idx| ExprImpl::InputRef(Box::new(InputRef::new(*idx, DataType::Int32)))) .map(|expr| expr.to_expr_proto()) .collect::>(), }) diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index 97bc471d2180..b9ed78bca3bd 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1148,13 +1148,13 @@ impl ToStream for LogicalJoin { .logical_pk() .iter() .cloned() - .filter(|i| l2i.try_map(*i) == None); + .filter(|i| l2i.try_map(*i).is_none()); let right_to_add = right .logical_pk() .iter() .cloned() - .filter(|i| r2i.try_map(*i) == None) + .filter(|i| r2i.try_map(*i).is_none()) .map(|i| i + left_len); // NOTE(st1page): add join keys in the pk_indices a work around before we really have stream @@ -1167,7 +1167,7 @@ impl ToStream for LogicalJoin { eq_predicate .left_eq_indexes() .into_iter() - .filter(|i| l2i.try_map(*i) == None), + .filter(|i| l2i.try_map(*i).is_none()), ) .unique(); let right_to_add = right_to_add @@ -1175,7 +1175,7 @@ impl ToStream for LogicalJoin { eq_predicate .right_eq_indexes() .into_iter() - .filter(|i| r2i.try_map(*i) == None) + .filter(|i| r2i.try_map(*i).is_none()) .map(|i| i + left_len), ) .unique(); diff --git a/src/frontend/src/optimizer/plan_node/logical_project.rs b/src/frontend/src/optimizer/plan_node/logical_project.rs index a008dd17467b..8f1eefcbc750 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project.rs @@ -451,7 +451,10 @@ impl ToStream for LogicalProject { // Add missing columns of input_pk into the select list. let input_pk = input.logical_pk(); let i2o = Self::i2o_col_mapping_inner(input.schema().len(), proj.exprs()); - let col_need_to_add = input_pk.iter().cloned().filter(|i| i2o.try_map(*i) == None); + let col_need_to_add = input_pk + .iter() + .cloned() + .filter(|i| i2o.try_map(*i).is_none()); let input_schema = input.schema(); let exprs = proj.exprs() diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index 0e5e81729832..e4b6994dc6ee 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -343,7 +343,10 @@ impl ToStream for LogicalProjectSet { // Add missing columns of input_pk into the select list. let input_pk = input.logical_pk(); let i2o = Self::i2o_col_mapping_inner(input.schema().len(), project_set.select_list()); - let col_need_to_add = input_pk.iter().cloned().filter(|i| i2o.try_map(*i) == None); + let col_need_to_add = input_pk + .iter() + .cloned() + .filter(|i| i2o.try_map(*i).is_none()); let input_schema = input.schema(); let select_list = project_set diff --git a/src/frontend/src/optimizer/rule/apply_join.rs b/src/frontend/src/optimizer/rule/apply_join.rs index ec221253d8c8..541a1a4260d3 100644 --- a/src/frontend/src/optimizer/rule/apply_join.rs +++ b/src/frontend/src/optimizer/rule/apply_join.rs @@ -199,23 +199,14 @@ impl ApplyJoinRule { let mut d_t1_bit_set = FixedBitSet::with_capacity(apply_len); d_t1_bit_set.set_range(0..apply_left_len + join_left_len, true); - for (key, group) in &apply_on.into_iter().group_by(|expr| { + let (other, left): (Vec<_>, Vec<_>) = apply_on.into_iter().partition(|expr| { let mut visitor = CollectInputRef::with_capacity(apply_len); visitor.visit_expr(expr); let collect_bit_set = FixedBitSet::from(visitor); - if collect_bit_set.is_subset(&d_t1_bit_set) { - 0 - } else { - 1 - } - }) { - let vec = group.collect_vec(); - match key { - 0 => left_apply_condition.extend(vec), - 1 => other_condition.extend(vec), - _ => unreachable!(), - } - } + collect_bit_set.is_subset(&d_t1_bit_set) + }); + left_apply_condition.extend(left); + other_condition.extend(other); } JoinType::RightSemi | JoinType::RightAnti | JoinType::Unspecified => unreachable!(), } @@ -297,23 +288,14 @@ impl ApplyJoinRule { d_t2_bit_set.set_range(0..apply_left_len, true); d_t2_bit_set.set_range(apply_left_len + join_left_len..apply_len, true); - for (key, group) in &apply_on.into_iter().group_by(|expr| { + let (other, right): (Vec<_>, Vec<_>) = apply_on.into_iter().partition(|expr| { let mut visitor = CollectInputRef::with_capacity(apply_len); visitor.visit_expr(expr); - let collect_bit_set = FixedBitSet::from(visitor); - if collect_bit_set.is_subset(&d_t2_bit_set) { - 0 - } else { - 1 - } - }) { - let vec = group.collect_vec(); - match key { - 0 => right_apply_condition.extend(vec), - 1 => other_condition.extend(vec), - _ => unreachable!(), - } - } + let collected = FixedBitSet::from(visitor); + collected.is_subset(&d_t2_bit_set) + }); + right_apply_condition.extend(right); + other_condition.extend(other); // rewrite right condition let mut right_apply_condition_rewriter = Rewriter { diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 530330d226ed..339f0d28245c 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -527,7 +527,7 @@ fn vnode_mapping_to_owner_mapping(vnode_mapping: VnodeMapping) -> HashMap Bitmap { let mut bitmap = BitmapBuilder::zeroed(num_vnodes); - bitmap.set(vnode as usize, true); + bitmap.set(vnode, true); bitmap.finish() } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 1a9171a5fadc..65ee28e03020 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -366,7 +366,6 @@ impl FrontendEnv { } /// Get a reference to the frontend env's catalog writer. - #[expect(clippy::explicit_auto_deref)] pub fn catalog_writer(&self) -> &dyn CatalogWriter { &*self.catalog_writer } @@ -377,7 +376,6 @@ impl FrontendEnv { } /// Get a reference to the frontend env's user info writer. - #[expect(clippy::explicit_auto_deref)] pub fn user_info_writer(&self) -> &dyn UserInfoWriter { &*self.user_info_writer } @@ -387,16 +385,14 @@ impl FrontendEnv { &self.user_info_reader } - #[expect(clippy::explicit_auto_deref)] pub fn worker_node_manager(&self) -> &WorkerNodeManager { - &*self.worker_node_manager + &self.worker_node_manager } pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef { self.worker_node_manager.clone() } - #[expect(clippy::explicit_auto_deref)] pub fn meta_client(&self) -> &dyn FrontendMetaClient { &*self.meta_client } diff --git a/src/frontend/src/utils/condition.rs b/src/frontend/src/utils/condition.rs index 3444b3e0bccb..c65a42c401bf 100644 --- a/src/frontend/src/utils/condition.rs +++ b/src/frontend/src/utils/condition.rs @@ -229,11 +229,7 @@ impl Condition { pub fn split_disjoint(self, columns: &FixedBitSet) -> (Self, Self) { self.group_by::<_, 2>(|expr| { let input_bits = expr.collect_input_refs(columns.len()); - if input_bits.is_disjoint(columns) { - 1 - } else { - 0 - } + input_bits.is_disjoint(columns) as usize }) .into_iter() .next_tuple() diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 9af04ac436c7..875a87d83338 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -468,7 +468,7 @@ where .worker_node_id; node_dropped_actors .entry(node_id as WorkerId) - .or_insert(vec![]) + .or_insert(Vec::new()) .push(*actor_id as ActorId); } } diff --git a/src/meta/src/hummock/compaction/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/base_level_compaction_picker.rs index f8f8ca731974..c3ee2f2b7ee0 100644 --- a/src/meta/src/hummock/compaction/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/base_level_compaction_picker.rs @@ -63,7 +63,7 @@ impl CompactionPicker for LevelCompactionPicker { table_infos: l0.sub_levels[0].table_infos.clone(), }, InputLevel { - level_idx: target_level as u32, + level_idx: target_level, level_type: LevelType::Nonoverlapping as i32, table_infos: vec![], }, @@ -131,7 +131,7 @@ impl CompactionPicker for LevelCompactionPicker { // reverse because the ix of low sub-level is smaller. input_levels.reverse(); input_levels.push(InputLevel { - level_idx: target_level as u32, + level_idx: target_level, level_type: LevelType::Nonoverlapping as i32, table_infos: levels.get_level(self.target_level).table_infos.clone(), }); diff --git a/src/meta/src/hummock/compaction/overlap_strategy.rs b/src/meta/src/hummock/compaction/overlap_strategy.rs index 59b6b1b37bb6..849184056e8e 100644 --- a/src/meta/src/hummock/compaction/overlap_strategy.rs +++ b/src/meta/src/hummock/compaction/overlap_strategy.rs @@ -116,7 +116,7 @@ impl OverlapStrategy for RangeOverlapStrategy { } fn create_overlap_info(&self) -> Box { - Box::new(RangeOverlapInfo::default()) + Box::::default() } } diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 2ef7fd941f48..5ccf054dbd72 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -12,18 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![expect(clippy::iter_kv_map, reason = "FIXME: fix later")] +#![expect( + clippy::or_fun_call, + reason = "https://github.com/rust-lang/rust-clippy/issues/8574" +)] #![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] #![feature(binary_heap_drain_sorted)] #![feature(option_result_contains)] #![feature(type_alias_impl_trait)] -#![feature(map_first_last)] #![feature(drain_filter)] #![feature(custom_test_frameworks)] #![feature(lint_reasons)] #![feature(map_try_insert)] #![feature(hash_drain_filter)] -#![feature(is_some_with)] +#![feature(is_some_and)] #![feature(btree_drain_filter)] #![feature(result_option_inspect)] #![feature(once_cell)] @@ -178,8 +182,8 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { let barrier_interval = Duration::from_millis(meta_config.streaming.barrier_interval_ms as u64); let max_idle_ms = opts.dangerous_max_idle_secs.unwrap_or(0) * 1000; - let in_flight_barrier_nums = meta_config.streaming.in_flight_barrier_nums as usize; - let checkpoint_frequency = meta_config.streaming.checkpoint_frequency as usize; + let in_flight_barrier_nums = meta_config.streaming.in_flight_barrier_nums; + let checkpoint_frequency = meta_config.streaming.checkpoint_frequency; tracing::info!("Meta server listening at {}", listen_addr); let add_info = AddressInfo { diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 9fa669cfca5c..f794041d9339 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -365,7 +365,7 @@ where for downstream_actor_id in &dispatcher.downstream_actor_id { upstream_dispatchers .entry(*downstream_actor_id as ActorId) - .or_insert(vec![]) + .or_default() .push(( stream_actor.fragment_id as FragmentId, *actor_id as ActorId, diff --git a/src/meta/src/stream/scheduler.rs b/src/meta/src/stream/scheduler.rs index 3c90fdfeca2a..23bfe85bf713 100644 --- a/src/meta/src/stream/scheduler.rs +++ b/src/meta/src/stream/scheduler.rs @@ -341,7 +341,7 @@ mod test { fragment_id, nodes: Some(StreamNode { node_body: Some(NodeBody::Materialize(MaterializeNode { - table_id: fragment_id as u32, + table_id: fragment_id, ..Default::default() })), ..Default::default() @@ -398,7 +398,7 @@ mod test { for actor in fragment.actors { vnode_sum += Bitmap::from(actor.get_vnode_bitmap()?).num_high_bits(); } - assert_eq!(vnode_sum as usize, VIRTUAL_NODE_COUNT); + assert_eq!(vnode_sum, VIRTUAL_NODE_COUNT); } Ok(()) diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 9ef1c8efe3ec..28481ef9eb8e 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -227,10 +227,7 @@ where for table_frag in table_frags { for (frag_id, frag) in table_frag.fragments { let mut actors = frag.actors.iter().map(|x| x.actor_id).collect_vec(); - frag_actors - .entry(frag_id) - .or_insert(vec![]) - .append(&mut actors); + frag_actors.entry(frag_id).or_default().append(&mut actors); } } diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs index 546a93a9224f..ac87e3a1805c 100644 --- a/src/meta/src/stream/stream_graph.rs +++ b/src/meta/src/stream/stream_graph.rs @@ -466,14 +466,13 @@ impl StreamGraphBuilder { actor_id_offset: u32, actor_id_len: u32, ) -> MetaResult>> { - let mut graph = HashMap::new(); + let mut graph: HashMap> = HashMap::new(); for builder in self.actor_builders.values_mut() { builder.seal(actor_id_offset, actor_id_len); } for builder in self.actor_builders.values() { - let actor_id = builder.actor_id; let fragment_id = builder.get_fragment_id(); let mut actor = builder.build(); let mut upstream_actors = builder @@ -489,7 +488,6 @@ impl StreamGraphBuilder { let stream_node = self.build_inner( ctx, actor.get_nodes()?, - actor_id, fragment_id, &mut upstream_actors, &mut upstream_fragments, @@ -498,7 +496,7 @@ impl StreamGraphBuilder { actor.nodes = Some(stream_node); graph .entry(builder.get_fragment_id()) - .or_insert(vec![]) + .or_default() .push(actor); } Ok(graph) @@ -514,7 +512,6 @@ impl StreamGraphBuilder { &self, ctx: &mut CreateMaterializedViewContext, stream_node: &StreamNode, - actor_id: LocalActorId, fragment_id: GlobalFragmentId, upstream_actor_id: &mut HashMap, upstream_fragment_id: &mut HashMap, @@ -659,7 +656,6 @@ impl StreamGraphBuilder { _ => self.build_inner( ctx, input, - actor_id, fragment_id, upstream_actor_id, upstream_fragment_id, diff --git a/src/meta/src/stream/test_scale.rs b/src/meta/src/stream/test_scale.rs index f05b2ace3434..d8765d842ab2 100644 --- a/src/meta/src/stream/test_scale.rs +++ b/src/meta/src/stream/test_scale.rs @@ -115,9 +115,9 @@ mod tests { assert_eq!(vnode_mapping.len(), VIRTUAL_NODE_COUNT); - let mut check = HashMap::new(); + let mut check: HashMap> = HashMap::new(); for (idx, parallel_unit_id) in vnode_mapping.into_iter().enumerate() { - check.entry(parallel_unit_id).or_insert(vec![]).push(idx); + check.entry(parallel_unit_id).or_default().push(idx); } assert_eq!(check.len(), parallel_units_num); diff --git a/src/object_store/src/object/disk.rs b/src/object_store/src/object/disk.rs index 0c93e9d8273d..e56429552124 100644 --- a/src/object_store/src/object/disk.rs +++ b/src/object_store/src/object/disk.rs @@ -247,7 +247,7 @@ impl ObjectStore for DiskObjectStore { let path_owned = path.to_owned(); let block_loc = *block_loc_ref; let future = utils::asyncify(move || { - let mut buf = vec![0; block_loc.size as usize]; + let mut buf = vec![0; block_loc.size]; file_holder .value() .read_exact_at(&mut buf, block_loc.offset as u64) diff --git a/src/source/src/lib.rs b/src/source/src/lib.rs index 3fda839eb519..4fb46d9b4cc6 100644 --- a/src/source/src/lib.rs +++ b/src/source/src/lib.rs @@ -59,7 +59,6 @@ pub enum SourceImpl { Connector(ConnectorSource), } -#[expect(clippy::large_enum_variant)] pub enum SourceStreamReaderImpl { Table(TableStreamReader), Connector(ConnectorSourceReader), diff --git a/src/source/src/parser/common.rs b/src/source/src/parser/common.rs index 70cea2e65740..7e8f4157df39 100644 --- a/src/source/src/parser/common.rs +++ b/src/source/src/parser/common.rs @@ -70,7 +70,7 @@ fn do_parse_json_value(column: &ColumnDesc, v: &Value) -> Result { ), DataType::Int64 => ensure_int!(v, i64).into(), DataType::Float32 => ScalarImpl::Float32((ensure_float!(v, f32) as f32).into()), - DataType::Float64 => ScalarImpl::Float64((ensure_float!(v, f64) as f64).into()), + DataType::Float64 => ScalarImpl::Float64((ensure_float!(v, f64)).into()), // FIXME: decimal should have more precision than f64 DataType::Decimal => Decimal::from_f64(ensure_float!(v, Decimal)) .ok_or_else(|| anyhow!("expect decimal"))? @@ -125,7 +125,7 @@ fn do_parse_simd_json_value(column: &ColumnDesc, v: &BorrowedValue<'_>) -> Resul ), DataType::Int64 => ensure_int!(v, i64).into(), DataType::Float32 => ScalarImpl::Float32((simd_json_ensure_float!(v, f32) as f32).into()), - DataType::Float64 => ScalarImpl::Float64((simd_json_ensure_float!(v, f64) as f64).into()), + DataType::Float64 => ScalarImpl::Float64((simd_json_ensure_float!(v, f64)).into()), // FIXME: decimal should have more precision than f64 DataType::Decimal => Decimal::from_f64(simd_json_ensure_float!(v, Decimal)) .ok_or_else(|| anyhow!("expect decimal"))? diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index cebcea08972a..0221e058646d 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -96,7 +96,7 @@ async fn build_table( let user_len = full_key.len() - 8; for i in range { let start = (i % 8) as usize; - let end = (start + 8) as usize; + let end = start + 8; full_key[(user_len - 8)..user_len].copy_from_slice(&i.to_be_bytes()); builder .add(&full_key, HummockValue::put(&value[start..end]), true) diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 2b15b7601d03..85e9879338b9 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(let_else)] - mod compactor_observer; mod rpc; mod server; diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index a6edb7808053..adcf72313c26 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -237,7 +237,7 @@ async fn compact_shared_buffer( context .stats .write_build_l0_bytes - .inc_by(sst_info.file_size as u64); + .inc_by(sst_info.file_size); } level0.extend(ssts); } diff --git a/src/storage/src/hummock/sstable/bloom.rs b/src/storage/src/hummock/sstable/bloom.rs index 49349b221fc9..edfdc3fb34ea 100644 --- a/src/storage/src/hummock/sstable/bloom.rs +++ b/src/storage/src/hummock/sstable/bloom.rs @@ -79,7 +79,7 @@ impl<'a> Bloom<'a> { // 0.69 is approximately ln(2) let k = ((bits_per_key as f64) * 0.69) as u32; // limit k in [1, 30] - let k = k.min(30).max(1); + let k = k.clamp(1, 30); // For small len(keys), we set a minimum Bloom filter length to avoid high FPR let nbits = (keys.len() * bits_per_key).max(64); let nbytes = (nbits + 7) / 8; diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs index 4035b5ad3b1b..4a2744291508 100644 --- a/src/storage/src/hummock/sstable/mod.rs +++ b/src/storage/src/hummock/sstable/mod.rs @@ -196,8 +196,8 @@ impl SstableMeta { block_meta.encode(buf); } put_length_prefixed_slice(buf, &self.bloom_filter); - buf.put_u32_le(self.estimated_size as u32); - buf.put_u32_le(self.key_count as u32); + buf.put_u32_le(self.estimated_size); + buf.put_u32_le(self.key_count); put_length_prefixed_slice(buf, &self.smallest_key); put_length_prefixed_slice(buf, &self.largest_key); buf.put_u64_le(self.meta_offset); diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 69656ce2501a..74af7edd3f6e 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -96,7 +96,7 @@ where /// Returns the number of [`SstableBuilder`]s. pub fn len(&self) -> usize { - self.sst_outputs.len() + if self.current_builder.is_some() { 1 } else { 0 } + self.sst_outputs.len() + self.current_builder.is_some() as usize } /// Returns true if no builder is created. diff --git a/src/storage/src/hummock/sstable/utils.rs b/src/storage/src/hummock/sstable/utils.rs index ac547867cee8..db6a8f405170 100644 --- a/src/storage/src/hummock/sstable/utils.rs +++ b/src/storage/src/hummock/sstable/utils.rs @@ -58,7 +58,7 @@ pub fn bytes_diff<'a, 'b>(base: &'a [u8], target: &'b [u8]) -> &'b [u8] { pub fn xxhash64_checksum(data: &[u8]) -> u64 { let mut hasher = twox_hash::XxHash64::with_seed(0); hasher.write(data); - hasher.finish() as u64 + hasher.finish() } /// Verifies the checksum of the data equals the given checksum with xxhash64. diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index b4c84d60a5cf..99b02d16be74 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -588,7 +588,7 @@ impl SstableWriter for StreamingUploadWriter { let join_handle = tokio::spawn(async move { let uploader_memory_usage = self.object_uploader.get_memory_usage(); let _tracker = self.tracker.map(|mut t| { - if !t.try_increase_memory(uploader_memory_usage as u64) { + if !t.try_increase_memory(uploader_memory_usage) { tracing::debug!("failed to allocate increase memory for data file, sst id: {}, file size: {}", self.sst_id, uploader_memory_usage); } diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 353ca0b616cd..5bc9c22a161b 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -23,7 +23,6 @@ #![feature(hash_drain_filter)] #![feature(let_chains)] #![feature(lint_reasons)] -#![feature(map_first_last)] #![feature(proc_macro_hygiene)] #![feature(result_option_inspect)] #![feature(stmt_expr_attributes)] @@ -41,6 +40,7 @@ #![cfg_attr(coverage, feature(no_coverage))] #![feature(error_generic_member_access)] #![feature(provide_any)] +#![expect(clippy::result_large_err, reason = "FIXME: HummockError is large")] pub mod hummock; pub mod keyspace; diff --git a/src/stream/src/error.rs b/src/stream/src/error.rs index c7c68dfe9b83..51b9ecd9936d 100644 --- a/src/stream/src/error.rs +++ b/src/stream/src/error.rs @@ -58,7 +58,7 @@ impl std::fmt::Debug for StreamError { write!(f, "{}", self.inner)?; writeln!(f)?; - if let Some(backtrace) = self.inner.backtrace() { + if let Some(backtrace) = (&self.inner as &dyn Error).request_ref::() { write!(f, " backtrace of inner error:\n{}", backtrace)?; } else { write!(f, " backtrace of `StreamError`:\n{}", self.backtrace)?; diff --git a/src/stream/src/executor/aggregation/foldable.rs b/src/stream/src/executor/aggregation/foldable.rs index ff6c9fa217a6..4597c9871081 100644 --- a/src/stream/src/executor/aggregation/foldable.rs +++ b/src/stream/src/executor/aggregation/foldable.rs @@ -420,7 +420,7 @@ mod tests { /// This test uses `Box` to test a state. fn test_primitive_sum_boxed() { let mut agg: Box = - Box::new(TestStreamingSumAgg::::default()); + Box::>::default(); agg.apply_batch( &[Op::Insert, Op::Insert, Op::Insert, Op::Delete], None, diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 0a0ffc9fbaba..21249b31cfe0 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -154,7 +154,7 @@ impl SourceExecutor { // if row_id_index is None, pk is not row_id, so no need to gen row_id and refill chunk if let Some(row_id_index) = row_id_index { - let row_id_column_id = self.source_desc.columns[row_id_index as usize].column_id; + let row_id_column_id = self.source_desc.columns[row_id_index].column_id; if let Some(idx) = self .column_ids .iter() @@ -202,8 +202,8 @@ impl SourceExecutor { async fn take_snapshot(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { let cache = self .state_cache - .iter() - .map(|(_, split_impl)| split_impl.to_owned()) + .values() + .map(|split_impl| split_impl.to_owned()) .collect_vec(); if !cache.is_empty() { @@ -229,7 +229,7 @@ impl SourceExecutor { state, self.column_ids.clone(), self.source_desc.metrics.clone(), - SourceContext::new(self.ctx.id as u32, self.source_id), + SourceContext::new(self.ctx.id, self.source_id), ) .await .map(SourceStreamReaderImpl::Connector), diff --git a/src/stream/src/executor/wrapper/update_check.rs b/src/stream/src/executor/wrapper/update_check.rs index 9f2d1b8e54bd..28c7e2ea8099 100644 --- a/src/stream/src/executor/wrapper/update_check.rs +++ b/src/stream/src/executor/wrapper/update_check.rs @@ -37,7 +37,7 @@ pub async fn update_check(info: Arc, input: impl MessageStream) { .map(|r| (r.unzip())) .tuple_windows() { - if (op1 == None && op2 == Some(Op::UpdateInsert)) // the first row is U+ + if (op1.is_none() && op2 == Some(Op::UpdateInsert)) // the first row is U+ || (op1 == Some(Op::UpdateDelete) && op2 != Some(Op::UpdateInsert)) { panic!( diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index c76df37bd0f1..9b90640bae20 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -20,7 +20,6 @@ #![feature(more_qualified_paths)] #![feature(lint_reasons)] #![feature(binary_heap_drain_sorted)] -#![feature(map_first_last)] #![feature(let_chains)] #![feature(hash_drain_filter)] #![feature(drain_filter)] diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 45e1d390a468..ad18e4a3ec4e 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -83,7 +83,7 @@ impl ManagedBarrierState { remaining_actors, .. }, .. - }) => (remaining_actors.is_empty()), + }) => remaining_actors.is_empty(), _ => unreachable!(), }; diff --git a/src/stream/src/task/env.rs b/src/stream/src/task/env.rs index c59ddf080001..47fb55fb236a 100644 --- a/src/stream/src/task/env.rs +++ b/src/stream/src/task/env.rs @@ -78,7 +78,6 @@ impl StreamEnvironment { &self.server_addr } - #[expect(clippy::explicit_auto_deref)] pub fn source_manager(&self) -> &dyn SourceManager { &*self.source_manager } diff --git a/src/test_runner/src/test_runner.rs b/src/test_runner/src/test_runner.rs index c7e8590dbe36..c54d0ae3c188 100644 --- a/src/test_runner/src/test_runner.rs +++ b/src/test_runner/src/test_runner.rs @@ -64,11 +64,11 @@ pub fn run_test_inner(cases: &[&TestDescAndFn], hook: impl TestHook + 'static + let f = match case.testfn { TestFn::StaticTestFn(f) => TestFn::DynTestFn(Box::new(move || { let _watcher = TestWatcher::new(name, h); - f(); + f() })), TestFn::StaticBenchFn(f) => TestFn::DynTestFn(Box::new(move || { let _watcher = TestWatcher::new(name, h); - bench::run_once(f); + bench::run_once(f) })), ref f => panic!("unexpected testfn {:?}", f), }; diff --git a/src/tests/sqlsmith/src/lib.rs b/src/tests/sqlsmith/src/lib.rs index b531859f4777..aa679d4332de 100644 --- a/src/tests/sqlsmith/src/lib.rs +++ b/src/tests/sqlsmith/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(once_cell)] +#![feature(let_chains)] use std::vec; diff --git a/src/tests/sqlsmith/tests/test_runner.rs b/src/tests/sqlsmith/tests/test_runner.rs index 2a3a3bf72e2b..5daf51592e3a 100644 --- a/src/tests/sqlsmith/tests/test_runner.rs +++ b/src/tests/sqlsmith/tests/test_runner.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(let_chains)] + #[cfg(feature = "enable_sqlsmith_unit_test")] mod frontend; diff --git a/src/utils/memcomparable/src/ser.rs b/src/utils/memcomparable/src/ser.rs index 697b04c104d3..3dd681494b7b 100644 --- a/src/utils/memcomparable/src/ser.rs +++ b/src/utils/memcomparable/src/ser.rs @@ -190,7 +190,7 @@ impl<'a, B: BufMut> ser::Serializer for &'a mut Serializer { } fn serialize_bytes(self, v: &[u8]) -> Result<()> { - self.output.put_u8(if v.is_empty() { 0 } else { 1 }); + self.output.put_u8(!v.is_empty() as u8); let mut len = 0; for chunk in v.chunks(8) { self.output.put_slice(chunk); From f191ea120241e0b9cd4111b595c5ef5c4c9ff6a6 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 17 Oct 2022 00:10:15 +0200 Subject: [PATCH 08/12] fmt --- src/stream/src/executor/aggregation/agg_impl/foldable.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/stream/src/executor/aggregation/agg_impl/foldable.rs b/src/stream/src/executor/aggregation/agg_impl/foldable.rs index c4cc26fae59e..ea0e4b9fc5f9 100644 --- a/src/stream/src/executor/aggregation/agg_impl/foldable.rs +++ b/src/stream/src/executor/aggregation/agg_impl/foldable.rs @@ -419,8 +419,7 @@ mod tests { #[test] /// This test uses `Box` to test an aggregator. fn test_primitive_sum_boxed() { - let mut agg: Box = - Box::>::default(); + let mut agg: Box = Box::>::default(); agg.apply_batch( &[Op::Insert, Op::Insert, Op::Insert, Op::Delete], None, From 69d18a99103f1343126acd20836dc38b9e91efdf Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 17 Oct 2022 00:34:41 +0200 Subject: [PATCH 09/12] fix --- src/connector/src/source/nexmark/source/generator.rs | 8 +++----- src/meta/src/hummock/compaction_group/manager.rs | 2 +- src/storage/compaction_test/src/lib.rs | 1 - 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/connector/src/source/nexmark/source/generator.rs b/src/connector/src/source/nexmark/source/generator.rs index 6bc913122258..6d166f736082 100644 --- a/src/connector/src/source/nexmark/source/generator.rs +++ b/src/connector/src/source/nexmark/source/generator.rs @@ -60,8 +60,7 @@ impl NexmarkEventGenerator { if let Some(event) = last_event.take() { msgs.push( - NexmarkMessage::new(self.split_id.clone(), self.events_so_far as u64, event) - .into(), + NexmarkMessage::new(self.split_id.clone(), self.events_so_far, event).into(), ); } @@ -101,8 +100,7 @@ impl NexmarkEventGenerator { } msgs.push( - NexmarkMessage::new(self.split_id.clone(), self.events_so_far as u64, event) - .into(), + NexmarkMessage::new(self.split_id.clone(), self.events_so_far, event).into(), ); } @@ -114,7 +112,7 @@ impl NexmarkEventGenerator { if !self.use_real_time && self.min_event_gap_in_ns > 0 { tokio::time::sleep(Duration::from_nanos( - (self.events_so_far - old_events_so_far) as u64 * self.min_event_gap_in_ns, + (self.events_so_far - old_events_so_far) * self.min_event_gap_in_ns, )) .await; } diff --git a/src/meta/src/hummock/compaction_group/manager.rs b/src/meta/src/hummock/compaction_group/manager.rs index 45367d0564b5..2eb4eed002b4 100644 --- a/src/meta/src/hummock/compaction_group/manager.rs +++ b/src/meta/src/hummock/compaction_group/manager.rs @@ -262,7 +262,7 @@ impl CompactionGroupManagerInner { *compaction_group_id = self .id_generator_ref .generate::<{ IdCategory::CompactionGroup }>() - .await? as u64; + .await?; compaction_groups.insert( *compaction_group_id, CompactionGroup::new( diff --git a/src/storage/compaction_test/src/lib.rs b/src/storage/compaction_test/src/lib.rs index a543f34f015b..81b56b458261 100644 --- a/src/storage/compaction_test/src/lib.rs +++ b/src/storage/compaction_test/src/lib.rs @@ -23,7 +23,6 @@ #![warn(clippy::no_effect_underscore_binding)] #![warn(clippy::await_holding_lock)] #![deny(rustdoc::broken_intra_doc_links)] -#![feature(let_else)] mod server; From 2c5bc306cb063376e3497bc27f14f80ed15a6a97 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 17 Oct 2022 11:27:47 +0200 Subject: [PATCH 10/12] minor revert --- src/common/src/types/ordered_float.rs | 4 ++-- src/meta/src/barrier/command.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/common/src/types/ordered_float.rs b/src/common/src/types/ordered_float.rs index 006f6ce4900c..af94a27588c3 100644 --- a/src/common/src/types/ordered_float.rs +++ b/src/common/src/types/ordered_float.rs @@ -55,7 +55,7 @@ use core::str::FromStr; pub use num_traits::Float; use num_traits::{ AsPrimitive, Bounded, CheckedAdd, CheckedDiv, CheckedMul, CheckedNeg, CheckedRem, CheckedSub, - FromPrimitive, Num, One, Signed, ToPrimitive, Zero, + FromPrimitive, Num, NumCast, One, Signed, ToPrimitive, Zero, }; // masks for the parts of the IEEE 754 float @@ -566,7 +566,7 @@ impl One for OrderedFloat { } } -impl num_traits::NumCast for OrderedFloat { +impl NumCast for OrderedFloat { #[inline] fn from(n: F) -> Option { T::from(n).map(OrderedFloat) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index c91c45ee8ea7..a23126470b92 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -503,7 +503,7 @@ where .worker_node_id; node_dropped_actors .entry(node_id as WorkerId) - .or_insert(Vec::new()) + .or_insert(vec![]) .push(*actor_id as ActorId); } } From db7e33f858b0efb3bec2c2bafd475b810541ebfb Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Sat, 22 Oct 2022 22:31:22 -0400 Subject: [PATCH 11/12] chore(build): workaround for toolchain upgrade Signed-off-by: Bugen Zhao --- src/ctl/src/cmd_impl/hummock/list_kv.rs | 9 +- src/storage/compaction_test/src/server.rs | 5 +- .../hummock_test/src/compactor_tests.rs | 14 +- .../hummock_test/src/failpoint_tests.rs | 5 +- .../hummock_test/src/snapshot_tests.rs | 145 +++++++++++-- .../hummock_test/src/state_store_tests.rs | 21 +- .../src/hummock/iterator/backward_user.rs | 6 +- .../src/hummock/iterator/forward_user.rs | 4 +- .../src/hummock/iterator/merge_inner.rs | 195 +++++++++++------- src/storage/src/hummock/iterator/mod.rs | 6 +- src/storage/src/hummock/state_store.rs | 88 +++----- src/storage/src/hummock/store/state_store.rs | 2 +- src/storage/src/keyspace.rs | 4 +- src/storage/src/memory.rs | 59 +++--- src/storage/src/monitor/monitored_store.rs | 54 ++--- src/storage/src/panic_store.rs | 42 ++-- src/storage/src/store.rs | 94 +++------ .../src/table/streaming_table/state_table.rs | 18 +- 18 files changed, 410 insertions(+), 361 deletions(-) diff --git a/src/ctl/src/cmd_impl/hummock/list_kv.rs b/src/ctl/src/cmd_impl/hummock/list_kv.rs index 049bcb991c40..a5db997d3f3c 100644 --- a/src/ctl/src/cmd_impl/hummock/list_kv.rs +++ b/src/ctl/src/cmd_impl/hummock/list_kv.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; + use bytes::{Buf, BufMut, BytesMut}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::next_key; @@ -30,9 +32,12 @@ pub async fn list_kv(epoch: u64, table_id: u32) -> anyhow::Result<()> { let mut buf = BytesMut::with_capacity(5); buf.put_u8(b't'); buf.put_u32(table_id); - let range = buf.to_vec()..next_key(buf.to_vec().as_slice()); + let range = ( + Bound::Included(buf.to_vec()), + Bound::Excluded(next_key(buf.to_vec().as_slice())), + ); hummock - .scan::<_, Vec>( + .scan( None, range, None, diff --git a/src/storage/compaction_test/src/server.rs b/src/storage/compaction_test/src/server.rs index 2d7ba8d07dc7..e6ce674a79ef 100644 --- a/src/storage/compaction_test/src/server.rs +++ b/src/storage/compaction_test/src/server.rs @@ -14,6 +14,7 @@ use std::collections::{BTreeMap, HashSet}; use std::net::SocketAddr; +use std::ops::Bound; use std::sync::Arc; use std::time::Duration; @@ -385,9 +386,9 @@ async fn open_hummock_iters( let mut results = BTreeMap::new(); for &epoch in snapshots.iter() { let iter = hummock - .iter::<_, Vec>( + .iter( None, - .., + (Bound::Unbounded, Bound::Unbounded), ReadOptions { epoch, table_id: TableId { table_id }, diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index dd60b2bfcca9..2102038c9e5b 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -16,6 +16,7 @@ mod tests { use std::collections::{BTreeSet, HashMap}; + use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; @@ -675,9 +676,9 @@ mod tests { // 7. scan kv to check key table_id let scan_result = storage - .scan::<_, Vec>( + .scan( None, - .., + (Bound::Unbounded, Bound::Unbounded), None, ReadOptions { epoch, @@ -844,9 +845,9 @@ mod tests { // 6. scan kv to check key table_id let scan_result = storage - .scan::<_, Vec>( + .scan( None, - .., + (Bound::Unbounded, Bound::Unbounded), None, ReadOptions { epoch, @@ -1016,7 +1017,10 @@ mod tests { let scan_result = storage .scan( Some(bloom_filter_key), - start_bound_key..end_bound_key, + ( + Bound::Included(start_bound_key), + Bound::Excluded(end_bound_key), + ), None, ReadOptions { epoch, diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index c797df3d2c5c..ce3ed2634b60 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; @@ -134,7 +135,7 @@ async fn test_failpoints_state_store_read_upload() { let result = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: 2, table_id: Default::default(), @@ -193,7 +194,7 @@ async fn test_failpoints_state_store_read_upload() { let mut iters = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: 5, table_id: Default::default(), diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index 706b9a470394..211a4beff866 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; @@ -31,7 +32,7 @@ use crate::test_utils::get_test_notification_client; macro_rules! assert_count_range_scan { ($storage:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{ let mut it = $storage - .iter::<_, Vec>( + .iter( None, $range, ReadOptions { @@ -56,7 +57,7 @@ macro_rules! assert_count_range_scan { macro_rules! assert_count_backward_range_scan { ($storage:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{ let mut it = $storage - .backward_iter::<_, Vec>( + .backward_iter( $range, ReadOptions { epoch: $epoch, @@ -127,7 +128,12 @@ async fn test_snapshot_inner(enable_sync: bool, enable_commit: bool) { .unwrap(); } } - assert_count_range_scan!(hummock_storage, .., 2, epoch1); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 2, + epoch1 + ); let epoch2 = epoch1 + 1; hummock_storage @@ -161,8 +167,18 @@ async fn test_snapshot_inner(enable_sync: bool, enable_commit: bool) { .unwrap(); } } - assert_count_range_scan!(hummock_storage, .., 3, epoch2); - assert_count_range_scan!(hummock_storage, .., 2, epoch1); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 3, + epoch2 + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 2, + epoch1 + ); let epoch3 = epoch2 + 1; hummock_storage @@ -196,9 +212,24 @@ async fn test_snapshot_inner(enable_sync: bool, enable_commit: bool) { .unwrap(); } } - assert_count_range_scan!(hummock_storage, .., 0, epoch3); - assert_count_range_scan!(hummock_storage, .., 3, epoch2); - assert_count_range_scan!(hummock_storage, .., 2, epoch1); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 0, + epoch3 + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 3, + epoch2 + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 2, + epoch1 + ); } async fn test_snapshot_range_scan_inner(enable_sync: bool, enable_commit: bool) { @@ -259,12 +290,42 @@ async fn test_snapshot_range_scan_inner(enable_sync: bool, enable_commit: bool) }; } - assert_count_range_scan!(hummock_storage, key!(2)..=key!(3), 2, epoch); - assert_count_range_scan!(hummock_storage, key!(2)..key!(3), 1, epoch); - assert_count_range_scan!(hummock_storage, key!(2).., 3, epoch); - assert_count_range_scan!(hummock_storage, ..=key!(3), 3, epoch); - assert_count_range_scan!(hummock_storage, ..key!(3), 2, epoch); - assert_count_range_scan!(hummock_storage, .., 4, epoch); + assert_count_range_scan!( + hummock_storage, + (Bound::Included(key!(2)), Bound::Included(key!(3))), + 2, + epoch + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Included(key!(2)), Bound::Excluded(key!(3))), + 1, + epoch + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Included(key!(2)), Bound::Unbounded), + 3, + epoch + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Included(key!(3))), + 3, + epoch + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Excluded(key!(3))), + 2, + epoch + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 4, + epoch + ); } #[ignore] @@ -360,14 +421,54 @@ async fn test_snapshot_backward_range_scan_inner(enable_sync: bool, enable_commi }; } - assert_count_backward_range_scan!(hummock_storage, key!(3)..=key!(2), 2, epoch); - assert_count_backward_range_scan!(hummock_storage, key!(3)..key!(2), 1, epoch); - assert_count_backward_range_scan!(hummock_storage, key!(3)..key!(1), 2, epoch); - assert_count_backward_range_scan!(hummock_storage, key!(3)..=key!(1), 3, epoch); - assert_count_backward_range_scan!(hummock_storage, key!(3)..key!(0), 3, epoch); - assert_count_backward_range_scan!(hummock_storage, .., 6, epoch); - assert_count_backward_range_scan!(hummock_storage, .., 8, epoch + 1); - assert_count_backward_range_scan!(hummock_storage, key!(7)..key!(2), 5, epoch + 1); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(3)), Bound::Included(key!(2))), + 2, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(3)), Bound::Excluded(key!(2))), + 1, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(3)), Bound::Excluded(key!(1))), + 2, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(3)), Bound::Included(key!(1))), + 3, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(3)), Bound::Excluded(key!(0))), + 3, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 6, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 8, + epoch + 1 + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(7)), Bound::Excluded(key!(2))), + 5, + epoch + 1 + ); } #[tokio::test] diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 13eea5cb8d6e..c2db671e39c5 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; @@ -213,7 +214,7 @@ async fn test_basic() { let mut iter = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: epoch1, table_id: Default::default(), @@ -260,7 +261,7 @@ async fn test_basic() { let mut iter = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: epoch2, table_id: Default::default(), @@ -276,7 +277,7 @@ async fn test_basic() { let mut iter = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: epoch3, table_id: Default::default(), @@ -576,7 +577,7 @@ async fn test_reload_storage() { let mut iter = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: epoch1, table_id: Default::default(), @@ -623,7 +624,7 @@ async fn test_reload_storage() { let mut iter = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: epoch2, table_id: Default::default(), @@ -715,7 +716,10 @@ async fn test_write_anytime() { let mut iter = hummock_storage .iter( None, - "aa".as_bytes()..="cc".as_bytes(), + ( + Bound::Included(b"aa".to_vec()), + Bound::Included(b"cc".to_vec()), + ), ReadOptions { epoch, table_id: Default::default(), @@ -810,7 +814,10 @@ async fn test_write_anytime() { let mut iter = hummock_storage .iter( None, - "aa".as_bytes()..="cc".as_bytes(), + ( + Bound::Included(b"aa".to_vec()), + Bound::Included(b"cc".to_vec()), + ), ReadOptions { epoch, table_id: Default::default(), diff --git a/src/storage/src/hummock/iterator/backward_user.rs b/src/storage/src/hummock/iterator/backward_user.rs index f558e0331a5f..a768e7a973b7 100644 --- a/src/storage/src/hummock/iterator/backward_user.rs +++ b/src/storage/src/hummock/iterator/backward_user.rs @@ -298,15 +298,13 @@ impl DirectedUserIteratorBuilder for BackwardUserIterator, - >, + iterator_iter: Vec>, key_range: (Bound>, Bound>), read_epoch: u64, min_epoch: u64, version: Option, ) -> DirectedUserIterator { - let iterator = UnorderedMergeIteratorInner::new(iterator_iter); + let iterator = UnorderedMergeIteratorInner::new(iterator_iter.into_iter()); DirectedUserIterator::Backward(BackwardUserIterator::with_epoch( iterator, key_range, read_epoch, min_epoch, version, )) diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index fbc112fb82f9..1325ef1a96a2 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -235,13 +235,13 @@ impl DirectedUserIteratorBuilder for UserIterator { type SstableIteratorType = SstableIterator; fn create( - iterator_iter: impl IntoIterator>, + iterator_iter: Vec>, key_range: (Bound>, Bound>), read_epoch: u64, min_epoch: u64, version: Option, ) -> DirectedUserIterator { - let iterator = UnorderedMergeIteratorInner::new(iterator_iter); + let iterator = UnorderedMergeIteratorInner::new(iterator_iter.into_iter()); DirectedUserIterator::Forward(Self::new( iterator, key_range, read_epoch, min_epoch, version, )) diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs index fb9862af210f..654398ddcbdb 100644 --- a/src/storage/src/hummock/iterator/merge_inner.rs +++ b/src/storage/src/hummock/iterator/merge_inner.rs @@ -17,6 +17,8 @@ use std::collections::binary_heap::PeekMut; use std::collections::{BinaryHeap, LinkedList}; use std::future::Future; +use futures::future::BoxFuture; +use futures::FutureExt; use risingwave_hummock_sdk::VersionedComparator; use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection}; @@ -24,7 +26,7 @@ use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; use crate::monitor::StoreLocalStatistic; -pub trait NodeExtraOrderInfo: Eq + Ord + Send + Sync {} +pub trait NodeExtraOrderInfo: Eq + Ord + Send + Sync + 'static {} /// For unordered merge iterator, no extra order info is needed. type UnorderedNodeExtra = (); @@ -146,16 +148,16 @@ impl MergeIteratorInner { pub type UnorderedMergeIteratorInner = MergeIteratorInner; -impl UnorderedMergeIteratorInner { - pub fn new(iterators: impl IntoIterator) -> Self { +impl UnorderedMergeIteratorInner { + pub fn new(iterators: impl IntoIterator + 'static) -> Self { Self::create(iterators) } - pub fn for_compactor(iterators: impl IntoIterator) -> Self { + pub fn for_compactor(iterators: impl IntoIterator + 'static) -> Self { Self::create(iterators) } - fn create(iterators: impl IntoIterator) -> Self { + fn create(iterators: impl IntoIterator + 'static) -> Self { Self { unused_iters: iterators .into_iter() @@ -188,6 +190,32 @@ where .drain_filter(|i| i.iter.is_valid()) .collect(); } + + // TODO(chi): workaround for Rust toolchain 2022-10-16 + + async fn rewind_inner(&mut self) -> HummockResult<()> { + self.reset_heap(); + futures::future::try_join_all( + self.unused_iters + .iter_mut() + .map(|x| x.iter.rewind().boxed()), + ) + .await?; + self.build_heap(); + Ok(()) + } + + async fn seek_inner(&mut self, key: &[u8]) -> HummockResult<()> { + self.reset_heap(); + futures::future::try_join_all( + self.unused_iters + .iter_mut() + .map(|x| x.iter.seek(key).boxed()), + ) + .await?; + self.build_heap(); + Ok(()) + } } /// The behaviour of `next` of order aware merge iterator is different from the normal one, so we @@ -199,87 +227,114 @@ trait MergeIteratorNext { fn next_inner(&mut self) -> Self::HummockResultFuture<'_>; } -impl MergeIteratorNext for OrderedMergeIteratorInner { - type HummockResultFuture<'a> = impl Future>; +// TODO(chi): workaround for Rust toolchain 2022-10-16, removed this later +fn unsafe_boxed_static_future(f: F) -> BoxFuture<'static, O> +where + F: Future + Send, +{ + let b = f.boxed(); + unsafe { std::mem::transmute::, BoxFuture<'static, O>>(b) } +} - fn next_inner(&mut self) -> Self::HummockResultFuture<'_> { - async { - let top_node = self.heap.pop().expect("no inner iter"); - let mut popped_nodes = vec![]; - - // Take all nodes with the same current key as the top_node out of the heap. - while let Some(next_node) = self.heap.peek_mut() { - match VersionedComparator::compare_key(top_node.iter.key(), next_node.iter.key()) { - Ordering::Equal => { - popped_nodes.push(PeekMut::pop(next_node)); - } - _ => break, +impl OrderedMergeIteratorInner { + async fn next_inner_inner(&mut self) -> HummockResult<()> { + let top_node = self.heap.pop().expect("no inner iter"); + let mut popped_nodes = vec![]; + + // Take all nodes with the same current key as the top_node out of the heap. + while let Some(next_node) = self.heap.peek_mut() { + match VersionedComparator::compare_key(top_node.iter.key(), next_node.iter.key()) { + Ordering::Equal => { + popped_nodes.push(PeekMut::pop(next_node)); } + _ => break, } + } - popped_nodes.push(top_node); - - // WARNING: within scope of BinaryHeap::PeekMut, we must carefully handle all places of - // return. Once the iterator enters an invalid state, we should remove it from heap - // before returning. - - // Put the popped nodes back to the heap if valid or unused_iters if invalid. - for mut node in popped_nodes { - match node.iter.next().await { - Ok(_) => {} - Err(e) => { - // If the iterator returns error, we should clear the heap, so that this - // iterator becomes invalid. - self.heap.clear(); - return Err(e); - } - } + popped_nodes.push(top_node); - if !node.iter.is_valid() { - self.unused_iters.push_back(node); - } else { - self.heap.push(node); - } - } + // WARNING: within scope of BinaryHeap::PeekMut, we must carefully handle all places of + // return. Once the iterator enters an invalid state, we should remove it from heap + // before returning. - Ok(()) - } - } -} - -impl MergeIteratorNext for UnorderedMergeIteratorInner { - type HummockResultFuture<'a> = impl Future>; + // Put the popped nodes back to the heap if valid or unused_iters if invalid. - fn next_inner(&mut self) -> Self::HummockResultFuture<'_> { - async { - let mut node = self.heap.peek_mut().expect("no inner iter"); + // TODO(chi): workaround for Rust toolchain 2022-10-16, removed boxed() later - // WARNING: within scope of BinaryHeap::PeekMut, we must carefully handle all places of - // return. Once the iterator enters an invalid state, we should remove it from heap - // before returning. + for mut node in popped_nodes { + let f = unsafe_boxed_static_future(node.iter.next()); - match node.iter.next().await { + match f.await { Ok(_) => {} Err(e) => { // If the iterator returns error, we should clear the heap, so that this // iterator becomes invalid. - PeekMut::pop(node); self.heap.clear(); return Err(e); } } if !node.iter.is_valid() { - // Put back to `unused_iters` - let node = PeekMut::pop(node); self.unused_iters.push_back(node); } else { - // This will update the heap top. - drop(node); + self.heap.push(node); } + } - Ok(()) + Ok(()) + } +} + +impl MergeIteratorNext for OrderedMergeIteratorInner { + type HummockResultFuture<'a> = impl Future> + 'a; + + fn next_inner(&mut self) -> Self::HummockResultFuture<'_> { + self.next_inner_inner() + } +} + +impl UnorderedMergeIteratorInner { + async fn next_inner_inner(&mut self) -> HummockResult<()> { + let mut node = self.heap.peek_mut().expect("no inner iter"); + + // WARNING: within scope of BinaryHeap::PeekMut, we must carefully handle all places of + // return. Once the iterator enters an invalid state, we should remove it from heap + // before returning. + + // TODO(chi): workaround for Rust toolchain 2022-10-16, removed boxed() later + + let f = unsafe_boxed_static_future(node.iter.next()); + + match f.await { + Ok(_) => {} + Err(e) => { + // If the iterator returns error, we should clear the heap, so that this + // iterator becomes invalid. + PeekMut::pop(node); + self.heap.clear(); + return Err(e); + } + } + + if !node.iter.is_valid() { + // Put back to `unused_iters` + let node = PeekMut::pop(node); + self.unused_iters.push_back(node); + } else { + // This will update the heap top. + drop(node); } + + Ok(()) + } +} + +impl MergeIteratorNext for UnorderedMergeIteratorInner { + type HummockResultFuture<'a> = impl Future> + 'a; + + fn next_inner(&mut self) -> Self::HummockResultFuture<'_> { + self.next_inner_inner() + // async move { unimplemented!() } } } @@ -311,23 +366,11 @@ where } fn rewind(&mut self) -> Self::RewindFuture<'_> { - async move { - self.reset_heap(); - futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.rewind())) - .await?; - self.build_heap(); - Ok(()) - } + self.rewind_inner() } fn seek<'a>(&'a mut self, key: &'a [u8]) -> Self::SeekFuture<'a> { - async move { - self.reset_heap(); - futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.seek(key))) - .await?; - self.build_heap(); - Ok(()) - } + self.seek_inner(key) } fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index 64f0522c92fb..dce7acd13b08 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ b/src/storage/src/hummock/iterator/mod.rs @@ -362,15 +362,13 @@ pub enum DirectedUserIterator { Backward(BackwardUserIterator), } -pub trait DirectedUserIteratorBuilder { +pub trait DirectedUserIteratorBuilder: 'static { type Direction: HummockIteratorDirection; type SstableIteratorType: SstableIteratorType; /// Initialize an `DirectedUserIterator`. /// The `key_range` should be from smaller key to larger key. fn create( - iterator_iter: impl IntoIterator< - Item = UserIteratorPayloadType, - >, + iterator_iter: Vec>, key_range: (Bound>, Bound>), read_epoch: u64, min_epoch: u64, diff --git a/src/storage/src/hummock/state_store.rs b/src/storage/src/hummock/state_store.rs index 38f4d6fba54d..df847e5cf723 100644 --- a/src/storage/src/hummock/state_store.rs +++ b/src/storage/src/hummock/state_store.rs @@ -15,7 +15,7 @@ use std::cmp::Ordering; use std::future::Future; use std::ops::Bound::{Excluded, Included}; -use std::ops::RangeBounds; +use std::ops::{Bound, RangeBounds}; use std::sync::Arc; use bytes::Bytes; @@ -54,7 +54,7 @@ use crate::storage_value::StorageValue; use crate::store::*; use crate::{define_state_store_associated_type, StateStore, StateStoreIter}; -pub(crate) trait HummockIteratorType { +pub(crate) trait HummockIteratorType: 'static { type Direction: HummockIteratorDirection; type SstableIteratorType: SstableIteratorType; type UserIteratorBuilder: DirectedUserIteratorBuilder< @@ -90,9 +90,9 @@ impl HummockStorage { /// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned, /// the key is not found. If `Err()` is returned, the searching for the key /// failed due to other non-EOF errors. - pub async fn get<'a>( - &'a self, - key: &'a [u8], + pub async fn get( + &self, + key: &[u8], check_bloom_filter: bool, read_options: ReadOptions, ) -> StorageResult> { @@ -258,15 +258,13 @@ impl HummockStorage { } #[allow(dead_code)] - async fn old_iter_inner( + async fn old_iter_inner( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, ) -> StorageResult where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, T: HummockIteratorType, { let epoch = read_options.epoch; @@ -415,11 +413,6 @@ impl HummockStorage { .with_label_values(&["sub-iter"]) .observe(overlapped_iters.len() as f64); - let key_range = ( - key_range.start_bound().map(|b| b.as_ref().to_owned()), - key_range.end_bound().map(|b| b.as_ref().to_owned()), - ); - // The input of the user iterator is a `HummockIteratorUnion` of 4 different types. We use // the union because the underlying merge iterator let mut user_iterator = T::UserIteratorBuilder::create( @@ -434,6 +427,7 @@ impl HummockStorage { .rewind() .in_span(Span::enter_with_local_parent("rewind")) .await?; + local_stats.report(self.stats.as_ref()); Ok(HummockStateStoreIter::new( user_iterator, @@ -453,20 +447,16 @@ impl StateStore for HummockStorage { check_bloom_filter: bool, read_options: ReadOptions, ) -> Self::GetFuture<'_> { - async move { self.get(key, check_bloom_filter, read_options).await } + self.get(key, check_bloom_filter, read_options) } - fn scan( + fn scan( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::ScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::ScanFuture<'_> { async move { self.iter(prefix_hint, key_range, read_options) .await? @@ -475,16 +465,12 @@ impl StateStore for HummockStorage { } } - fn backward_scan( + fn backward_scan( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _limit: Option, _read_options: ReadOptions, - ) -> Self::BackwardScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardScanFuture<'_> { async move { unimplemented!() } } @@ -507,16 +493,12 @@ impl StateStore for HummockStorage { /// Returns an iterator that scan from the begin key to the end key /// The result is based on a snapshot corresponding to the given `epoch`. - fn iter( + fn iter( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::IterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::IterFuture<'_> { if let Some(prefix_hint) = prefix_hint.as_ref() { let next_key = next_key(prefix_hint); @@ -535,8 +517,8 @@ impl StateStore for HummockStorage { // // 3. Include(pk) => prefix_hint <= start_bound < next_key(prefix_hint) Included(range_start) | Excluded(range_start) => { - assert!(range_start.as_ref() >= prefix_hint.as_slice()); - assert!(range_start.as_ref() < next_key.as_slice() || next_key.is_empty()); + assert!(range_start.as_slice() >= prefix_hint.as_slice()); + assert!(range_start.as_slice() < next_key.as_slice() || next_key.is_empty()); } _ => unreachable!(), @@ -544,8 +526,8 @@ impl StateStore for HummockStorage { match key_range.end_bound() { Included(range_end) => { - assert!(range_end.as_ref() >= prefix_hint.as_slice()); - assert!(range_end.as_ref() < next_key.as_slice() || next_key.is_empty()); + assert!(range_end.as_slice() >= prefix_hint.as_slice()); + assert!(range_end.as_slice() < next_key.as_slice() || next_key.is_empty()); } // 1. Excluded(end_bound_of_prefix(pk + col)) => prefix_hint < end_bound <= @@ -554,8 +536,8 @@ impl StateStore for HummockStorage { // 2. Excluded(pk + bound) => prefix_hint < end_bound <= // next_key(prefix_hint) Excluded(range_end) => { - assert!(range_end.as_ref() > prefix_hint.as_slice()); - assert!(range_end.as_ref() <= next_key.as_slice() || next_key.is_empty()); + assert!(range_end.as_slice() > prefix_hint.as_slice()); + assert!(range_end.as_slice() <= next_key.as_slice() || next_key.is_empty()); } std::ops::Bound::Unbounded => { @@ -573,27 +555,17 @@ impl StateStore for HummockStorage { retention_seconds: read_options.retention_seconds, }; - return self.storage_core.iter( - ( - key_range.start_bound().map(|b| b.as_ref().to_owned()), - key_range.end_bound().map(|b| b.as_ref().to_owned()), - ), - read_options.epoch, - read_options_v2, - ); + self.storage_core + .iter(key_range, read_options.epoch, read_options_v2) } /// Returns a backward iterator that scans from the end key to the begin key /// The result is based on a snapshot corresponding to the given `epoch`. - fn backward_iter( + fn backward_iter( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _read_options: ReadOptions, - ) -> Self::BackwardIterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardIterFuture<'_> { async move { unimplemented!(); } @@ -679,7 +651,7 @@ impl StateStoreIter for HummockStateStoreIter { type Item = (Bytes, Bytes); type NextFuture<'a> = - impl Future>> + Send; + impl Future>> + Send + 'a; fn next(&mut self) -> Self::NextFuture<'_> { async move { diff --git a/src/storage/src/hummock/store/state_store.rs b/src/storage/src/hummock/store/state_store.rs index 2713e9877f43..cdd3446a62d0 100644 --- a/src/storage/src/hummock/store/state_store.rs +++ b/src/storage/src/hummock/store/state_store.rs @@ -612,7 +612,7 @@ pub struct HummockStorageIterator { impl StateStoreIter for HummockStorageIterator { type Item = (Bytes, Bytes); - type NextFuture<'a> = impl Future>> + Send; + type NextFuture<'a> = impl Future>> + Send + 'a; fn next(&mut self) -> Self::NextFuture<'_> { async { diff --git a/src/storage/src/keyspace.rs b/src/storage/src/keyspace.rs index e218eab4738a..63e03eaab704 100644 --- a/src/storage/src/keyspace.rs +++ b/src/storage/src/keyspace.rs @@ -169,7 +169,7 @@ impl Keyspace { } } -pub struct StripPrefixIterator> { +pub struct StripPrefixIterator + 'static> { iter: I, prefix_len: usize, } @@ -178,7 +178,7 @@ impl> StateStoreIter for StripPrefixIte type Item = (Bytes, Bytes); type NextFuture<'a> = - impl Future>> + Send; + impl Future>> + Send + 'a; fn next(&mut self) -> Self::NextFuture<'_> { async move { diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 23bd6f9820ca..6ffe6e074295 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -212,7 +212,7 @@ impl StateStore for MemoryStateStore { read_options: ReadOptions, ) -> Self::GetFuture<'_> { async move { - let range_bounds = key.to_vec()..=key.to_vec(); + let range_bounds = (Bound::Included(key.to_vec()), Bound::Included(key.to_vec())); // We do not really care about vnodes here, so we just use the default value. let res = self.scan(None, range_bounds, Some(1), read_options).await?; @@ -224,17 +224,13 @@ impl StateStore for MemoryStateStore { } } - fn scan( + fn scan( &self, _prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::ScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::ScanFuture<'_> { async move { let epoch = read_options.epoch; let mut data = vec![]; @@ -262,16 +258,12 @@ impl StateStore for MemoryStateStore { } } - fn backward_scan( + fn backward_scan( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _limit: Option, _read_options: ReadOptions, - ) -> Self::BackwardScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardScanFuture<'_> { async move { unimplemented!() } } @@ -292,16 +284,12 @@ impl StateStore for MemoryStateStore { } } - fn iter( + fn iter( &self, _prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::IterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::IterFuture<'_> { async move { Ok(MemoryStateStoreIter::new( batched_iter::Iter::new(self.inner.clone(), to_bytes_range(key_range)), @@ -310,15 +298,11 @@ impl StateStore for MemoryStateStore { } } - fn backward_iter( + fn backward_iter( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _read_options: ReadOptions, - ) -> Self::BackwardIterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardIterFuture<'_> { async move { unimplemented!() } } @@ -366,7 +350,7 @@ impl MemoryStateStoreIter { impl StateStoreIter for MemoryStateStoreIter { type Item = (Bytes, Bytes); - type NextFuture<'a> = impl Future>> + Send; + type NextFuture<'a> = impl Future>> + Send + 'a; fn next(&mut self) -> Self::NextFuture<'_> { async move { @@ -423,7 +407,10 @@ mod tests { state_store .scan( None, - "a"..="b", + ( + Bound::Included(b"a".to_vec()), + Bound::Included(b"b".to_vec()), + ), None, ReadOptions { epoch: 0, @@ -442,7 +429,10 @@ mod tests { state_store .scan( None, - "a"..="b", + ( + Bound::Included(b"a".to_vec()), + Bound::Included(b"b".to_vec()), + ), Some(1), ReadOptions { epoch: 0, @@ -458,7 +448,10 @@ mod tests { state_store .scan( None, - "a"..="b", + ( + Bound::Included(b"a".to_vec()), + Bound::Included(b"b".to_vec()), + ), None, ReadOptions { epoch: 1, diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 55e995cfdf07..ab571bb91cfa 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::RangeBounds; +use std::ops::Bound; use std::sync::Arc; use async_stack_trace::StackTrace; @@ -120,17 +120,13 @@ where } } - fn scan( + fn scan( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::ScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::ScanFuture<'_> { async move { let timer = self.stats.range_scan_duration.start_timer(); let result = self @@ -149,16 +145,12 @@ where } } - fn backward_scan( + fn backward_scan( &self, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::BackwardScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardScanFuture<'_> { async move { let timer = self.stats.range_backward_scan_duration.start_timer(); let result = self @@ -204,35 +196,21 @@ where } } - fn iter( + fn iter( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::IterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { - async move { - self.monitored_iter(self.inner.iter(prefix_hint, key_range, read_options)) - .await - } + ) -> Self::IterFuture<'_> { + self.monitored_iter(self.inner.iter(prefix_hint, key_range, read_options)) } - fn backward_iter( + fn backward_iter( &self, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::BackwardIterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { - async move { - self.monitored_iter(self.inner.backward_iter(key_range, read_options)) - .await - } + ) -> Self::BackwardIterFuture<'_> { + self.monitored_iter(self.inner.backward_iter(key_range, read_options)) } fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Self::WaitEpochFuture<'_> { @@ -312,7 +290,7 @@ where type Item = (Bytes, Bytes); type NextFuture<'a> = - impl Future>> + Send; + impl Future>> + Send + 'a; fn next(&mut self) -> Self::NextFuture<'_> { async move { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index fd518e8b590a..889c9d56e968 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::future::Future; -use std::ops::RangeBounds; +use std::ops::Bound; use bytes::Bytes; use risingwave_hummock_sdk::HummockReadEpoch; @@ -43,32 +43,24 @@ impl StateStore for PanicStateStore { } } - fn scan( + fn scan( &self, _prefix_hint: Option>, - _key_range: R, + _key_range: (Bound>, Bound>), _limit: Option, _read_options: ReadOptions, - ) -> Self::ScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::ScanFuture<'_> { async move { panic!("should not scan from the state store!"); } } - fn backward_scan( + fn backward_scan( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _limit: Option, _read_options: ReadOptions, - ) -> Self::BackwardScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardScanFuture<'_> { async move { panic!("should not backward scan from the state store!"); } @@ -84,30 +76,22 @@ impl StateStore for PanicStateStore { } } - fn iter( + fn iter( &self, _prefix_hint: Option>, - _key_range: R, + _key_range: (Bound>, Bound>), _read_options: ReadOptions, - ) -> Self::IterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::IterFuture<'_> { async move { panic!("should not create iter from the state store!"); } } - fn backward_iter( + fn backward_iter( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _read_options: ReadOptions, - ) -> Self::BackwardIterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardIterFuture<'_> { async move { panic!("should not create backward iter from the panic state store!"); } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index f377b2a8c833..8ad2c5f76617 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::future::Future; -use std::ops::RangeBounds; +use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; @@ -32,13 +32,13 @@ pub struct SyncResult { pub uncommitted_ssts: Vec, } -pub trait GetFutureTrait<'a> = Future>> + Send; -pub trait ScanFutureTrait<'a, R, B> = Future>> + Send; -pub trait IterFutureTrait<'a, I: StateStoreIter, R, B> = - Future> + Send; -pub trait EmptyFutureTrait<'a> = Future> + Send; -pub trait SyncFutureTrait<'a> = Future> + Send; -pub trait IngestBatchFutureTrait<'a> = Future> + Send; +pub trait GetFutureTrait<'a> = Future>> + Send + 'a; +pub trait ScanFutureTrait<'a> = Future>> + Send + 'a; +pub trait IterFutureTrait<'a, I: StateStoreIter> = + Future> + Send + 'a; +pub trait EmptyFutureTrait<'a> = Future> + Send + 'a; +pub trait SyncFutureTrait<'a> = Future> + Send + 'a; +pub trait IngestBatchFutureTrait<'a> = Future> + Send + 'a; #[macro_export] macro_rules! define_state_store_associated_type { @@ -48,44 +48,26 @@ macro_rules! define_state_store_associated_type { type WaitEpochFuture<'a> = impl EmptyFutureTrait<'a>; type SyncFuture<'a> = impl SyncFutureTrait<'a>; - type BackwardIterFuture<'a, R, B> = impl IterFutureTrait<'a, Self::Iter, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type BackwardIterFuture<'a> = impl IterFutureTrait<'a, Self::Iter>; - type IterFuture<'a, R, B> = impl IterFutureTrait<'a, Self::Iter, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type IterFuture<'a> = impl IterFutureTrait<'a, Self::Iter>; - type BackwardScanFuture<'a, R, B> = impl ScanFutureTrait<'a, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type BackwardScanFuture<'a> = impl ScanFutureTrait<'a>; - type ScanFuture<'a, R, B> = impl ScanFutureTrait<'a, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type ScanFuture<'a> = impl ScanFutureTrait<'a>; type ClearSharedBufferFuture<'a> = impl EmptyFutureTrait<'a>; }; } pub trait StateStore: Send + Sync + 'static + Clone { - type Iter: StateStoreIter; + type Iter: StateStoreIter + 'static; type GetFuture<'a>: GetFutureTrait<'a>; - type ScanFuture<'a, R, B>: ScanFutureTrait<'a, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type ScanFuture<'a>: ScanFutureTrait<'a>; - type BackwardScanFuture<'a, R, B>: ScanFutureTrait<'a, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type BackwardScanFuture<'a>: ScanFutureTrait<'a>; type IngestBatchFuture<'a>: IngestBatchFutureTrait<'a>; @@ -93,15 +75,9 @@ pub trait StateStore: Send + Sync + 'static + Clone { type SyncFuture<'a>: SyncFutureTrait<'a>; - type IterFuture<'a, R, B>: IterFutureTrait<'a, Self::Iter, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type IterFuture<'a>: IterFutureTrait<'a, Self::Iter>; - type BackwardIterFuture<'a, R, B>: IterFutureTrait<'a, Self::Iter, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type BackwardIterFuture<'a>: IterFutureTrait<'a, Self::Iter>; type ClearSharedBufferFuture<'a>: EmptyFutureTrait<'a>; @@ -121,26 +97,20 @@ pub trait StateStore: Send + Sync + 'static + Clone { /// /// /// By default, this simply calls `StateStore::iter` to fetch elements. - fn scan( + fn scan( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::ScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send; + ) -> Self::ScanFuture<'_>; - fn backward_scan( + fn backward_scan( &self, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::BackwardScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send; + ) -> Self::BackwardScanFuture<'_>; /// Ingests a batch of data into the state store. One write batch should never contain operation /// on the same key. e.g. Put(233, x) then Delete(233). @@ -162,27 +132,21 @@ pub trait StateStore: Send + Sync + 'static + Clone { /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included in /// `key_range`) The returned iterator will iterate data based on a snapshot corresponding to /// the given `epoch`. - fn iter( + fn iter( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::IterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send; + ) -> Self::IterFuture<'_>; /// Opens and returns a backward iterator for given `key_range`. /// The returned iterator will iterate data based on a snapshot corresponding to the given /// `epoch` - fn backward_iter( + fn backward_iter( &self, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::BackwardIterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send; + ) -> Self::BackwardIterFuture<'_>; /// Creates a `WriteBatch` associated with this state store. fn start_write_batch(&self, write_options: WriteOptions) -> WriteBatch<'_, Self> { diff --git a/src/storage/src/table/streaming_table/state_table.rs b/src/storage/src/table/streaming_table/state_table.rs index cb15f8eb6b67..b674b6892820 100644 --- a/src/storage/src/table/streaming_table/state_table.rs +++ b/src/storage/src/table/streaming_table/state_table.rs @@ -16,8 +16,8 @@ use std::borrow::Cow; use std::cmp::Ordering; use std::collections::BTreeMap; use std::marker::PhantomData; +use std::ops::Bound; use std::ops::Bound::*; -use std::ops::{Bound, RangeBounds}; use std::sync::Arc; use async_stack_trace::StackTrace; @@ -1023,18 +1023,18 @@ struct StorageIterInner { deserializer: RowDeserializer, } -impl StorageIterInner { - async fn new( +impl StorageIterInner +where + S: 'static, + S::Iter: 'static, +{ + async fn new( keyspace: &Keyspace, prefix_hint: Option>, - raw_key_range: R, + raw_key_range: (Bound>, Bound>), read_options: ReadOptions, deserializer: RowDeserializer, - ) -> StorageResult - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> StorageResult { let iter = keyspace .iter_with_range(prefix_hint, raw_key_range, read_options) .await?; From a0b2a119d6b5843bde33f9937b0f1bed3e80e0c8 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Sat, 22 Oct 2022 23:05:04 -0400 Subject: [PATCH 12/12] fix --- Cargo.lock | 4 ++-- src/batch/src/execution/grpc_exchange.rs | 2 +- src/batch/src/execution/local_exchange.rs | 2 +- src/batch/src/executor/test_utils.rs | 2 +- src/batch/src/task/broadcast_channel.rs | 4 ++-- src/batch/src/task/fifo_channel.rs | 4 ++-- src/batch/src/task/hash_shuffle_channel.rs | 4 ++-- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9a1d58d0acc0..64469c1fc776 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3865,9 +3865,9 @@ dependencies = [ [[package]] name = "pg_interval" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47354dbd658c57a5ee1cc97a79937345170234d4c817768de80ea6d2e9f5b98a" +checksum = "fe46640b465e284b048ef065cbed8ef17a622878d310c724578396b4cfd00df2" dependencies = [ "bytes", "chrono", diff --git a/src/batch/src/execution/grpc_exchange.rs b/src/batch/src/execution/grpc_exchange.rs index eb85a2f3253a..8abed4c8fb11 100644 --- a/src/batch/src/execution/grpc_exchange.rs +++ b/src/batch/src/execution/grpc_exchange.rs @@ -72,7 +72,7 @@ impl Debug for GrpcExchangeSource { } impl ExchangeSource for GrpcExchangeSource { - type TakeDataFuture<'a> = impl Future>>; + type TakeDataFuture<'a> = impl Future>> + 'a; fn take_data(&mut self) -> Self::TakeDataFuture<'_> { async { diff --git a/src/batch/src/execution/local_exchange.rs b/src/batch/src/execution/local_exchange.rs index 259f8550601a..f098e6d21978 100644 --- a/src/batch/src/execution/local_exchange.rs +++ b/src/batch/src/execution/local_exchange.rs @@ -52,7 +52,7 @@ impl Debug for LocalExchangeSource { } impl ExchangeSource for LocalExchangeSource { - type TakeDataFuture<'a> = impl Future>>; + type TakeDataFuture<'a> = impl Future>> + 'a; fn take_data(&mut self) -> Self::TakeDataFuture<'_> { async { diff --git a/src/batch/src/executor/test_utils.rs b/src/batch/src/executor/test_utils.rs index f81073ecf927..dcd1178443fb 100644 --- a/src/batch/src/executor/test_utils.rs +++ b/src/batch/src/executor/test_utils.rs @@ -251,7 +251,7 @@ impl FakeExchangeSource { } impl ExchangeSource for FakeExchangeSource { - type TakeDataFuture<'a> = impl Future>>; + type TakeDataFuture<'a> = impl Future>> + 'a; fn take_data(&mut self) -> Self::TakeDataFuture<'_> { async { diff --git a/src/batch/src/task/broadcast_channel.rs b/src/batch/src/task/broadcast_channel.rs index 55f3d9ef0e86..4ff9c8506b12 100644 --- a/src/batch/src/task/broadcast_channel.rs +++ b/src/batch/src/task/broadcast_channel.rs @@ -42,7 +42,7 @@ impl Debug for BroadcastSender { } impl ChanSender for BroadcastSender { - type SendFuture<'a> = impl Future>; + type SendFuture<'a> = impl Future> + 'a; fn send(&mut self, chunk: Option) -> Self::SendFuture<'_> { async move { @@ -65,7 +65,7 @@ pub struct BroadcastReceiver { } impl ChanReceiver for BroadcastReceiver { - type RecvFuture<'a> = impl Future>>; + type RecvFuture<'a> = impl Future>> + 'a; fn recv(&mut self) -> Self::RecvFuture<'_> { async move { diff --git a/src/batch/src/task/fifo_channel.rs b/src/batch/src/task/fifo_channel.rs index a5cd02dd8fe6..03502295666e 100644 --- a/src/batch/src/task/fifo_channel.rs +++ b/src/batch/src/task/fifo_channel.rs @@ -39,7 +39,7 @@ pub struct FifoReceiver { } impl ChanSender for FifoSender { - type SendFuture<'a> = impl Future>; + type SendFuture<'a> = impl Future> + 'a; fn send(&mut self, chunk: Option) -> Self::SendFuture<'_> { async { @@ -52,7 +52,7 @@ impl ChanSender for FifoSender { } impl ChanReceiver for FifoReceiver { - type RecvFuture<'a> = impl Future>>; + type RecvFuture<'a> = impl Future>> + 'a; fn recv(&mut self) -> Self::RecvFuture<'_> { async move { diff --git a/src/batch/src/task/hash_shuffle_channel.rs b/src/batch/src/task/hash_shuffle_channel.rs index 65b6d80d6a29..26c4dd4bc918 100644 --- a/src/batch/src/task/hash_shuffle_channel.rs +++ b/src/batch/src/task/hash_shuffle_channel.rs @@ -105,7 +105,7 @@ fn generate_new_data_chunks( } impl ChanSender for HashShuffleSender { - type SendFuture<'a> = impl Future>; + type SendFuture<'a> = impl Future> + 'a; fn send(&mut self, chunk: Option) -> Self::SendFuture<'_> { async move { @@ -150,7 +150,7 @@ impl HashShuffleSender { } impl ChanReceiver for HashShuffleReceiver { - type RecvFuture<'a> = impl Future>>; + type RecvFuture<'a> = impl Future>> + 'a; fn recv(&mut self) -> Self::RecvFuture<'_> { async move {