Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize monitoring RPCs #793

Merged
merged 11 commits into from
Sep 29, 2021
16 changes: 13 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion monitoring/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ tokio = { version = "1.8", features = ["full"] }
tokio-stream = "0.1.2"
futures = { version = "0.3", default-features = false }
warp = "0.3"
uuid = { version = "0.8", features = ["serde", "v4"] }
uuid = { git = "https://github.com/tezedge/uuid", tag = "v0.8.2-cleanup-unsafe-1", default-features = false, features = ["v4"] }
storage = { path = "../storage" }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ url = "2.2"
rusqlite = "0.25.1"
cached = "0.23"
bincode = "1.3"
uuid = { git = "https://github.com/tezedge/uuid", tag = "v0.8.2-cleanup-unsafe-1", default-features = false, features = ["v4"] }
# local dependencies
crypto = { path = "../crypto" }
# TODO: once full integration is done, remove shell dependency from here
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub fn make_response_with_status_and_json_string(status_code: u16, body: &str) -

/// Function to generate JSON response from a stream
pub(crate) fn make_json_stream_response<
T: futures::Stream<Item = Result<String, anyhow::Error>> + Send + 'static,
T: futures::Stream<Item = Result<String, RpcServiceError>> + Send + 'static,
>(
content: T,
) -> ServiceResult {
Expand Down
15 changes: 14 additions & 1 deletion rpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::runtime::Handle;

use crypto::hash::ChainId;
use shell::mempool::CurrentMempoolStateStorageRef;
use shell_integration::ShellConnectorRef;
use shell_integration::{ShellConnectorRef, StreamCounter, StreamWakers};
use storage::{BlockHeaderWithHash, PersistentStorage};
use tezos_api::environment::TezosEnvironmentConfiguration;
use tezos_messages::p2p::encoding::version::NetworkVersion;
Expand Down Expand Up @@ -47,6 +47,19 @@ pub type RpcServiceEnvironmentRef = Arc<RpcServiceEnvironment>;
pub struct RpcCollectedState {
#[get = "pub(crate)"]
current_head: Arc<BlockHeaderWithHash>,

// Wakers for open streams (monitors) that access the mempool state
streams: StreamWakers,
}

impl StreamCounter for RpcCollectedState {
fn get_streams(&self) -> StreamWakers {
self.streams.clone()
}

fn get_mutable_streams(&mut self) -> &mut StreamWakers {
&mut self.streams
}
}

/// Server environment parameters
Expand Down
3 changes: 3 additions & 0 deletions rpc/src/server/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use shell::mempool::CurrentMempoolStateStorageRef;
use shell_integration::notifications::*;
use shell_integration::*;
use slog::{error, info, warn, Logger};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::time::Duration;
Expand Down Expand Up @@ -56,6 +57,7 @@ impl RpcServer {
) -> Self {
let shared_state = Arc::new(RwLock::new(RpcCollectedState {
current_head: hydrated_current_head_block,
streams: HashMap::new(),
}));

let env = Arc::new(RpcServiceEnvironment::new(
Expand Down Expand Up @@ -142,6 +144,7 @@ pub fn handle_notify_rpc_server_msg(
match env.state().write() {
Ok(mut current_head_ref) => {
current_head_ref.current_head = notification.block.clone();
current_head_ref.wake_up_all_streams();
}
Err(e) => {
warn!(env.log(), "Failed to update current head in RPC server env"; "reason" => format!("{}", e));
Expand Down
1 change: 1 addition & 0 deletions rpc/src/server/shell_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pub async fn head_chain(
env.state.clone(),
protocol,
&env.persistent_storage,
env.log.clone(),
))
}

Expand Down
4 changes: 2 additions & 2 deletions rpc/src/services/mempool_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub fn get_pending_operations(
Ok((mempool_operations, mempool_prevalidator_protocol))
}

fn convert_applied(
pub(crate) fn convert_applied(
applied: &[Applied],
operations: &HashMap<OperationHash, Operation>,
) -> Result<Vec<HashMap<String, Value>>, RpcServiceError> {
Expand Down Expand Up @@ -118,7 +118,7 @@ fn convert_applied(
Ok(result)
}

fn convert_errored(
pub(crate) fn convert_errored(
errored: &[Errored],
operations: &HashMap<OperationHash, Operation>,
protocol: &ProtocolHash,
Expand Down