Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize monitoring RPCs #793

Merged
merged 11 commits into from
Sep 29, 2021
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ url = "2.2"
rusqlite = "0.25.1"
cached = "0.23"
bincode = "1.3"
uuid = { git = "https://github.com/tezedge/uuid", tag = "v0.8.2-cleanup-unsafe-1", default-features = false, features = ["v4"] }
# local dependencies
crypto = { path = "../crypto" }
# TODO: once full integration is done, remove shell dependency from here
Expand Down
16 changes: 16 additions & 0 deletions rpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ use std::{
path::PathBuf,
};

use futures::task::Waker;
use getset::{CopyGetters, Getters};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response};
use slog::{error, Logger};
use tokio::runtime::Handle;
use uuid::Uuid;

use crypto::hash::ChainId;
use shell::mempool::CurrentMempoolStateStorageRef;
use shell::state::streaming_state::StreamCounter;
use shell_integration::ShellConnectorRef;
use storage::{BlockHeaderWithHash, PersistentStorage};
use tezos_api::environment::TezosEnvironmentConfiguration;
Expand Down Expand Up @@ -47,6 +50,19 @@ pub type RpcServiceEnvironmentRef = Arc<RpcServiceEnvironment>;
pub struct RpcCollectedState {
#[get = "pub(crate)"]
current_head: Arc<BlockHeaderWithHash>,

// Wakers for open streams (monitors) that access the mempool state
streams: HashMap<Uuid, Waker>,
bkontur marked this conversation as resolved.
Show resolved Hide resolved
}

impl StreamCounter for RpcCollectedState {
fn get_streams(&self) -> HashMap<Uuid, Waker> {
self.streams.clone()
}

fn get_mutable_streams(&mut self) -> &mut HashMap<Uuid, Waker> {
&mut self.streams
}
}

/// Server environment parameters
Expand Down
4 changes: 4 additions & 0 deletions rpc/src/server/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use shell::mempool::CurrentMempoolStateStorageRef;
use shell_integration::notifications::*;
use shell_integration::*;
use slog::{error, info, warn, Logger};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::time::Duration;
Expand All @@ -14,6 +15,7 @@ use tezos_api::environment::TezosEnvironmentConfiguration;
use tezos_messages::p2p::encoding::version::NetworkVersion;
use tezos_wrapper::TezosApiConnectionPool;
use tokio::runtime::Handle;
use shell::state::streaming_state::StreamCounter;

use crate::server::{spawn_server, RpcCollectedState, RpcServiceEnvironment};
use crate::RpcServiceEnvironmentRef;
Expand Down Expand Up @@ -56,6 +58,7 @@ impl RpcServer {
) -> Self {
let shared_state = Arc::new(RwLock::new(RpcCollectedState {
current_head: hydrated_current_head_block,
streams: HashMap::new(),
}));

let env = Arc::new(RpcServiceEnvironment::new(
Expand Down Expand Up @@ -142,6 +145,7 @@ pub fn handle_notify_rpc_server_msg(
match env.state().write() {
Ok(mut current_head_ref) => {
current_head_ref.current_head = notification.block.clone();
current_head_ref.wake_up_all_streams();
}
Err(e) => {
warn!(env.log(), "Failed to update current head in RPC server env"; "reason" => format!("{}", e));
Expand Down
4 changes: 2 additions & 2 deletions rpc/src/services/mempool_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub fn get_pending_operations(
Ok((mempool_operations, mempool_prevalidator_protocol))
}

fn convert_applied(
pub(crate) fn convert_applied(
applied: &[Applied],
operations: &HashMap<OperationHash, Operation>,
) -> Result<Vec<HashMap<String, Value>>, RpcServiceError> {
Expand Down Expand Up @@ -118,7 +118,7 @@ fn convert_applied(
Ok(result)
}

fn convert_errored(
pub(crate) fn convert_errored(
errored: &[Errored],
operations: &HashMap<OperationHash, Operation>,
protocol: &ProtocolHash,
Expand Down