topology: filter hosts for the control connection
Now, the control connection is aware of the host filter and avoids
nodes that the filter rejects.

If the initial node list contains a node that is not accepted by the
host filter and the control connection has been established to it, a
warning is printed and the driver tries to re-establish the control
connection to one of the accepted nodes.

Additionally, if an overly restrictive host filter is provided by
mistake and all nodes are filtered out, an error message is printed
that points out the problem.
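
For context, a minimal sketch of what a custom host filter can look like on
the user side. It assumes the HostFilter trait exposes the
accept(&self, peer: &Peer) -> bool method used in the diff below, that Peer
carries a SocketAddr address field, and that the public paths mirror the
crate layout shown in the diff; the AllowedAddresses type is purely
illustrative.

use std::collections::HashSet;
use std::net::SocketAddr;

use scylla::transport::host_filter::HostFilter;
use scylla::transport::topology::Peer;

// Hypothetical filter that accepts only an explicit allow list of node addresses.
struct AllowedAddresses {
    allowed: HashSet<SocketAddr>,
}

impl HostFilter for AllowedAddresses {
    fn accept(&self, peer: &Peer) -> bool {
        // Peers rejected here are excluded from known_peers and, with this
        // commit, are no longer used for the control connection either.
        self.allowed.contains(&peer.address)
    }
}
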
piodul committed Oct 4, 2022
1 parent 3342a0d commit 0a4f948
Showing 2 changed files with 70 additions and 1 deletion.
1 change: 1 addition & 0 deletions scylla/src/transport/cluster.rs
@@ -147,6 +147,7 @@ impl Cluster {
             server_events_sender,
             fetch_schema_metadata,
             address_translator,
+            host_filter,
         );
 
         let metadata = metadata_reader.read_metadata(true).await?;
70 changes: 69 additions & 1 deletion scylla/src/transport/topology.rs
@@ -4,6 +4,7 @@ use crate::statement::query::Query;
 use crate::transport::connection::{Connection, ConnectionConfig};
 use crate::transport::connection_pool::{NodeConnectionPool, PoolConfig, PoolSize};
 use crate::transport::errors::{DbError, QueryError};
+use crate::transport::host_filter::HostFilter;
 use crate::transport::session::{AddressTranslator, IntoTypedRows};
 use crate::utils::parse::{ParseErrorCause, ParseResult, ParserState};
 
@@ -36,6 +37,7 @@ pub(crate) struct MetadataReader {
     fetch_schema: bool,
 
     address_translator: Option<Arc<dyn AddressTranslator>>,
+    host_filter: Option<Arc<dyn HostFilter>>,
 }

/// Describes all metadata retrieved from the cluster
@@ -219,6 +221,7 @@ impl MetadataReader {
         server_event_sender: mpsc::Sender<Event>,
         fetch_schema: bool,
         address_translator: &Option<Arc<dyn AddressTranslator>>,
+        host_filter: &Option<Arc<dyn HostFilter>>,
     ) -> Self {
         let control_connection_address = *known_peers
             .choose(&mut thread_rng())
@@ -243,6 +246,7 @@
             known_peers: known_peers.into(),
             fetch_schema,
             address_translator: address_translator.clone(),
+            host_filter: host_filter.clone(),
         }
     }

@@ -251,6 +255,9 @@
         let mut result = self.fetch_metadata(initial).await;
         if let Ok(metadata) = result {
             self.update_known_peers(&metadata);
+            if initial {
+                self.handle_unaccepted_host_in_control_connection(&metadata);
+            }
             return Ok(metadata);
         }

@@ -302,6 +309,7 @@
         match &result {
             Ok(metadata) => {
                 self.update_known_peers(metadata);
+                self.handle_unaccepted_host_in_control_connection(metadata);
                 debug!("Fetched new metadata");
             }
             Err(error) => error!(
@@ -343,7 +351,67 @@
     }
 
     fn update_known_peers(&mut self, metadata: &Metadata) {
-        self.known_peers = metadata.peers.iter().map(|peer| peer.address).collect();
+        let host_filter = self.host_filter.as_ref();
+        self.known_peers = metadata
+            .peers
+            .iter()
+            .filter(|peer| host_filter.map_or(true, |f| f.accept(peer)))
+            .map(|peer| peer.address)
+            .collect();
+
+        // Check if the host filter isn't accidentally too restrictive,
+        // and print an error message about this fact
+        if !metadata.peers.is_empty() && self.known_peers.is_empty() {
+            error!(
+                node_ips = ?metadata
+                    .peers
+                    .iter()
+                    .map(|peer| peer.address)
+                    .collect::<Vec<_>>(),
+                "The host filter rejected all nodes in the cluster, \
+                no connections that can serve user queries have been \
+                established. The session cannot serve any queries!"
+            )
+        }
+    }
+
+    fn handle_unaccepted_host_in_control_connection(&mut self, metadata: &Metadata) {
+        let control_connection_peer = metadata
+            .peers
+            .iter()
+            .find(|peer| peer.address == self.control_connection_address);
+        if let Some(peer) = control_connection_peer {
+            if !self.host_filter.as_ref().map_or(true, |f| f.accept(peer)) {
+                warn!(
+                    filtered_node_ips = ?metadata
+                        .peers
+                        .iter()
+                        .filter(|peer| self.host_filter.as_ref().map_or(true, |p| p.accept(peer)))
+                        .map(|peer| peer.address)
+                        .collect::<Vec<_>>(),
+                    control_connection_address = ?self.control_connection_address,
+                    "The node that the control connection is established to \
+                    is not accepted by the host filter. Please verify that \
+                    the nodes in your initial peers list are accepted by the \
+                    host filter. The driver will try to re-establish the \
+                    control connection to a different node."
+                );
+
+                // Assuming here that known_peers are up-to-date
+                if !self.known_peers.is_empty() {
+                    self.control_connection_address = *self
+                        .known_peers
+                        .choose(&mut thread_rng())
+                        .expect("known_peers is empty - should be impossible");
+
+                    self.control_connection = Self::make_control_connection_pool(
+                        self.control_connection_address,
+                        self.connection_config.clone(),
+                        self.keepalive_interval,
+                    );
+                }
+            }
+        }
+    }
 
     fn make_control_connection_pool(
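
A hedged usage sketch to round things off: assuming the session builder
exposes a host_filter setter and the crate provides a datacenter-based
DcHostFilter (neither appears in this diff, so treat these names and
signatures as assumptions), a filter configured like this is, after this
commit, also respected when the driver picks the node for its control
connection.

use std::sync::Arc;

use scylla::transport::host_filter::DcHostFilter;
use scylla::{Session, SessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed API: SessionBuilder::host_filter and DcHostFilter::new are not
    // shown in this commit. Only nodes from "my-local-dc" would be accepted,
    // and the control connection is now restricted to those nodes as well.
    let session: Session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .host_filter(Arc::new(DcHostFilter::new("my-local-dc".to_string())))
        .build()
        .await?;

    // ... use the session as usual ...
    drop(session);
    Ok(())
}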
