diff --git a/CHANGELOG.md b/CHANGELOG.md index 04c34943e..4eff3ac95 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ rumqttc rumqttd ------- +- Add meters related to router, subscriptions, and connections (#505) +- Allow multi-tenancy validation for mtls clients with `Org` set in certificates - Add `tracing` for structured, context-aware logging (#499, #503) - Added properties field to `Unsubscribe`, `UnsubAck`, and `Disconnect` packets so its consistent with other packets. (#480) - Changed default segment size in demo config to 100MB (#484) diff --git a/Cargo.lock b/Cargo.lock index 259071c48..0d9630f15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -129,6 +129,12 @@ dependencies = [ "nodrop", ] +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + [[package]] name = "ascii" version = "1.1.0" @@ -235,6 +241,18 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitvec" +version = "0.19.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55f93d0ef3363c364d5976646a38f04cf67cfe1d4c8d160cdea02cab2c116b33" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "block-buffer" version = "0.9.0" @@ -409,7 +427,7 @@ dependencies = [ "async-trait", "json5", "lazy_static", - "nom", + "nom 7.1.1", "pathdiff", "ron", "rust-ini", @@ -493,6 +511,12 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" + [[package]] name = "debugid" version = "0.8.0" @@ -512,6 +536,31 @@ dependencies = [ "gzip-header", ] +[[package]] +name = "der-oid-macro" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4cccf60bb98c0fca115a581f894aed0e43fa55bf289fdac5599bec440bb4fd6" +dependencies = [ + "nom 6.1.2", + "num-bigint", + "num-traits", + "syn", +] + +[[package]] +name = "der-parser" +version = "5.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d7ededb7525bb4114bc209685ce7894edc2965f4914312a1ea578a645a237f0" +dependencies = [ + "der-oid-macro", + "nom 6.1.2", + "num-bigint", + "num-traits", + "rusticata-macros", +] + [[package]] name = "diff" version = "0.1.13" @@ -703,6 +752,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" +[[package]] +name = "funty" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7" + [[package]] name = "futures" version = "0.1.31" @@ -1154,6 +1209,19 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lexical-core" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" +dependencies = [ + "arrayvec 0.5.2", + "bitflags", + "cfg-if 1.0.0", + "ryu", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.134" @@ -1360,6 +1428,19 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" +[[package]] +name = "nom" +version = "6.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2" +dependencies = [ + "bitvec", + "funty", + "lexical-core", + "memchr", + "version_check", +] + [[package]] name = "nom" version = "7.1.1" @@ -1389,13 +1470,24 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-format" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bafe4179722c2894288ee77a9f044f02811c86af699344c498b0840c698a2465" dependencies = [ - "arrayvec", + "arrayvec 0.4.12", "itoa 0.4.8", ] @@ -1446,6 +1538,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "oid-registry" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6aae73e474f83beacd8ae2179e328e03d63d9223949d97e1b7c108059a34715" +dependencies = [ + "der-parser", +] + [[package]] name = "once_cell" version = "1.15.0" @@ -1872,6 +1973,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8" + [[package]] name = "rand" version = "0.8.5" @@ -2073,6 +2180,7 @@ dependencies = [ "tracing-subscriber", "vergen", "websocket-codec", + "x509-parser", ] [[package]] @@ -2124,6 +2232,15 @@ dependencies = [ "semver", ] +[[package]] +name = "rusticata-macros" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbbee512c633ecabd4481c40111b6ded03ddd9ab10ba6caa5a74e14c889921ad" +dependencies = [ + "nom 6.1.2", +] + [[package]] name = "rustls" version = "0.20.6" @@ -2434,6 +2551,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "str_stack" version = "0.1.0" @@ -2518,6 +2641,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "tempfile" version = "3.3.0" @@ -3247,6 +3376,30 @@ dependencies = [ "tungstenite 0.16.0", ] +[[package]] +name = "wyz" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" + +[[package]] +name = "x509-parser" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64abca276c58f8341ddc13fd4bd6ae75993cc669043f5b34813c90f7dff04771" +dependencies = [ + "base64", + "chrono", + "data-encoding", + "der-parser", + "lazy_static", + "nom 6.1.2", + "oid-registry", + "rusticata-macros", + "rustversion", + "thiserror", +] + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/rumqttd/Cargo.toml b/rumqttd/Cargo.toml index 205f442a5..085dc44ec 100644 --- a/rumqttd/Cargo.toml +++ b/rumqttd/Cargo.toml @@ -24,7 +24,7 @@ rustls-pemfile = { version = "0.3.0", optional = true } tokio-tungstenite = { version = "0.15.0", optional = true } websocket-codec = { version = "0.5.1", optional = true } rouille = "3.1.1" -# x509-parser = {version= "0.9.2", optional = true} +x509-parser = {version= "0.9.2", optional = true} futures-util = { version = "0.3.16", optional = true} parking_lot = "0.11.2" config = "0.13" @@ -34,9 +34,10 @@ tracing-subscriber = { version="0.3.16", features=["env-filter"] } [features] default = ["use-rustls"] -use-rustls = ["tokio-rustls", "rustls-pemfile"] #, "x509-parser"] -use-native-tls = ["tokio-native-tls"] #, "x509-parser"] +use-rustls = ["tokio-rustls", "rustls-pemfile", "x509-parser"] +use-native-tls = ["tokio-native-tls" , "x509-parser"] websockets = ["tokio-tungstenite", "websocket-codec", "tokio-util", "futures-util"] +validate-tenant-prefix = [] [dev-dependencies] pretty_env_logger = "0.4.0" diff --git a/rumqttd/src/link/console.rs b/rumqttd/src/link/console.rs index 826a2936d..f81d9c88f 100644 --- a/rumqttd/src/link/console.rs +++ b/rumqttd/src/link/console.rs @@ -18,8 +18,7 @@ impl ConsoleLink { /// Requires the corresponding Router to be running to complete pub fn new(config: ConsoleSettings, router_tx: Sender<(ConnectionId, Event)>) -> ConsoleLink { let tx = router_tx.clone(); - let (link_tx, link_rx, _ack) = - Link::new(/*None,*/ "console", tx, true, None, true).unwrap(); + let (link_tx, link_rx, _ack) = Link::new(None, "console", tx, true, None, true).unwrap(); let connection_id = link_tx.connection_id; ConsoleLink { config, diff --git a/rumqttd/src/link/local.rs b/rumqttd/src/link/local.rs index fd5d02157..76f91079e 100644 --- a/rumqttd/src/link/local.rs +++ b/rumqttd/src/link/local.rs @@ -40,7 +40,7 @@ pub struct Link; impl Link { #[allow(clippy::type_complexity)] fn prepare( - // tenant_id: Option, + tenant_id: Option, client_id: &str, clean: bool, last_will: Option, @@ -53,7 +53,7 @@ impl Link { Receiver, ) { let (connection, metrics_rx) = Connection::new( - // tenant_id, + tenant_id, client_id.to_owned(), clean, last_will, @@ -81,7 +81,7 @@ impl Link { #[allow(clippy::new_ret_no_self)] pub fn new( - // tenant_id: Option, + tenant_id: Option, client_id: &str, router_tx: Sender<(ConnectionId, Event)>, clean: bool, @@ -91,12 +91,8 @@ impl Link { // Connect to router // Local connections to the router shall have access to all subscriptions - let (message, i, o, link_rx, metrics_rx) = Link::prepare( - /*tenant_id,*/ client_id, - clean, - last_will, - dynamic_filters, - ); + let (message, i, o, link_rx, metrics_rx) = + Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters); router_tx.send((0, message))?; link_rx.recv()?; @@ -115,7 +111,7 @@ impl Link { } pub async fn init( - // tenant_id: Option, + tenant_id: Option, client_id: &str, router_tx: Sender<(ConnectionId, Event)>, clean: bool, @@ -125,12 +121,8 @@ impl Link { // Connect to router // Local connections to the router shall have access to all subscriptions - let (message, i, o, link_rx, metrics_rx) = Link::prepare( - /*tenant_id,*/ client_id, - clean, - last_will, - dynamic_filters, - ); + let (message, i, o, link_rx, metrics_rx) = + Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters); router_tx.send_async((0, message)).await?; link_rx.recv_async().await?; diff --git a/rumqttd/src/link/remote.rs b/rumqttd/src/link/remote.rs index 0be00e285..bb1632a05 100644 --- a/rumqttd/src/link/remote.rs +++ b/rumqttd/src/link/remote.rs @@ -57,7 +57,7 @@ impl RemoteLink

{ pub async fn new( config: Arc, router_tx: Sender<(ConnectionId, Event)>, - // tenant_id: Option, + tenant_id: Option, mut network: Network

, ) -> Result, Error> { // Wait for MQTT connect packet and error out if it's not received in time to prevent @@ -91,7 +91,7 @@ impl RemoteLink

{ } let (link_tx, link_rx, notification) = Link::new( - // tenant_id, + tenant_id, &client_id, router_tx, clean_session, diff --git a/rumqttd/src/link/shadow.rs b/rumqttd/src/link/shadow.rs index 930caec60..87569cb46 100644 --- a/rumqttd/src/link/shadow.rs +++ b/rumqttd/src/link/shadow.rs @@ -71,7 +71,8 @@ impl ShadowLink { let client_id = connect.client_id.clone(); let (link_tx, link_rx, _ack) = Link::new( - /*None,*/ &client_id, + None, + &client_id, router_tx, true, None, diff --git a/rumqttd/src/router/connection.rs b/rumqttd/src/router/connection.rs index 202eab815..a6387c338 100644 --- a/rumqttd/src/router/connection.rs +++ b/rumqttd/src/router/connection.rs @@ -11,8 +11,8 @@ use super::{ConnectionMeter, MetricsReply}; #[derive(Debug)] pub struct Connection { pub client_id: String, - // /// Id of client's organisation/tenant and the prefix associated with tenant's MQTT topic - // pub tenant_prefix: Option, + /// Id of client's organisation/tenant and the prefix associated with tenant's MQTT topic + pub tenant_prefix: Option, /// Dynamically create subscription filters incase they didn't exist during a publish pub dynamic_filters: bool, /// Clean session @@ -29,7 +29,7 @@ pub struct Connection { impl Connection { /// Create connection state to hold identifying information of connecting device pub fn new( - // tenant_id: Option, + tenant_id: Option, client_id: String, clean: bool, last_will: Option, @@ -37,20 +37,20 @@ impl Connection { ) -> (Connection, Receiver) { let (metrics_tx, metrics_rx) = bounded(1); - // // Change client id to -> tenant_id.client_id and derive topic path prefix - // // to validate topics - // let (client_id, tenant_prefix) = match tenant_id { - // Some(tenant_id) => { - // let tenant_prefix = Some("/tenants/".to_owned() + &tenant_id + "/"); - // let client_id = tenant_id + "." + &client_id; - // (client_id, tenant_prefix) - // } - // None => (client_id, None), - // }; + // Change client id to -> tenant_id.client_id and derive topic path prefix + // to validate topics + let (client_id, tenant_prefix) = match tenant_id { + Some(tenant_id) => { + let tenant_prefix = Some("/tenants/".to_owned() + &tenant_id + "/"); + let client_id = tenant_id + "." + &client_id; + (client_id, tenant_prefix) + } + None => (client_id, None), + }; let connection = Connection { client_id, - // tenant_prefix, + tenant_prefix, dynamic_filters, clean, subscriptions: HashSet::default(), diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 96f4db4cd..3207df688 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -36,8 +36,9 @@ pub enum RouterError { Disconnected, #[error("Topic not utf-8")] NonUtf8Topic(#[from] Utf8Error), - // #[error("Bad Tenant")] - // BadTenant(String, String), + #[cfg(feature = "validate-tenant-prefix")] + #[error("Bad Tenant")] + BadTenant(String, String), #[error("No matching filters to topic {0}")] NoMatchingFilters(String), #[error("Unsupported QoS {0:?}")] @@ -485,8 +486,9 @@ impl Router { info!("Adding subscription on topic {}", f.path); let connection = self.connections.get_mut(id).unwrap(); - if let Err(e) = validate_subscription(/*connection,*/ &f) { + if let Err(e) = validate_subscription(connection, &f) { warn!(reason = ?e,"Subscription cannot be validated: {}", e); + disconnect = true; break; } @@ -850,15 +852,16 @@ fn append_to_commitlog( ) -> Result { let topic = std::str::from_utf8(&publish.topic)?; - // // Ensure that only clients associated with a tenant can publish to tenant's topic - // if let Some(tenant_prefix) = &connections[id].tenant_prefix { - // if !topic.starts_with(tenant_prefix) { - // return Err(RouterError::BadTenant( - // tenant_prefix.to_owned(), - // topic.to_owned(), - // )); - // } - // } + // Ensure that only clients associated with a tenant can publish to tenant's topic + #[cfg(feature = "validate-tenant-prefix")] + if let Some(tenant_prefix) = &connections[id].tenant_prefix { + if !topic.starts_with(tenant_prefix) { + return Err(RouterError::BadTenant( + tenant_prefix.to_owned(), + topic.to_owned(), + )); + } + } if publish.payload.is_empty() { datalog.remove_from_retained_publishes(topic.to_owned()); @@ -1129,15 +1132,21 @@ fn retrieve_metrics(id: ConnectionId, router: &mut Router, metrics: MetricsReque } fn validate_subscription( - // connection: &mut Connection, + connection: &mut Connection, filter: &protocol::Filter, ) -> Result<(), RouterError> { - // // Ensure that only client devices of the tenant can - // if let Some(tenant_prefix) = &connection.tenant_prefix { - // if !filter.path.starts_with(tenant_prefix) { - // return Err(RouterError::InvalidFilterPrefix(filter.path.to_owned())); - // } - // } + trace!( + "validate subscription = {}, tenant = {:?}", + filter.path, + connection.tenant_prefix + ); + // Ensure that only client devices of the tenant can + #[cfg(feature = "validate-tenant-prefix")] + if let Some(tenant_prefix) = &connection.tenant_prefix { + if !filter.path.starts_with(tenant_prefix) { + return Err(RouterError::InvalidFilterPrefix(filter.path.to_owned())); + } + } if filter.qos == QoS::ExactlyOnce { return Err(RouterError::UnsupportedQoS(filter.qos)); diff --git a/rumqttd/src/server/broker.rs b/rumqttd/src/server/broker.rs index b2a12439c..2be046860 100644 --- a/rumqttd/src/server/broker.rs +++ b/rumqttd/src/server/broker.rs @@ -123,13 +123,8 @@ impl Broker { pub fn link(&self, client_id: &str) -> Result<(LinkTx, LinkRx), local::LinkError> { // Register this connection with the router. Router replies with ack which if ok will // start the link. Router can sometimes reject the connection (ex max connection limit) - let (link_tx, link_rx, _ack) = Link::new( - /*None,*/ client_id, - self.router_tx.clone(), - true, - None, - false, - )?; + let (link_tx, link_rx, _ack) = + Link::new(None, client_id, self.router_tx.clone(), true, None, false)?; Ok((link_tx, link_rx)) } @@ -250,20 +245,17 @@ impl Server

{ } // Depending on TLS or not create a new Network - async fn tls_accept( - &self, - stream: TcpStream, - ) -> Result /*, Option)*/, Error> { + async fn tls_accept(&self, stream: TcpStream) -> Result<(Box, Option), Error> { #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))] match &self.config.tls { Some(c) => { - let /*(tenant_id,*/ network/*)*/ = TLSAcceptor::new(c)?.accept(stream).await?; - Ok(/*(*/ network /*, Some(tenant_id))*/) + let (tenant_id, network) = TLSAcceptor::new(c)?.accept(stream).await?; + Ok((network, tenant_id)) } - None => Ok(/*(*/ Box::new(stream) /*, None)*/), + None => Ok((Box::new(stream), None)), } #[cfg(not(any(feature = "use-rustls", feature = "use-native-tls")))] - Ok(/*(*/ Box::new(stream) /*, None)*/) + Ok((Box::new(stream), None)) } async fn start(&self, shadow: bool) -> Result<(), Error> { @@ -287,7 +279,7 @@ impl Server

{ } }; - let /*(*/network /*, tenant_id)*/ = match self.tls_accept(stream).await { + let (network, tenant_id) = match self.tls_accept(stream).await { Ok(o) => o, Err(e) => { error!(error=?e, "Tls accept error"); @@ -314,7 +306,7 @@ impl Server

{ ), )), _ => task::spawn( - remote(config, /*tenant_id,*/ router_tx, network, protocol).instrument( + remote(config, tenant_id, router_tx, network, protocol).instrument( tracing::info_span!( "remote_link", client_id = field::Empty, @@ -336,14 +328,14 @@ impl Server

{ /// mqtt connection packet to make make the server reach its concurrent connection limit) async fn remote( config: Arc, - // tenant_id: Option, + tenant_id: Option, router_tx: Sender<(ConnectionId, Event)>, stream: Box, protocol: P, ) { let network = Network::new(stream, config.max_payload_size, 100, protocol); // Start the link - let mut link = match RemoteLink::new(config, router_tx.clone(), /*tenant_id,*/ network).await { + let mut link = match RemoteLink::new(config, router_tx.clone(), tenant_id, network).await { Ok(l) => l, Err(e) => { error!(error=?e, "Remote link error"); diff --git a/rumqttd/src/server/tls.rs b/rumqttd/src/server/tls.rs index bf1839cac..fc48f4292 100644 --- a/rumqttd/src/server/tls.rs +++ b/rumqttd/src/server/tls.rs @@ -49,34 +49,39 @@ pub enum Error { NativeTlsNotEnabled, #[cfg(not(feature = "use-rustls"))] RustlsNotEnabled, - // #[error("Invalid tenant id = {0}")] - // InvalidTenantId(String), - // #[error("Invalid tenant certificate")] - // InvalidTenant, - // #[error("Tenant id missing in certificate")] - // MissingTenantId, - // #[error("Tenant id missing in certificate")] - // CertificateParse, + #[error("Invalid tenant id = {0}")] + InvalidTenantId(String), + #[error("Invalid tenant certificate")] + InvalidTenant, + #[error("Tenant id missing in certificate")] + MissingTenantId, + #[error("Tenant id missing in certificate")] + CertificateParse, } -// /// Extract uid from certificate's subject organization field -// fn extract_tenant_id(der: &[u8]) -> Result { -// let (_, cert) = -// x509_parser::parse_x509_certificate(der).map_err(|_| Error::CertificateParse)?; -// let tenant_id = match cert.subject().iter_organization().next() { -// Some(org) => match org.as_str() { -// Ok(val) => val.to_string(), -// Err(_) => return Err(Error::InvalidTenant), -// }, -// None => return Err(Error::MissingTenantId), -// }; - -// if tenant_id.chars().any(|c| !c.is_alphanumeric()) { -// return Err(Error::InvalidTenantId(tenant_id)); -// } - -// Ok(tenant_id) -// } +/// Extract uid from certificate's subject organization field +fn extract_tenant_id(der: &[u8]) -> Result, Error> { + let (_, cert) = + x509_parser::parse_x509_certificate(der).map_err(|_| Error::CertificateParse)?; + let tenant_id = match cert.subject().iter_organization().next() { + Some(org) => match org.as_str() { + Ok(val) => val.to_string(), + Err(_) => return Err(Error::InvalidTenant), + }, + None => { + #[cfg(feature = "validate-tenant-prefix")] + return Err(Error::MissingTenantId); + #[cfg(not(feature = "validate-tenant-prefix"))] + return Ok(None); + } + }; + + if tenant_id.chars().any(|c| !c.is_alphanumeric()) { + return Err(Error::InvalidTenantId(tenant_id)); + } + + Ok(Some(tenant_id)) +} #[allow(dead_code)] pub enum TLSAcceptor { @@ -109,30 +114,30 @@ impl TLSAcceptor { } } - pub async fn accept(&self, stream: TcpStream) -> Result /*)*/, Error> { + pub async fn accept(&self, stream: TcpStream) -> Result<(Option, Box), Error> { match self { #[cfg(feature = "use-rustls")] TLSAcceptor::Rustls { acceptor } => { let stream = acceptor.accept(stream).await?; - // let (_, session) = stream.get_ref(); - // let peer_certificates = session - // .peer_certificates() - // .ok_or(Error::NoPeerCertificate)?; - // let tenant_id = extract_tenant_id(&peer_certificates[0].0)?; + let (_, session) = stream.get_ref(); + let peer_certificates = session + .peer_certificates() + .ok_or(Error::NoPeerCertificate)?; + let tenant_id = extract_tenant_id(&peer_certificates[0].0)?; let network = Box::new(stream); - Ok(/*(tenant_id,*/ network /*)*/) + Ok((tenant_id, network)) } #[cfg(feature = "use-native-tls")] TLSAcceptor::NativeTLS { acceptor } => { let stream = acceptor.accept(stream).await?; - // let session = stream.get_ref(); - // let peer_certificate = session - // .peer_certificate()? - // .ok_or(Error::NoPeerCertificate)? - // .to_der()?; - // let tenant_id = extract_tenant_id(&peer_certificate)?; + let session = stream.get_ref(); + let peer_certificate = session + .peer_certificate()? + .ok_or(Error::NoPeerCertificate)? + .to_der()?; + let tenant_id = extract_tenant_id(&peer_certificate)?; let network = Box::new(stream); - Ok(/*(tenant_id,*/ network /*)*/) + Ok((tenant_id, network)) } } }