Skip to content

Commit

Permalink
feat(download): prioritize requests for missing bodies (#1095)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk committed Jan 31, 2023
1 parent f771e23 commit e0dbcae
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
5 changes: 4 additions & 1 deletion crates/net/downloaders/src/bodies/concurrent.rs
Expand Up @@ -13,6 +13,7 @@ use reth_interfaces::{
response::BlockResponse,
},
error::{DownloadError, DownloadResult},
priority::Priority,
},
};
use reth_primitives::{BlockNumber, SealedHeader};
Expand Down Expand Up @@ -302,7 +303,7 @@ where
for range in requests {
let headers = self
.query_headers(
*range.start()..*range.end() + 1, //
*range.start()..*range.end() + 1,
range.clone().count() as u64,
)?
.ok_or(DownloadError::MissingHeader { block_number: *range.start() })?;
Expand All @@ -312,6 +313,7 @@ where
Arc::clone(&self.client),
Arc::clone(&self.consensus),
headers,
Priority::High,
);
}
}
Expand Down Expand Up @@ -364,6 +366,7 @@ where
Arc::clone(&this.client),
Arc::clone(&this.consensus),
request,
Priority::Normal,
);
new_request_submitted = true;
}
Expand Down
9 changes: 7 additions & 2 deletions crates/net/downloaders/src/bodies/queue.rs
Expand Up @@ -4,7 +4,10 @@ use futures::{stream::FuturesUnordered, Stream};
use futures_util::StreamExt;
use reth_interfaces::{
consensus::Consensus,
p2p::bodies::{client::BodiesClient, response::BlockResponse},
p2p::{
bodies::{client::BodiesClient, response::BlockResponse},
priority::Priority,
},
};
use reth_primitives::{BlockNumber, SealedHeader};
use std::{
Expand Down Expand Up @@ -66,6 +69,7 @@ where
client: Arc<B>,
consensus: Arc<dyn Consensus>,
request: Vec<SealedHeader>,
priority: Priority,
) {
// Set last max requested block number
self.last_requested_block_number = request
Expand All @@ -81,7 +85,8 @@ where

// Create request and push into the queue.
self.inner.push(
BodiesRequestFuture::new(client, consensus, self.metrics.clone()).with_headers(request),
BodiesRequestFuture::new(client, consensus, priority, self.metrics.clone())
.with_headers(request),
)
}

Expand Down
8 changes: 7 additions & 1 deletion crates/net/downloaders/src/bodies/request.rs
Expand Up @@ -6,6 +6,7 @@ use reth_interfaces::{
p2p::{
bodies::{client::BodiesClient, response::BlockResponse},
error::{DownloadError, DownloadResult},
priority::Priority,
},
};
use reth_primitives::{PeerId, SealedBlock, SealedHeader, WithPeerId, H256};
Expand Down Expand Up @@ -39,6 +40,7 @@ pub(crate) struct BodiesRequestFuture<B: BodiesClient> {
client: Arc<B>,
consensus: Arc<dyn Consensus>,
metrics: DownloaderMetrics,
priority: Priority,
// Headers to download. The collection is shrinked as responses are buffered.
headers: VecDeque<SealedHeader>,
buffer: Vec<BlockResponse>,
Expand All @@ -54,12 +56,14 @@ where
pub(crate) fn new(
client: Arc<B>,
consensus: Arc<dyn Consensus>,
priority: Priority,
metrics: DownloaderMetrics,
) -> Self {
Self {
client,
consensus,
metrics,
priority,
headers: Default::default(),
buffer: Default::default(),
last_request_len: None,
Expand Down Expand Up @@ -102,7 +106,7 @@ where
tracing::trace!(target: "downloaders::bodies", request_len = req.len(), "Requesting bodies");
let client = Arc::clone(&self.client);
self.last_request_len = Some(req.len());
self.fut = Some(client.get_block_bodies(req));
self.fut = Some(client.get_block_bodies_with_priority(req, self.priority));
}

/// Process block response.
Expand Down Expand Up @@ -246,6 +250,7 @@ mod tests {
let fut = BodiesRequestFuture::new(
client.clone(),
Arc::new(TestConsensus::default()),
Priority::Normal,
DownloaderMetrics::new(TEST_SCOPE),
)
.with_headers(headers.clone());
Expand All @@ -267,6 +272,7 @@ mod tests {
let fut = BodiesRequestFuture::new(
client.clone(),
Arc::new(TestConsensus::default()),
Priority::Normal,
DownloaderMetrics::new(TEST_SCOPE),
)
.with_headers(headers.clone());
Expand Down

0 comments on commit e0dbcae

Please sign in to comment.