Skip to content

Commit

Permalink
protocols/kad: Refactor KademliaEvent (#2321)
Browse files Browse the repository at this point in the history
Rename `KademliaEvent::InboundRequestServed` to `KademliaEvent::InboundRequest` and move
`InboundPutRecordRequest` into `InboundRequest::PutRecord` and `InboundAddProviderRequest` into
`InboundRequest::AddProvider`.

Co-authored-by: supercmmetry <vishaals2000@gmail.com>
  • Loading branch information
mxinden and supercmmetry committed Oct 30, 2021
1 parent ff5d455 commit 06c339c
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 64 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -45,6 +45,7 @@
## Version 0.41.0 [unreleased]

- Update individual crates.
- `libp2p-kad`
- `libp2p-websocket`
- Forward `wasm-bindgen` feature to `futures-timer`, `instant`, `parking_lot`, `getrandom/js` and `rand/wasm-bindgen`.

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -75,7 +75,7 @@ libp2p-core = { version = "0.30.0-rc.2", path = "core", default-features = fals
libp2p-floodsub = { version = "0.31.0-rc.1", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.33.0-rc.1", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.31.0-rc.2", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.32.0-rc.2", path = "protocols/kad", optional = true }
libp2p-kad = { version = "0.33.0", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.1.0-rc.1", path = "misc/metrics", optional = true }
libp2p-mplex = { version = "0.30.0-rc.1", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.33.0-rc.1", path = "transports/noise", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion misc/metrics/Cargo.toml
Expand Up @@ -17,7 +17,7 @@ ping = ["libp2p-ping"]
[dependencies]
libp2p-core= { version = "0.30.0-rc.1", path = "../../core" }
libp2p-identify = { version = "0.31.0-rc.1", path = "../../protocols/identify", optional = true }
libp2p-kad = { version = "0.32.0-rc.1", path = "../../protocols/kad", optional = true }
libp2p-kad = { version = "0.33.0", path = "../../protocols/kad", optional = true }
libp2p-ping = { version = "0.31.0-rc.1", path = "../../protocols/ping", optional = true }
libp2p-swarm = { version = "0.31.0-rc.1", path = "../../swarm" }
open-metrics-client = "0.12.0"
Expand Down
2 changes: 1 addition & 1 deletion misc/metrics/src/kad.rs
Expand Up @@ -258,7 +258,7 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
}
}

libp2p_kad::KademliaEvent::InboundRequestServed { request } => {
libp2p_kad::KademliaEvent::InboundRequest { request } => {
self.kad
.inbound_requests
.get_or_create(&request.into())
Expand Down
7 changes: 6 additions & 1 deletion protocols/kad/CHANGELOG.md
@@ -1,8 +1,13 @@
# 0.32.1 [unreleased]
# 0.33.0 [unreleased]

- Use `instant` and `futures-timer` instead of `wasm-timer` (see [PR 2245]).

- Rename `KademliaEvent::InboundRequestServed` to `KademliaEvent::InboundRequest` and move
`InboundPutRecordRequest` into `InboundRequest::PutRecord` and `InboundAddProviderRequest` into
`InboundRequest::AddProvider` (see [PR 2297]).

[PR 2245]: https://github.com/libp2p/rust-libp2p/pull/2245
[PR 2297]: https://github.com/libp2p/rust-libp2p/pull/2297

# 0.32.0-rc.2 [2021-10-15]

Expand Down
2 changes: 1 addition & 1 deletion protocols/kad/Cargo.toml
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-kad"
edition = "2018"
description = "Kademlia protocol for libp2p"
version = "0.32.1"
version = "0.33.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
106 changes: 58 additions & 48 deletions protocols/kad/src/behaviour.rs
Expand Up @@ -148,8 +148,8 @@ pub enum KademliaStoreInserts {
/// the record is forwarded immediately to the [`RecordStore`].
Unfiltered,
/// Whenever a (provider) record is received, an event is emitted.
/// Provider records generate a [`KademliaEvent::InboundAddProviderRequest`],
/// normal records generate a [`KademliaEvent::InboundPutRecordRequest`].
/// Provider records generate a [`InboundRequest::AddProvider`] under [`KademliaEvent::InboundRequest`],
/// normal records generate a [`InboundRequest::PutRecord`] under [`KademliaEvent::InboundRequest`].
///
/// When deemed valid, a (provider) record needs to be explicitly stored in
/// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
Expand Down Expand Up @@ -1625,11 +1625,23 @@ where
// is a waste of resources.
match self.record_filtering {
KademliaStoreInserts::Unfiltered => match self.store.put(record.clone()) {
Ok(()) => debug!(
"Record stored: {:?}; {} bytes",
record.key,
record.value.len()
),
Ok(()) => {
debug!(
"Record stored: {:?}; {} bytes",
record.key,
record.value.len()
);
self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundRequest {
request: InboundRequest::PutRecord {
source,
connection,
record: None,
},
},
));
}
Err(e) => {
info!("Record not stored: {:?}", e);
self.queued_events
Expand All @@ -1638,16 +1650,19 @@ where
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::Reset(request_id),
});

return;
}
},
KademliaStoreInserts::FilterBoth => {
self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundPutRecordRequest {
source,
connection,
record: record.clone(),
KademliaEvent::InboundRequest {
request: InboundRequest::PutRecord {
source,
connection,
record: Some(record.clone()),
},
},
));
}
Expand Down Expand Up @@ -1686,12 +1701,24 @@ where
KademliaStoreInserts::Unfiltered => {
if let Err(e) = self.store.add_provider(record) {
info!("Provider record not stored: {:?}", e);
return;
}

self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundRequest {
request: InboundRequest::AddProvider { record: None },
},
));
}
KademliaStoreInserts::FilterBoth => {
self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundAddProviderRequest { record },
KademliaEvent::InboundRequest {
request: InboundRequest::AddProvider {
record: Some(record),
},
},
));
}
}
Expand Down Expand Up @@ -1951,7 +1978,7 @@ where

self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundRequestServed {
KademliaEvent::InboundRequest {
request: InboundRequest::FindNode {
num_closer_peers: closer_peers.len(),
},
Expand Down Expand Up @@ -1982,7 +2009,7 @@ where

self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundRequestServed {
KademliaEvent::InboundRequest {
request: InboundRequest::GetProvider {
num_closer_peers: closer_peers.len(),
num_provider_peers: provider_peers.len(),
Expand Down Expand Up @@ -2039,13 +2066,6 @@ where
}

self.provider_received(key, provider);

self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundRequestServed {
request: InboundRequest::AddProvider {},
},
));
}

KademliaHandlerEvent::GetRecord { key, request_id } => {
Expand All @@ -2066,7 +2086,7 @@ where

self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundRequestServed {
KademliaEvent::InboundRequest {
request: InboundRequest::GetRecord {
num_closer_peers: closer_peers.len(),
present_locally: record.is_some(),
Expand Down Expand Up @@ -2150,13 +2170,6 @@ where

KademliaHandlerEvent::PutRecord { record, request_id } => {
self.record_received(source, connection, request_id, record);

self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundRequestServed {
request: InboundRequest::PutRecord {},
},
));
}

KademliaHandlerEvent::PutRecordRes { user_data, .. } => {
Expand Down Expand Up @@ -2371,26 +2384,12 @@ pub struct PeerRecord {
/// See [`NetworkBehaviour::poll`].
#[derive(Debug)]
pub enum KademliaEvent {
/// A peer sent a [`KademliaHandlerIn::PutRecord`] request and filtering is enabled.
///
/// See [`KademliaStoreInserts`] and [`KademliaConfig::set_record_filtering`].
InboundPutRecordRequest {
source: PeerId,
connection: ConnectionId,
record: Record,
},

/// A peer sent a [`KademliaHandlerIn::AddProvider`] request and filtering [`KademliaStoreInserts::FilterBoth`] is enabled.
///
/// See [`KademliaStoreInserts`] and [`KademliaConfig::set_record_filtering`] for details..
InboundAddProviderRequest { record: ProviderRecord },

/// An inbound request has been received and handled.
//
// Note on the difference between 'request' and 'query': A request is a
// single request-response style exchange with a single remote peer. A query
// is made of multiple requests across multiple remote peers.
InboundRequestServed { request: InboundRequest },
InboundRequest { request: InboundRequest },

/// An outbound query has produced a result.
OutboundQueryCompleted {
Expand Down Expand Up @@ -2464,15 +2463,26 @@ pub enum InboundRequest {
num_closer_peers: usize,
num_provider_peers: usize,
},
/// Request to store a peer as a provider.
AddProvider {},
/// A peer sent a [`KademliaHandlerIn::AddProvider`] request.
/// If filtering [`KademliaStoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
/// included.
///
/// See [`KademliaStoreInserts`] and [`KademliaConfig::set_record_filtering`] for details..
AddProvider { record: Option<ProviderRecord> },
/// Request to retrieve a record.
GetRecord {
num_closer_peers: usize,
present_locally: bool,
},
/// Request to store a record.
PutRecord {},
/// A peer sent a [`KademliaHandlerIn::PutRecord`] request.
/// If filtering [`KademliaStoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
///
/// See [`KademliaStoreInserts`] and [`KademliaConfig::set_record_filtering`].
PutRecord {
source: PeerId,
connection: ConnectionId,
record: Option<Record>,
},
}

/// The results of Kademlia queries.
Expand Down
31 changes: 20 additions & 11 deletions protocols/kad/src/behaviour/test.rs
Expand Up @@ -602,19 +602,28 @@ fn put_record() {
}
}
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::InboundPutRecordRequest { record, .. },
KademliaEvent::InboundRequest {
request: InboundRequest::PutRecord { record, .. },
},
))) => {
assert_ne!(
swarm.behaviour().record_filtering,
KademliaStoreInserts::Unfiltered
);
if !drop_records {
// Accept the record
swarm
.behaviour_mut()
.store_mut()
.put(record)
.expect("record is stored");
if let Some(record) = record {
assert_eq!(
swarm.behaviour().record_filtering,
KademliaStoreInserts::FilterBoth
);
// Accept the record
swarm
.behaviour_mut()
.store_mut()
.put(record)
.expect("record is stored");
} else {
assert_eq!(
swarm.behaviour().record_filtering,
KademliaStoreInserts::Unfiltered
);
}
}
}
// Ignore any other event.
Expand Down

0 comments on commit 06c339c

Please sign in to comment.