Skip to content

Commit

Permalink
Merge pull request #1353 from twittner/fixes
Browse files Browse the repository at this point in the history
Several changes.
  • Loading branch information
tomaka committed Jan 2, 2020
2 parents 34a3e5c + d870c73 commit ff0d2d5
Show file tree
Hide file tree
Showing 40 changed files with 165 additions and 185 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -14,7 +14,7 @@ default = ["secp256k1", "libp2p-websocket"]
secp256k1 = ["libp2p-core/secp256k1", "libp2p-secio/secp256k1"]

[dependencies]
bytes = "0.4"
bytes = "0.5"
futures = "0.3.1"
multiaddr = { package = "parity-multiaddr", version = "0.6.0", path = "misc/multiaddr" }
multihash = { package = "parity-multihash", version = "0.2.0", path = "misc/multihash" }
Expand Down
8 changes: 4 additions & 4 deletions core/Cargo.toml
Expand Up @@ -12,12 +12,12 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
asn1_der = "0.6.1"
bs58 = "0.3.0"
bytes = "0.4"
bytes = "0.5"
ed25519-dalek = "1.0.0-pre.3"
failure = "0.1"
fnv = "1.0"
futures = { version = "0.3.1", features = ["compat", "io-compat", "executor", "thread-pool"] }
futures-timer = "0.3"
futures-timer = "2"
lazy_static = "1.2"
libsecp256k1 = { version = "0.3.1", optional = true }
log = "0.4"
Expand All @@ -26,13 +26,13 @@ multihash = { package = "parity-multihash", version = "0.2.0", path = "../misc/m
multistream-select = { version = "0.6.0", path = "../misc/multistream-select" }
parking_lot = "0.9.0"
pin-project = "0.4.6"
protobuf = "2.8"
protobuf = "= 2.8.1"
quick-error = "1.2"
rand = "0.7"
rw-stream-sink = { version = "0.1.1", path = "../misc/rw-stream-sink" }
sha2 = "0.8.0"
smallvec = "1.0"
unsigned-varint = "0.2"
unsigned-varint = "0.3"
void = "1"
zeroize = "1"

Expand Down
2 changes: 1 addition & 1 deletion core/src/nodes/listeners.rs
Expand Up @@ -358,7 +358,7 @@ mod tests {

#[test]
fn incoming_event() {
futures::executor::block_on(async move {
async_std::task::block_on(async move {
let mem_transport = transport::MemoryTransport::default();

let mut listeners = ListenersStream::new(mem_transport);
Expand Down
4 changes: 1 addition & 3 deletions core/src/transport/memory.rs
Expand Up @@ -19,7 +19,6 @@
// DEALINGS IN THE SOFTWARE.

use crate::{Transport, transport::{TransportError, ListenerEvent}};
use bytes::IntoBuf;
use fnv::FnvHashMap;
use futures::{future::{self, Ready}, prelude::*, channel::mpsc, task::Context, task::Poll};
use lazy_static::lazy_static;
Expand Down Expand Up @@ -271,8 +270,7 @@ impl<T> Sink<T> for Chan<T> {
}
}

impl<T: IntoBuf> Into<RwStreamSink<Chan<T>>> for Chan<T> {
#[inline]
impl<T: AsRef<[u8]>> Into<RwStreamSink<Chan<T>>> for Chan<T> {
fn into(self) -> RwStreamSink<Chan<T>> {
RwStreamSink::new(self)
}
Expand Down
5 changes: 2 additions & 3 deletions core/src/transport/timeout.rs
Expand Up @@ -172,10 +172,9 @@ where
Poll::Ready(Err(err)) => return Poll::Ready(Err(TransportTimeoutError::Other(err))),
}

match TryFuture::try_poll(Pin::new(&mut this.timer), cx) {
match Pin::new(&mut this.timer).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Err(TransportTimeoutError::Timeout)),
Poll::Ready(Err(err)) => Poll::Ready(Err(TransportTimeoutError::TimerError(err))),
Poll::Ready(()) => Poll::Ready(Err(TransportTimeoutError::Timeout))
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions core/tests/network_dial_error.rs
Expand Up @@ -113,7 +113,7 @@ fn deny_incoming_connec() {

swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

let address = futures::executor::block_on(future::poll_fn(|cx| {
let address = async_std::task::block_on(future::poll_fn(|cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) {
Poll::Ready(listen_addr)
} else {
Expand All @@ -126,7 +126,7 @@ fn deny_incoming_connec() {
.into_not_connected().unwrap()
.connect(address.clone(), TestHandler::default().into_node_handler_builder());

futures::executor::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
match swarm1.poll(cx) {
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => drop(inc),
Poll::Ready(_) => unreachable!(),
Expand Down Expand Up @@ -182,7 +182,7 @@ fn dial_self() {

swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

let (address, mut swarm) = futures::executor::block_on(
let (address, mut swarm) = async_std::task::block_on(
future::lazy(move |cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm.poll(cx) {
Ok::<_, void::Void>((listen_addr, swarm))
Expand All @@ -196,7 +196,7 @@ fn dial_self() {

let mut got_dial_err = false;
let mut got_inc_err = false;
futures::executor::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
loop {
match swarm.poll(cx) {
Poll::Ready(NetworkEvent::UnknownPeerDialError {
Expand Down Expand Up @@ -284,7 +284,7 @@ fn multiple_addresses_err() {
.connect_iter(addresses.clone(), TestHandler::default().into_node_handler_builder())
.unwrap();

futures::executor::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
loop {
match swarm.poll(cx) {
Poll::Ready(NetworkEvent::DialError {
Expand Down
29 changes: 12 additions & 17 deletions core/tests/network_simult.rs
Expand Up @@ -18,8 +18,6 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod util;

use futures::prelude::*;
use libp2p_core::{identity, upgrade, Transport};
use libp2p_core::nodes::{Network, NetworkEvent, Peer};
Expand Down Expand Up @@ -111,10 +109,7 @@ fn raw_swarm_simultaneous_connect() {
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1Lazy)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| {
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
});
.multiplex(libp2p_mplex::MplexConfig::new());
Network::new(transport, local_public_key.into_peer_id())
};

Expand All @@ -124,10 +119,7 @@ fn raw_swarm_simultaneous_connect() {
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1Lazy)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| {
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
});
.multiplex(libp2p_mplex::MplexConfig::new());
Network::new(transport, local_public_key.into_peer_id())
};

Expand Down Expand Up @@ -160,7 +152,7 @@ fn raw_swarm_simultaneous_connect() {
Dialing,
Connected,
Replaced,
Errored
Denied
}

loop {
Expand All @@ -170,7 +162,7 @@ fn raw_swarm_simultaneous_connect() {
let mut swarm1_dial_start = Delay::new(Duration::new(0, rand::random::<u32>() % 50_000_000));
let mut swarm2_dial_start = Delay::new(Duration::new(0, rand::random::<u32>() % 50_000_000));

let future = future::poll_fn(|cx| -> Poll<bool> {
let future = future::poll_fn(|cx| {
loop {
let mut swarm1_not_ready = false;
let mut swarm2_not_ready = false;
Expand Down Expand Up @@ -210,7 +202,7 @@ fn raw_swarm_simultaneous_connect() {
error: IncomingError::DeniedLowerPriority, ..
}) => {
assert_eq!(swarm1_step, Step::Connected);
swarm1_step = Step::Errored
swarm1_step = Step::Denied
}
Poll::Ready(NetworkEvent::Connected { conn_info, .. }) => {
assert_eq!(conn_info, *swarm2.local_peer_id());
Expand Down Expand Up @@ -241,7 +233,7 @@ fn raw_swarm_simultaneous_connect() {
error: IncomingError::DeniedLowerPriority, ..
}) => {
assert_eq!(swarm2_step, Step::Connected);
swarm2_step = Step::Errored
swarm2_step = Step::Denied
}
Poll::Ready(NetworkEvent::Connected { conn_info, .. }) => {
assert_eq!(conn_info, *swarm1.local_peer_id());
Expand All @@ -268,9 +260,12 @@ fn raw_swarm_simultaneous_connect() {

match (swarm1_step, swarm2_step) {
| (Step::Connected, Step::Replaced)
| (Step::Connected, Step::Errored)
| (Step::Connected, Step::Denied)
| (Step::Replaced, Step::Connected)
| (Step::Errored, Step::Connected) => return Poll::Ready(true),
| (Step::Replaced, Step::Denied)
| (Step::Replaced, Step::Replaced)
| (Step::Denied, Step::Connected)
| (Step::Denied, Step::Replaced) => return Poll::Ready(true),
_else => ()
}

Expand All @@ -280,7 +275,7 @@ fn raw_swarm_simultaneous_connect() {
}
});

if futures::executor::block_on(future) {
if async_std::task::block_on(future) {
// The test exercised what we wanted to exercise: a simultaneous connect.
break
}
Expand Down
2 changes: 1 addition & 1 deletion misc/multiaddr/Cargo.toml
Expand Up @@ -17,7 +17,7 @@ data-encoding = "2.1"
multihash = { package = "parity-multihash", version = "0.2.0", path = "../multihash" }
percent-encoding = "2.1.0"
serde = "1.0.70"
unsigned-varint = "0.2"
unsigned-varint = "0.3"
url = { version = "2.1.0", default-features = false }

[dev-dependencies]
Expand Down
22 changes: 3 additions & 19 deletions misc/multiaddr/src/lib.rs
Expand Up @@ -7,7 +7,7 @@ mod errors;
mod from_url;
mod util;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use serde::{
Deserialize,
Deserializer,
Expand Down Expand Up @@ -290,10 +290,10 @@ impl From<Ipv6Addr> for Multiaddr {
}
}

impl TryFrom<Bytes> for Multiaddr {
impl TryFrom<Vec<u8>> for Multiaddr {
type Error = Error;

fn try_from(v: Bytes) -> Result<Self> {
fn try_from(v: Vec<u8>) -> Result<Self> {
// Check if the argument is a valid `Multiaddr` by reading its protocols.
let mut slice = &v[..];
while !slice.is_empty() {
Expand All @@ -304,22 +304,6 @@ impl TryFrom<Bytes> for Multiaddr {
}
}

impl TryFrom<BytesMut> for Multiaddr {
type Error = Error;

fn try_from(v: BytesMut) -> Result<Self> {
Multiaddr::try_from(v.freeze())
}
}

impl TryFrom<Vec<u8>> for Multiaddr {
type Error = Error;

fn try_from(v: Vec<u8>) -> Result<Self> {
Multiaddr::try_from(Bytes::from(v))
}
}

impl TryFrom<String> for Multiaddr {
type Error = Error;

Expand Down
6 changes: 3 additions & 3 deletions misc/multihash/Cargo.toml
Expand Up @@ -11,9 +11,9 @@ documentation = "https://docs.rs/parity-multihash/"

[dependencies]
blake2 = { version = "0.8", default-features = false }
bytes = "0.4.12"
rand = { version = "0.6", default-features = false, features = ["std"] }
bytes = "0.5"
rand = { version = "0.7", default-features = false, features = ["std"] }
sha-1 = { version = "0.8", default-features = false }
sha2 = { version = "0.8", default-features = false }
sha3 = { version = "0.8", default-features = false }
unsigned-varint = "0.2"
unsigned-varint = "0.3"
2 changes: 1 addition & 1 deletion misc/multihash/src/lib.rs
Expand Up @@ -247,7 +247,7 @@ impl<'a> MultihashRef<'a> {
/// This operation allocates.
pub fn into_owned(self) -> Multihash {
Multihash {
bytes: Bytes::from(self.bytes)
bytes: Bytes::copy_from_slice(self.bytes)
}
}

Expand Down
4 changes: 2 additions & 2 deletions misc/multistream-select/Cargo.toml
Expand Up @@ -10,12 +10,12 @@ categories = ["network-programming", "asynchronous"]
edition = "2018"

[dependencies]
bytes = "0.4"
bytes = "0.5"
futures = "0.1"
log = "0.4"
smallvec = "1.0"
tokio-io = "0.1"
unsigned-varint = "0.2.2"
unsigned-varint = "0.3"

[dev-dependencies]
tokio = "0.1"
Expand Down
4 changes: 2 additions & 2 deletions misc/multistream-select/src/length_delimited.rs
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use bytes::{Bytes, BytesMut, BufMut};
use bytes::{Bytes, BytesMut, Buf, BufMut};
use futures::{try_ready, Async, Poll, Sink, StartSend, Stream, AsyncSink};
use std::{io, u16};
use tokio_io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -136,7 +136,7 @@ impl<R> LengthDelimited<R> {
"Failed to write buffered frame."))
}

self.write_buffer.split_to(n);
self.write_buffer.advance(n);
}

Ok(Async::Ready(()))
Expand Down
10 changes: 5 additions & 5 deletions misc/multistream-select/src/negotiated.rs
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use bytes::BytesMut;
use bytes::{BytesMut, Buf};
use crate::protocol::{Protocol, MessageReader, Message, Version, ProtocolError};
use futures::{prelude::*, Async, try_ready};
use log::debug;
Expand Down Expand Up @@ -93,7 +93,7 @@ impl<TInner> Negotiated<TInner> {
}

if let State::Completed { remaining, .. } = &mut self.state {
let _ = remaining.take(); // Drop remaining data flushed above.
let _ = remaining.split_to(remaining.len()); // Drop remaining data flushed above.
return Ok(Async::Ready(()))
}

Expand Down Expand Up @@ -232,7 +232,7 @@ where
if n == 0 {
return Err(io::ErrorKind::WriteZero.into())
}
remaining.split_to(n);
remaining.advance(n);
}
io.write(buf)
},
Expand All @@ -251,7 +251,7 @@ where
io::ErrorKind::WriteZero,
"Failed to write remaining buffer."))
}
remaining.split_to(n);
remaining.advance(n);
}
io.flush()
},
Expand Down Expand Up @@ -363,7 +363,7 @@ mod tests {
let cap = rem.len() + free as usize;
let step = u8::min(free, step) as usize + 1;
let buf = Capped { buf: Vec::with_capacity(cap), step };
let rem = BytesMut::from(rem);
let rem = BytesMut::from(&rem[..]);
let mut io = Negotiated::completed(buf, rem.clone());
let mut written = 0;
loop {
Expand Down
6 changes: 3 additions & 3 deletions misc/multistream-select/src/protocol.rs
Expand Up @@ -143,7 +143,7 @@ impl TryFrom<&[u8]> for Protocol {
type Error = ProtocolError;

fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
Self::try_from(Bytes::from(value))
Self::try_from(Bytes::copy_from_slice(value))
}
}

Expand Down Expand Up @@ -208,7 +208,7 @@ impl Message {
out_msg.push(b'\n')
}
dest.reserve(out_msg.len());
dest.put(out_msg);
dest.put(out_msg.as_ref());
Ok(())
}
Message::NotAvailable => {
Expand Down Expand Up @@ -254,7 +254,7 @@ impl Message {
if len == 0 || len > rem.len() || rem[len - 1] != b'\n' {
return Err(ProtocolError::InvalidMessage)
}
let p = Protocol::try_from(Bytes::from(&rem[.. len - 1]))?;
let p = Protocol::try_from(Bytes::copy_from_slice(&rem[.. len - 1]))?;
protocols.push(p);
remaining = &rem[len ..]
}
Expand Down

0 comments on commit ff0d2d5

Please sign in to comment.