diff --git a/crates/net/downloaders/src/bodies/concurrent.rs b/crates/net/downloaders/src/bodies/concurrent.rs index 545a676b3c0..1e9b3278f98 100644 --- a/crates/net/downloaders/src/bodies/concurrent.rs +++ b/crates/net/downloaders/src/bodies/concurrent.rs @@ -13,6 +13,7 @@ use reth_interfaces::{ response::BlockResponse, }, error::{DownloadError, DownloadResult}, + priority::Priority, }, }; use reth_primitives::{BlockNumber, SealedHeader}; @@ -302,7 +303,7 @@ where for range in requests { let headers = self .query_headers( - *range.start()..*range.end() + 1, // + *range.start()..*range.end() + 1, range.clone().count() as u64, )? .ok_or(DownloadError::MissingHeader { block_number: *range.start() })?; @@ -312,6 +313,7 @@ where Arc::clone(&self.client), Arc::clone(&self.consensus), headers, + Priority::High, ); } } @@ -364,6 +366,7 @@ where Arc::clone(&this.client), Arc::clone(&this.consensus), request, + Priority::Normal, ); new_request_submitted = true; } diff --git a/crates/net/downloaders/src/bodies/queue.rs b/crates/net/downloaders/src/bodies/queue.rs index f78187790c6..33b9276084b 100644 --- a/crates/net/downloaders/src/bodies/queue.rs +++ b/crates/net/downloaders/src/bodies/queue.rs @@ -4,7 +4,10 @@ use futures::{stream::FuturesUnordered, Stream}; use futures_util::StreamExt; use reth_interfaces::{ consensus::Consensus, - p2p::bodies::{client::BodiesClient, response::BlockResponse}, + p2p::{ + bodies::{client::BodiesClient, response::BlockResponse}, + priority::Priority, + }, }; use reth_primitives::{BlockNumber, SealedHeader}; use std::{ @@ -66,6 +69,7 @@ where client: Arc, consensus: Arc, request: Vec, + priority: Priority, ) { // Set last max requested block number self.last_requested_block_number = request @@ -81,7 +85,8 @@ where // Create request and push into the queue. self.inner.push( - BodiesRequestFuture::new(client, consensus, self.metrics.clone()).with_headers(request), + BodiesRequestFuture::new(client, consensus, priority, self.metrics.clone()) + .with_headers(request), ) } diff --git a/crates/net/downloaders/src/bodies/request.rs b/crates/net/downloaders/src/bodies/request.rs index 3845574010f..c2e9e0651f6 100644 --- a/crates/net/downloaders/src/bodies/request.rs +++ b/crates/net/downloaders/src/bodies/request.rs @@ -6,6 +6,7 @@ use reth_interfaces::{ p2p::{ bodies::{client::BodiesClient, response::BlockResponse}, error::{DownloadError, DownloadResult}, + priority::Priority, }, }; use reth_primitives::{PeerId, SealedBlock, SealedHeader, WithPeerId, H256}; @@ -39,6 +40,7 @@ pub(crate) struct BodiesRequestFuture { client: Arc, consensus: Arc, metrics: DownloaderMetrics, + priority: Priority, // Headers to download. The collection is shrinked as responses are buffered. headers: VecDeque, buffer: Vec, @@ -54,12 +56,14 @@ where pub(crate) fn new( client: Arc, consensus: Arc, + priority: Priority, metrics: DownloaderMetrics, ) -> Self { Self { client, consensus, metrics, + priority, headers: Default::default(), buffer: Default::default(), last_request_len: None, @@ -102,7 +106,7 @@ where tracing::trace!(target: "downloaders::bodies", request_len = req.len(), "Requesting bodies"); let client = Arc::clone(&self.client); self.last_request_len = Some(req.len()); - self.fut = Some(client.get_block_bodies(req)); + self.fut = Some(client.get_block_bodies_with_priority(req, self.priority)); } /// Process block response. @@ -246,6 +250,7 @@ mod tests { let fut = BodiesRequestFuture::new( client.clone(), Arc::new(TestConsensus::default()), + Priority::Normal, DownloaderMetrics::new(TEST_SCOPE), ) .with_headers(headers.clone()); @@ -267,6 +272,7 @@ mod tests { let fut = BodiesRequestFuture::new( client.clone(), Arc::new(TestConsensus::default()), + Priority::Normal, DownloaderMetrics::new(TEST_SCOPE), ) .with_headers(headers.clone());