Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(request-response): don't use upgrade infrastructure #3914

Merged
merged 34 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1db7fc3
Never close connections in request-response
thomaseizinger May 11, 2023
10508d6
Log error instead
thomaseizinger May 11, 2023
f4f1e58
Refactor `request-response` to not use upgrade mechanism
thomaseizinger May 11, 2023
4afc9d3
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Jun 5, 2023
30f53b3
Don't panic on timeouts
thomaseizinger Jun 5, 2023
50e6207
Remove unnecessary comment
thomaseizinger Jun 5, 2023
aea1027
Use `mpsc` channel for sending inbound requests
thomaseizinger Jun 5, 2023
6e5f76d
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Jul 31, 2023
d41f44b
Fix keep-alive TODO
thomaseizinger Jul 31, 2023
796780e
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Jul 31, 2023
791220c
Fix MSRV issue
thomaseizinger Jul 31, 2023
eaf1d97
Introduce `futures-bounded` for time and space bounded workers
thomaseizinger Aug 3, 2023
be64f0d
Add changelog
thomaseizinger Aug 3, 2023
9a9fd79
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Aug 3, 2023
bdd95f7
Add description
thomaseizinger Aug 3, 2023
51ef639
Add test for backpressure
thomaseizinger Aug 3, 2023
a250457
Allow configuration of max capacity for worker streams
thomaseizinger Aug 6, 2023
7127463
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Sep 18, 2023
0723c2f
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Sep 20, 2023
701d288
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Sep 20, 2023
f8c42de
Fix compile errors
thomaseizinger Sep 20, 2023
7960117
Update swarm/CHANGELOG.md
thomaseizinger Sep 20, 2023
671b90f
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Oct 16, 2023
3810cac
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Oct 19, 2023
4b24287
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Oct 20, 2023
302c920
Move changelog entry
thomaseizinger Oct 20, 2023
dfa3a58
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Oct 20, 2023
53b9abd
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Oct 20, 2023
d812e1c
Merge branch 'refactor/req-res-on-upgrade' of github.com:libp2p/rust-…
thomaseizinger Oct 20, 2023
71354b2
Report IO failures
thomaseizinger Oct 20, 2023
feccbc2
fix(request-response): Report failures (#4701)
oblique Oct 26, 2023
beb8863
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger Oct 26, 2023
c6603a1
Add further changelog entry
thomaseizinger Oct 26, 2023
1599f8f
Merge branch 'master' into refactor/req-res-on-upgrade
mergify[bot] Oct 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions examples/file-sharing/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use libp2p::{
identity, kad,
multiaddr::Protocol,
noise,
request_response::{self, ProtocolSupport, RequestId, ResponseChannel},
request_response::{self, OutboundRequestId, ProtocolSupport, ResponseChannel},
swarm::{NetworkBehaviour, Swarm, SwarmEvent},
tcp, yamux, PeerId,
};
Expand Down Expand Up @@ -175,7 +175,7 @@ pub(crate) struct EventLoop {
pending_start_providing: HashMap<kad::QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<kad::QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_request_file:
HashMap<RequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
HashMap<OutboundRequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
}

impl EventLoop {
Expand Down
3 changes: 3 additions & 0 deletions protocols/autonat/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 0.12.0 - unreleased

- Remove `Clone`, `PartialEq` and `Eq` implementations on `Event` and its sub-structs.
The `Event` also contains errors which are not clonable or comparable.
See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914).

## 0.11.0

Expand Down
16 changes: 9 additions & 7 deletions protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use instant::Instant;
use libp2p_core::{multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_request_response::{
self as request_response, ProtocolSupport, RequestId, ResponseChannel,
self as request_response, InboundRequestId, OutboundRequestId, ProtocolSupport, ResponseChannel,
};
use libp2p_swarm::{
behaviour::{
Expand Down Expand Up @@ -133,7 +133,7 @@ impl ProbeId {
}

/// Event produced by [`Behaviour`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum Event {
/// Event on an inbound probe.
InboundProbe(InboundProbeEvent),
Expand Down Expand Up @@ -187,14 +187,14 @@ pub struct Behaviour {
PeerId,
(
ProbeId,
RequestId,
InboundRequestId,
Vec<Multiaddr>,
ResponseChannel<DialResponse>,
),
>,

// Ongoing outbound probes and mapped to the inner request id.
ongoing_outbound: HashMap<RequestId, ProbeId>,
ongoing_outbound: HashMap<OutboundRequestId, ProbeId>,

// Connected peers with the observed address of each connection.
// If the endpoint of a connection is relayed or not global (in case of Config::only_global_ips),
Expand All @@ -220,9 +220,11 @@ pub struct Behaviour {
impl Behaviour {
pub fn new(local_peer_id: PeerId, config: Config) -> Self {
let protocols = iter::once((DEFAULT_PROTOCOL_NAME, ProtocolSupport::Full));
let mut cfg = request_response::Config::default();
cfg.set_request_timeout(config.timeout);
let inner = request_response::Behaviour::with_codec(AutoNatCodec, protocols, cfg);
let inner = request_response::Behaviour::with_codec(
AutoNatCodec,
protocols,
request_response::Config::default().with_request_timeout(config.timeout),
);
Self {
local_peer_id,
inner,
Expand Down
10 changes: 5 additions & 5 deletions protocols/autonat/src/behaviour/as_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures_timer::Delay;
use instant::Instant;
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_request_response::{self as request_response, OutboundFailure, RequestId};
use libp2p_request_response::{self as request_response, OutboundFailure, OutboundRequestId};
use libp2p_swarm::{ConnectionId, ListenAddresses, ToSwarm};
use rand::{seq::SliceRandom, thread_rng};
use std::{
Expand All @@ -39,7 +39,7 @@ use std::{
};

/// Outbound probe failed or was aborted.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum OutboundProbeError {
/// Probe was aborted because no server is known, or all servers
/// are throttled through [`Config::throttle_server_period`].
Expand All @@ -53,7 +53,7 @@ pub enum OutboundProbeError {
Response(ResponseError),
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum OutboundProbeEvent {
/// A dial-back request was sent to a remote peer.
Request {
Expand Down Expand Up @@ -91,7 +91,7 @@ pub(crate) struct AsClient<'a> {
pub(crate) throttled_servers: &'a mut Vec<(PeerId, Instant)>,
pub(crate) nat_status: &'a mut NatStatus,
pub(crate) confidence: &'a mut usize,
pub(crate) ongoing_outbound: &'a mut HashMap<RequestId, ProbeId>,
pub(crate) ongoing_outbound: &'a mut HashMap<OutboundRequestId, ProbeId>,
pub(crate) last_probe: &'a mut Option<Instant>,
pub(crate) schedule_probe: &'a mut Delay,
pub(crate) listen_addresses: &'a ListenAddresses,
Expand All @@ -117,7 +117,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> {
let probe_id = self
.ongoing_outbound
.remove(&request_id)
.expect("RequestId exists.");
.expect("OutboundRequestId exists.");

let event = match response.result.clone() {
Ok(address) => OutboundProbeEvent::Response {
Expand Down
8 changes: 4 additions & 4 deletions protocols/autonat/src/behaviour/as_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use instant::Instant;
use libp2p_core::{multiaddr::Protocol, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_request_response::{
self as request_response, InboundFailure, RequestId, ResponseChannel,
self as request_response, InboundFailure, InboundRequestId, ResponseChannel,
};
use libp2p_swarm::{
dial_opts::{DialOpts, PeerCondition},
Expand All @@ -38,15 +38,15 @@ use std::{
};

/// Inbound probe failed.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum InboundProbeError {
/// Receiving the dial-back request or sending a response failed.
InboundRequest(InboundFailure),
/// We refused or failed to dial the client.
Response(ResponseError),
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum InboundProbeEvent {
/// A dial-back request was received from a remote peer.
Request {
Expand Down Expand Up @@ -85,7 +85,7 @@ pub(crate) struct AsServer<'a> {
PeerId,
(
ProbeId,
RequestId,
InboundRequestId,
Vec<Multiaddr>,
ResponseChannel<DialResponse>,
),
Expand Down
8 changes: 4 additions & 4 deletions protocols/autonat/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn test_auto_probe() {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => {
assert!(peer.is_none());
assert_eq!(error, OutboundProbeError::NoAddresses);
assert!(matches!(error, OutboundProbeError::NoAddresses));
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
Expand Down Expand Up @@ -181,10 +181,10 @@ async fn test_confidence() {
peer,
error,
} if !test_public => {
assert_eq!(
assert!(matches!(
error,
OutboundProbeError::Response(ResponseError::DialError)
);
));
(peer.unwrap(), probe_id)
}
other => panic!("Unexpected Outbound Event: {other:?}"),
Expand Down Expand Up @@ -261,7 +261,7 @@ async fn test_throttle_server_period() {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => {
assert!(peer.is_none());
assert_eq!(error, OutboundProbeError::NoServer);
assert!(matches!(error, OutboundProbeError::NoServer));
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
Expand Down
9 changes: 6 additions & 3 deletions protocols/autonat/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ async fn test_dial_error() {
}) => {
assert_eq!(probe_id, request_probe_id);
assert_eq!(peer, client_id);
assert_eq!(error, InboundProbeError::Response(ResponseError::DialError));
assert!(matches!(
error,
InboundProbeError::Response(ResponseError::DialError)
));
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
Expand Down Expand Up @@ -252,10 +255,10 @@ async fn test_throttle_peer_max() {
}) => {
assert_eq!(client_id, peer);
assert_ne!(first_probe_id, probe_id);
assert_eq!(
assert!(matches!(
error,
InboundProbeError::Response(ResponseError::DialRefused)
)
));
}
other => panic!("Unexpected behaviour event: {other:?}."),
};
Expand Down
14 changes: 9 additions & 5 deletions protocols/rendezvous/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use libp2p_core::{Endpoint, Multiaddr, PeerRecord};
use libp2p_identity::{Keypair, PeerId, SigningError};
use libp2p_request_response::{ProtocolSupport, RequestId};
use libp2p_request_response::{OutboundRequestId, ProtocolSupport};
use libp2p_swarm::{
ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
Expand All @@ -41,8 +41,8 @@ pub struct Behaviour {

keypair: Keypair,

waiting_for_register: HashMap<RequestId, (PeerId, Namespace)>,
waiting_for_discovery: HashMap<RequestId, (PeerId, Option<Namespace>)>,
waiting_for_register: HashMap<OutboundRequestId, (PeerId, Namespace)>,
waiting_for_discovery: HashMap<OutboundRequestId, (PeerId, Option<Namespace>)>,

/// Hold addresses of all peers that we have discovered so far.
///
Expand Down Expand Up @@ -336,7 +336,7 @@ impl NetworkBehaviour for Behaviour {
}

impl Behaviour {
fn event_for_outbound_failure(&mut self, req_id: &RequestId) -> Option<Event> {
fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option<Event> {
if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) {
return Some(Event::RegisterFailed {
rendezvous_node,
Expand All @@ -356,7 +356,11 @@ impl Behaviour {
None
}

fn handle_response(&mut self, request_id: &RequestId, response: Message) -> Option<Event> {
fn handle_response(
&mut self,
request_id: &OutboundRequestId,
response: Message,
) -> Option<Event> {
match response {
RegisterResponse(Ok(ttl)) => {
if let Some((rendezvous_node, namespace)) =
Expand Down
8 changes: 7 additions & 1 deletion protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

- Remove `request_response::Config::set_connection_keep_alive` in favor of `SwarmBuilder::idle_connection_timeout`.
See [PR 4679](https://github.com/libp2p/rust-libp2p/pull/4679).

- Allow at most 100 concurrent inbound + outbound streams per instance of `request_response::Behaviour`.
This limit is configurable via `Config::with_max_concurrent_streams`.
See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914).
- Report IO failures on inbound and outbound streams.
See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914).
- Introduce dedicated types for `InboundRequestId` and `OutboundRequestId`.
See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914).
- Keep peer addresses in `HashSet` instead of `SmallVec` to prevent adding duplicate addresses.
See [PR 4700](https://github.com/libp2p/rust-libp2p/pull/4700).

Expand Down
3 changes: 3 additions & 0 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ serde_json = { version = "1.0.107", optional = true }
smallvec = "1.11.1"
void = "1.0.2"
log = "0.4.20"
futures-timer = "3.0.2"
futures-bounded = { workspace = true }

[features]
json = ["dep:serde", "dep:serde_json", "libp2p-swarm/macros"]
cbor = ["dep:serde", "dep:cbor4ii", "libp2p-swarm/macros"]

[dev-dependencies]
anyhow = "1.0.75"
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.10.0"
libp2p-noise = { workspace = true }
Expand Down