Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unused TcpConnectService #299

Merged
merged 3 commits into from Mar 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions actix-tls/CHANGES.md
Expand Up @@ -7,10 +7,13 @@
* Remove `connect::ssl::openssl::OpensslConnectService`. [#297]
* Add `connect::ssl::native_tls` module for native tls support. [#295]
* Rename `accept::{nativetls => native_tls}`. [#295]
* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use
`connect::ConnectService` instead and call `Connection<T, TcpStream>::into_parts`. [#299]

[#295]: https://github.com/actix/actix-net/pull/295
[#296]: https://github.com/actix/actix-net/pull/296
[#297]: https://github.com/actix/actix-net/pull/297
[#299]: https://github.com/actix/actix-net/pull/299


## 3.0.0-beta.4 - 2021-02-24
Expand Down
73 changes: 34 additions & 39 deletions actix-tls/src/connect/connector.rs
Expand Up @@ -72,7 +72,7 @@ pub enum TcpConnectorResponse<T> {
port: u16,
local_addr: Option<IpAddr>,
addrs: Option<VecDeque<SocketAddr>>,
stream: Option<ReusableBoxFuture<Result<TcpStream, io::Error>>>,
stream: ReusableBoxFuture<Result<TcpStream, io::Error>>,
},
Error(Option<ConnectError>),
}
Expand Down Expand Up @@ -103,18 +103,22 @@ impl<T: Address> TcpConnectorResponse<T> {
port,
local_addr,
addrs: None,
stream: Some(ReusableBoxFuture::new(connect(addr, local_addr))),
stream: ReusableBoxFuture::new(connect(addr, local_addr)),
},

// when resolver returns multiple socket addr for request they would be popped from
// front end of queue and returns with the first successful tcp connection.
ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response {
req: Some(req),
port,
local_addr,
addrs: Some(addrs),
stream: None,
},
ConnectAddrs::Multi(mut addrs) => {
let addr = addrs.pop_front().unwrap();

TcpConnectorResponse::Response {
req: Some(req),
port,
local_addr,
addrs: Some(addrs),
stream: ReusableBoxFuture::new(connect(addr, local_addr)),
}
}
}
}
}
Expand All @@ -133,40 +137,31 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
addrs,
stream,
} => loop {
if let Some(new) = stream.as_mut() {
match ready!(new.poll(cx)) {
Ok(sock) => {
let req = req.take().unwrap();
trace!(
"TCP connector: successfully connected to {:?} - {:?}",
req.hostname(),
sock.peer_addr()
);
return Poll::Ready(Ok(Connection::new(sock, req)));
}

Err(err) => {
trace!(
"TCP connector: failed to connect to {:?} port: {}",
req.as_ref().unwrap().hostname(),
port,
);
match ready!(stream.poll(cx)) {
Ok(sock) => {
let req = req.take().unwrap();
trace!(
"TCP connector: successfully connected to {:?} - {:?}",
req.hostname(),
sock.peer_addr()
);
return Poll::Ready(Ok(Connection::new(sock, req)));
}

if addrs.is_none() || addrs.as_ref().unwrap().is_empty() {
return Poll::Ready(Err(ConnectError::Io(err)));
}
Err(err) => {
trace!(
"TCP connector: failed to connect to {:?} port: {}",
req.as_ref().unwrap().hostname(),
port,
);

if let Some(addr) = addrs.as_mut().and_then(|addrs| addrs.pop_front()) {
stream.set(connect(addr, *local_addr));
} else {
return Poll::Ready(Err(ConnectError::Io(err)));
}
}
}

// try to connect
let addr = addrs.as_mut().unwrap().pop_front().unwrap();

let fut = connect(addr, *local_addr);
match stream {
Some(rbf) => rbf.set(fut),
None => *stream = Some(ReusableBoxFuture::new(fut)),
}
},
}
}
Expand Down
8 changes: 4 additions & 4 deletions actix-tls/src/connect/mod.rs
Expand Up @@ -26,20 +26,20 @@ pub mod ssl;
mod uri;

use actix_rt::net::TcpStream;
use actix_service::{pipeline, pipeline_factory, Service, ServiceFactory};
use actix_service::{Service, ServiceFactory};

pub use self::connect::{Address, Connect, Connection};
pub use self::connector::{TcpConnector, TcpConnectorFactory};
pub use self::error::ConnectError;
pub use self::resolve::{Resolve, Resolver, ResolverFactory};
pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService};
pub use self::service::{ConnectService, ConnectServiceFactory};

/// Create TCP connector service.
pub fn new_connector<T: Address + 'static>(
resolver: Resolver,
) -> impl Service<Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> + Clone
{
pipeline(resolver).and_then(TcpConnector)
ConnectServiceFactory::new(resolver).service()
}

/// Create TCP connector service factory.
Expand All @@ -52,7 +52,7 @@ pub fn new_connector_factory<T: Address + 'static>(
Error = ConnectError,
InitError = (),
> + Clone {
pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory)
ConnectServiceFactory::new(resolver)
}

/// Create connector service with default parameters.
Expand Down
51 changes: 1 addition & 50 deletions actix-tls/src/connect/service.rs
Expand Up @@ -34,14 +34,6 @@ impl ConnectServiceFactory {
resolver: self.resolver.service(),
}
}

/// Construct new tcp stream service
pub fn tcp_service(&self) -> TcpConnectService {
TcpConnectService {
tcp: self.tcp.service(),
resolver: self.resolver.service(),
}
}
}

impl Clone for ConnectServiceFactory {
Expand All @@ -63,7 +55,7 @@ impl<T: Address> ServiceFactory<Connect<T>> for ConnectServiceFactory {

fn new_service(&self, _: ()) -> Self::Future {
let service = self.service();
Box::pin(async move { Ok(service) })
Box::pin(async { Ok(service) })
}
}

Expand Down Expand Up @@ -135,44 +127,3 @@ impl<T: Address> Future for ConnectServiceResponse<T> {
}
}
}

#[derive(Clone)]
pub struct TcpConnectService {
tcp: TcpConnector,
resolver: Resolver,
}

impl<T: Address> Service<Connect<T>> for TcpConnectService {
type Response = TcpStream;
type Error = ConnectError;
type Future = TcpConnectServiceResponse<T>;

actix_service::always_ready!();

fn call(&self, req: Connect<T>) -> Self::Future {
TcpConnectServiceResponse {
fut: ConnectFuture::Resolve(self.resolver.call(req)),
tcp: self.tcp,
}
}
}

pub struct TcpConnectServiceResponse<T: Address> {
fut: ConnectFuture<T>,
tcp: TcpConnector,
}

impl<T: Address> Future for TcpConnectServiceResponse<T> {
type Output = Result<TcpStream, ConnectError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(self.fut.poll_connect(cx))? {
ConnectOutput::Resolved(res) => {
self.fut = ConnectFuture::Connect(self.tcp.call(res));
}
ConnectOutput::Connected(conn) => return Poll::Ready(Ok(conn.into_parts().0)),
}
}
}
}