Skip to content

Commit

Permalink
feat: Support for multi tenancy prefix checks (bytebeamio#505)
Browse files Browse the repository at this point in the history
* Enable multitenancy again

* Update cargo lock

* Add feature flag for tenant prefix validation

* Fix shadow compilation error

* Cargo format

* Don't error on empty organisation in TLS cert

* Update changelog

* Update stale parts of main branch

* Remove meters link which is included by mistake (not included in module tree anyway)

* Default to not having multitenancy prefix checks

Co-authored-by: Mrinal Paliwal <mrinal16164@iiitd.ac.in>
  • Loading branch information
tekjar and mnpw committed Nov 20, 2022
1 parent 26b4a99 commit f062324
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 118 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -8,6 +8,8 @@ rumqttc

rumqttd
-------
- Add meters related to router, subscriptions, and connections (#505)
- Allow multi-tenancy validation for mtls clients with `Org` set in certificates
- Add `tracing` for structured, context-aware logging (#499, #503)
- Added properties field to `Unsubscribe`, `UnsubAck`, and `Disconnect` packets so its consistent with other packets. (#480)
- Changed default segment size in demo config to 100MB (#484)
Expand Down
157 changes: 155 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions rumqttd/Cargo.toml
Expand Up @@ -24,7 +24,7 @@ rustls-pemfile = { version = "0.3.0", optional = true }
tokio-tungstenite = { version = "0.15.0", optional = true }
websocket-codec = { version = "0.5.1", optional = true }
rouille = "3.1.1"
# x509-parser = {version= "0.9.2", optional = true}
x509-parser = {version= "0.9.2", optional = true}
futures-util = { version = "0.3.16", optional = true}
parking_lot = "0.11.2"
config = "0.13"
Expand All @@ -34,9 +34,10 @@ tracing-subscriber = { version="0.3.16", features=["env-filter"] }

[features]
default = ["use-rustls"]
use-rustls = ["tokio-rustls", "rustls-pemfile"] #, "x509-parser"]
use-native-tls = ["tokio-native-tls"] #, "x509-parser"]
use-rustls = ["tokio-rustls", "rustls-pemfile", "x509-parser"]
use-native-tls = ["tokio-native-tls" , "x509-parser"]
websockets = ["tokio-tungstenite", "websocket-codec", "tokio-util", "futures-util"]
validate-tenant-prefix = []

[dev-dependencies]
pretty_env_logger = "0.4.0"
Expand Down
3 changes: 1 addition & 2 deletions rumqttd/src/link/console.rs
Expand Up @@ -18,8 +18,7 @@ impl ConsoleLink {
/// Requires the corresponding Router to be running to complete
pub fn new(config: ConsoleSettings, router_tx: Sender<(ConnectionId, Event)>) -> ConsoleLink {
let tx = router_tx.clone();
let (link_tx, link_rx, _ack) =
Link::new(/*None,*/ "console", tx, true, None, true).unwrap();
let (link_tx, link_rx, _ack) = Link::new(None, "console", tx, true, None, true).unwrap();
let connection_id = link_tx.connection_id;
ConsoleLink {
config,
Expand Down
24 changes: 8 additions & 16 deletions rumqttd/src/link/local.rs
Expand Up @@ -40,7 +40,7 @@ pub struct Link;
impl Link {
#[allow(clippy::type_complexity)]
fn prepare(
// tenant_id: Option<String>,
tenant_id: Option<String>,
client_id: &str,
clean: bool,
last_will: Option<LastWill>,
Expand All @@ -53,7 +53,7 @@ impl Link {
Receiver<MetricsReply>,
) {
let (connection, metrics_rx) = Connection::new(
// tenant_id,
tenant_id,
client_id.to_owned(),
clean,
last_will,
Expand Down Expand Up @@ -81,7 +81,7 @@ impl Link {

#[allow(clippy::new_ret_no_self)]
pub fn new(
// tenant_id: Option<String>,
tenant_id: Option<String>,
client_id: &str,
router_tx: Sender<(ConnectionId, Event)>,
clean: bool,
Expand All @@ -91,12 +91,8 @@ impl Link {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx, metrics_rx) = Link::prepare(
/*tenant_id,*/ client_id,
clean,
last_will,
dynamic_filters,
);
let (message, i, o, link_rx, metrics_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
router_tx.send((0, message))?;

link_rx.recv()?;
Expand All @@ -115,7 +111,7 @@ impl Link {
}

pub async fn init(
// tenant_id: Option<String>,
tenant_id: Option<String>,
client_id: &str,
router_tx: Sender<(ConnectionId, Event)>,
clean: bool,
Expand All @@ -125,12 +121,8 @@ impl Link {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx, metrics_rx) = Link::prepare(
/*tenant_id,*/ client_id,
clean,
last_will,
dynamic_filters,
);
let (message, i, o, link_rx, metrics_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
router_tx.send_async((0, message)).await?;

link_rx.recv_async().await?;
Expand Down
4 changes: 2 additions & 2 deletions rumqttd/src/link/remote.rs
Expand Up @@ -57,7 +57,7 @@ impl<P: Protocol> RemoteLink<P> {
pub async fn new(
config: Arc<ConnectionSettings>,
router_tx: Sender<(ConnectionId, Event)>,
// tenant_id: Option<String>,
tenant_id: Option<String>,
mut network: Network<P>,
) -> Result<RemoteLink<P>, Error> {
// Wait for MQTT connect packet and error out if it's not received in time to prevent
Expand Down Expand Up @@ -91,7 +91,7 @@ impl<P: Protocol> RemoteLink<P> {
}

let (link_tx, link_rx, notification) = Link::new(
// tenant_id,
tenant_id,
&client_id,
router_tx,
clean_session,
Expand Down

0 comments on commit f062324

Please sign in to comment.