[refactor] Refactor client and client actor to move the code for block processing to client (#7898)

This PR is a pure refactoring. The context is that any processing details should live in Client instead of ClientActor. ClientActor should serve only as a coordinator class that handles messages, checks triggers, and immediately passes them to Client. This is better for testing, since we can't write unit tests for any logic in ClientActor, and better for code readability, since the logic is no longer scattered across two classes.

This PR only moves the code related to block processing. The rest is tracked by #7899
mzhangmzz authored and nikurt committed Nov 9, 2022
1 parent 1d99cf6 commit 9776fbd
Showing 6 changed files with 266 additions and 193 deletions.
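To make the intended split concrete, here is a minimal sketch of the coordinator pattern the PR description refers to. It is illustrative only: the struct fields and the handle_block_message name are assumptions, not the actual nearcore types.

// Minimal sketch of the coordinator pattern described above (hypothetical
// types and names, not the actual nearcore code).
struct Block;
struct PeerId;

struct Client;

impl Client {
    // All processing logic lives in Client, where it can be unit tested.
    fn receive_block(&mut self, _block: Block, _peer_id: PeerId, _was_requested: bool) {
        // pre-checks, header validation, rebroadcast, full block processing
    }
}

struct ClientActor {
    client: Client,
}

impl ClientActor {
    // The actor only routes incoming messages to Client; it holds no
    // processing logic of its own.
    fn handle_block_message(&mut self, block: Block, peer_id: PeerId, was_requested: bool) {
        self.client.receive_block(block, peer_id, was_requested);
    }
}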
4 changes: 4 additions & 0 deletions chain/chain/src/test_utils.rs
@@ -73,6 +73,10 @@ pub fn wait_for_all_blocks_in_processing(chain: &mut Chain) -> bool {
chain.blocks_in_processing.wait_for_all_blocks()
}

pub fn is_block_in_processing(chain: &Chain, block_hash: &CryptoHash) -> bool {
chain.blocks_in_processing.contains(block_hash)
}

pub fn wait_for_block_in_processing(
chain: &mut Chain,
hash: &CryptoHash,
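The new is_block_in_processing helper gives tests a non-blocking way to check whether a block is still mid-processing, complementing the existing wait_* helpers. A hypothetical usage sketch (the setup_test_chain fixture and the block in flight are assumptions, not part of this diff):

// Hypothetical test sketch. Assumes a test fixture `setup_test_chain()`
// returning a Chain with one block mid-processing, and `hash` being that
// block's CryptoHash.
#[test]
fn block_stays_tracked_until_processing_finishes() {
    let (mut chain, hash) = setup_test_chain(); // assumed fixture
    // Non-blocking check: the block is still being applied asynchronously.
    assert!(is_block_in_processing(&chain, &hash));
    // Block until all in-flight blocks finish applying chunks.
    wait_for_all_blocks_in_processing(&mut chain);
    assert!(!is_block_in_processing(&chain, &hash));
}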
199 changes: 197 additions & 2 deletions chain/client/src/client.rs
@@ -27,7 +27,7 @@ use near_chain::{
};
use near_chain_configs::ClientConfig;
use near_chunks::ShardsManager;
use near_network::types::{FullPeerInfo, NetworkRequests, PeerManagerAdapter};
use near_network::types::{FullPeerInfo, NetworkRequests, PeerManagerAdapter, ReasonForBan};
use near_primitives::block::{Approval, ApprovalInner, ApprovalMessage, Block, BlockHeader, Tip};
use near_primitives::challenge::{Challenge, ChallengeBody};
use near_primitives::hash::CryptoHash;
@@ -53,6 +53,7 @@ use near_network::types::{AccountKeys, ChainInfo, PeerManagerMessageRequest, Set
use near_o11y::{log_assert, WithSpanContextExt};
use near_primitives::block_header::ApprovalType;
use near_primitives::epoch_manager::RngSeed;
use near_primitives::network::PeerId;
use near_primitives::version::PROTOCOL_VERSION;
use near_primitives::views::CatchupStatusView;

@@ -64,6 +65,8 @@ pub const EPOCH_SYNC_REQUEST_TIMEOUT: Duration = Duration::from_millis(1_000);
/// How frequently an Epoch Sync response can be sent to a particular peer
// TODO #3488 set 60_000
pub const EPOCH_SYNC_PEER_TIMEOUT: Duration = Duration::from_millis(10);
/// Drop blocks whose height is beyond head + horizon if they are not in the current epoch.
const BLOCK_HORIZON: u64 = 500;

/// number of blocks at the epoch start for which we will log more detailed info
pub const EPOCH_START_INFO_BLOCKS: u64 = 500;
@@ -806,6 +809,165 @@ impl Client {
}
}

/// Processes a received block. Bans the peer if the block header is invalid or the block is ill-formed.
// This function is just a wrapper around receive_block_impl that makes error propagation easier.
pub fn receive_block(
&mut self,
block: Block,
peer_id: PeerId,
was_requested: bool,
apply_chunks_done_callback: DoneApplyChunkCallback,
) {
let hash = *block.hash();
let prev_hash = *block.header().prev_hash();
let _span = tracing::debug_span!(
target: "client",
"receive_block",
me = ?self.validator_signer.as_ref().map(|vs| vs.validator_id()),
%prev_hash,
%hash,
height = block.header().height(),
%peer_id,
was_requested)
.entered();

let res = self.receive_block_impl(
block,
peer_id.clone(),
was_requested,
apply_chunks_done_callback,
);
// Log the errors here. Note that the real error handling logic is already
// done within receive_block_impl; this is just for logging.
if let Err(err) = res {
if err.is_bad_data() {
warn!(target: "client", "Receive bad block: {}", err);
} else if err.is_error() {
if let near_chain::Error::DBNotFoundErr(msg) = &err {
debug_assert!(!msg.starts_with("BLOCK HEIGHT"), "{:?}", err);
}
if self.sync_status.is_syncing() {
// While syncing, we may receive blocks that are older than our head or from future epochs.
// This leads to Old Block or EpochOutOfBounds errors.
debug!(target: "client", "Error on receival of block: {}", err);
} else {
error!(target: "client", "Error on receival of block: {}", err);
}
} else {
debug!(target: "client", error = %err, "Process block: refused by chain");
}
}
}

/// Processes a received block.
/// This function first does some pre-checks based on block height to avoid processing
/// blocks multiple times.
/// Then it processes the block header. If the header is valid, it broadcasts the block to peers.
/// Finally, it starts the processing of the full block.
pub(crate) fn receive_block_impl(
&mut self,
block: Block,
peer_id: PeerId,
was_requested: bool,
apply_chunks_done_callback: DoneApplyChunkCallback,
) -> Result<(), near_chain::Error> {
// To protect ourselves from spamming, we do some pre-checks on block height before we do any
// real processing.
if !self.check_block_height(&block, was_requested)? {
return Ok(());
}
let prev_hash = *block.header().prev_hash();
let block = block.into();
self.verify_and_rebroadcast_block(&block, was_requested, &peer_id)?;
let provenance =
if was_requested { near_chain::Provenance::SYNC } else { near_chain::Provenance::NONE };
let res = self.start_process_block(block, provenance, apply_chunks_done_callback);
if let Err(near_chain::Error::Orphan) = &res {
if !self.chain.is_orphan(&prev_hash) {
self.request_block(prev_hash, peer_id)
}
}
res
}

/// To protect ourselves from spamming, we do some pre-checks on block height before we do any
/// processing. This function returns true if the block height is valid.
fn check_block_height(
&self,
block: &Block,
was_requested: bool,
) -> Result<bool, near_chain::Error> {
let head = self.chain.head()?;
let is_syncing = self.sync_status.is_syncing();
if block.header().height() >= head.height + BLOCK_HORIZON && is_syncing && !was_requested {
debug!(target: "client", head_height = head.height, "Dropping a block that is too far ahead.");
return Ok(false);
}
let tail = self.chain.tail()?;
if block.header().height() < tail {
debug!(target: "client", tail_height = tail, "Dropping a block that is too far behind.");
return Ok(false);
}
// drop the block if a) it is not requested, b) we already processed this height,
// c) it is not building on top of current head
if !was_requested
&& block.header().prev_hash()
!= &self
.chain
.head()
.map_or_else(|_| CryptoHash::default(), |tip| tip.last_block_hash)
{
if self.chain.is_height_processed(block.header().height())? {
debug!(target: "client", height = block.header().height(), "Dropping a block because we've seen this height before and we didn't request it");
return Ok(false);
}
}
Ok(true)
}

/// Verify the block and rebroadcast it if it is valid; ban the peer if it is invalid.
/// Ignore all other errors because the full block will be processed later.
/// Note that this happens before the full block processing logic because we want blocks to
/// propagate through the network quickly.
fn verify_and_rebroadcast_block(
&mut self,
block: &MaybeValidated<Block>,
was_requested: bool,
peer_id: &PeerId,
) -> Result<(), near_chain::Error> {
let res = self.chain.process_block_header(block.header(), &mut vec![]);
let res = res.and_then(|_| self.chain.validate_block(block));
match res {
Ok(_) => {
let head = self.chain.head()?;
// do not broadcast blocks that are too far back.
if (head.height < block.header().height()
|| &head.epoch_id == block.header().epoch_id())
&& !was_requested
&& !self.sync_status.is_syncing()
{
self.rebroadcast_block(block.as_ref().into_inner());
}
Ok(())
}
Err(e) if e.is_bad_data() => {
self.ban_peer(peer_id.clone(), ReasonForBan::BadBlockHeader);
Err(e)
}
Err(_) => {
// We are ignoring all other errors and proceeding with the
// block. If it is an orphan (i.e. we haven't processed its
// previous block) then we will get MissingBlock errors. In
// those cases we shouldn't reject the block; instead we pass
// it along. Eventually, it'll get saved as an orphan.
Ok(())
}
}
}

/// Start the processing of a block. Note that this function will return before
/// the full processing is finished because applying chunks is done asynchronously
/// in the rayon thread pool.
@@ -931,7 +1093,7 @@ impl Client {
self.request_missing_chunks(blocks_missing_chunks, orphans_missing_chunks);
}

pub fn rebroadcast_block(&mut self, block: &Block) {
fn rebroadcast_block(&mut self, block: &Block) {
if self.rebroadcasted_blocks.get(block.hash()).is_none() {
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::Block {
@@ -1867,6 +2029,39 @@ impl Client {
}
}

/* Implements functions used to communicate with the network. */
impl Client {
pub fn request_block(&self, hash: CryptoHash, peer_id: PeerId) {
match self.chain.block_exists(&hash) {
Ok(false) => {
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::BlockRequest {
hash,
peer_id,
})
.with_span_context(),
);
}
Ok(true) => {
debug!(target: "client", "send_block_request_to_peer: block {} already known", hash)
}
Err(e) => {
error!(target: "client", "send_block_request_to_peer: failed to check block exists: {:?}", e)
}
}
}

pub fn ban_peer(&self, peer_id: PeerId, ban_reason: ReasonForBan) {
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::BanPeer {
peer_id,
ban_reason,
})
.with_span_context(),
);
}
}

impl Client {
/// Each epoch defines a set of important accounts: block producers, chunk producers,
/// approvers. Low-latency reliable communication between those accounts is critical,
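The rebroadcast condition in verify_and_rebroadcast_block is easy to misread, so here is the same predicate pulled out as a standalone sketch. This is a hypothetical helper for illustration; nearcore keeps the condition inline.

// Sketch of the rebroadcast predicate from verify_and_rebroadcast_block
// (hypothetical standalone helper; the epoch id is any equality-comparable type).
fn should_rebroadcast<E: PartialEq>(
    head_height: u64,
    head_epoch_id: &E,
    block_height: u64,
    block_epoch_id: &E,
    was_requested: bool,
    is_syncing: bool,
) -> bool {
    // Rebroadcast only blocks that are ahead of our head or in the head's
    // epoch, and only if we did not explicitly request the block and we are
    // not still syncing.
    (head_height < block_height || head_epoch_id == block_epoch_id)
        && !was_requested
        && !is_syncing
}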
