Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
Add flag to select which advertise IP for arrow Flight SQL #2
Browse files Browse the repository at this point in the history
- Update logic in job_to_fetch_part to use advertise-host flag when it exists
- Remove default from advertise_host in scheduler_config_spec.toml
- Wrap scheduler_server advertise_host variable in Option
- Update scheduler's main.rs to reflect advertise_host being wrapped in Option
  • Loading branch information
Dalton Modlin committed Oct 4, 2022
1 parent 9520ca5 commit 34f920e
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 17 deletions.
3 changes: 1 addition & 2 deletions ballista/rust/scheduler/scheduler_config_spec.toml
Expand Up @@ -27,8 +27,7 @@ doc = "Print version of this executable"
[[param]]
name = "advertise_host"
type = "String"
default = "std::string::String::from(\"127.0.0.1\")"
doc = "IP address for proxying flight results via scheduler."
doc = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT'"

[[param]]
abbr = "b"
Expand Down
35 changes: 25 additions & 10 deletions ballista/rust/scheduler/src/flight_sql.rs
Expand Up @@ -196,13 +196,28 @@ impl FlightSqlServiceImpl {
) -> Result<Vec<FlightEndpoint>, Status> {
let mut fieps: Vec<_> = vec![];
for loc in completed.partition_location.iter() {
let (host, port) = if let Some(ref md) = loc.executor_meta {
(md.host.clone(), md.port)
} else {
Err(Status::internal(
"Invalid partition location, missing executor metadata".to_string(),
))?
let (host, port) = match self.server.advertise_host {
Some(_) => {
let advertise_host_flag: Vec<&str> = self
.server
.advertise_host
.as_ref()
.unwrap()
.split(":")
.collect();
(advertise_host_flag[0].to_string(), advertise_host_flag[1].parse().unwrap())
}
None => {
if let Some(ref md) = loc.executor_meta {
(md.host.clone(), md.port)
} else {
Err(Status::internal(
"Invalid partition location, missing executor metadata and advertise_host flag is undefined.".to_string(),
))?
}
}
};

let fetch = if let Some(ref id) = loc.partition_id {
let fetch = protobuf::FetchPartition {
job_id: id.job_id.clone(),
Expand All @@ -213,9 +228,7 @@ impl FlightSqlServiceImpl {
port,
};
protobuf::Action {
action_type: Some(protobuf::action::ActionType::FetchPartition(
fetch,
)),
action_type: Some(FetchPartition(fetch)),
settings: vec![],
}
} else {
Expand Down Expand Up @@ -457,9 +470,11 @@ impl FlightSqlService for FlightSqlServiceImpl {
let stream = flight_client
.do_get(request)
.await
.map_err(|e| Status::internal(format!("{:?}", e)))?
.map_err(|e| Status::internal(format!("Error from within flight_client.do_get(): {:?}\n", e)))?
.into_inner();
return Ok(Response::new(Box::pin(stream)));
} else {
println!("Empty type url! type url: {}", message.type_url);
}

Err(Status::unimplemented(format!(
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/scheduler/src/main.rs
Expand Up @@ -72,7 +72,7 @@ async fn start_server(
config_backend: Arc<dyn StateBackendClient>,
addr: SocketAddr,
policy: TaskSchedulingPolicy,
advertise_host: String,
advertise_host: Option<String>,
) -> Result<()> {
info!(
"Ballista v{} Scheduler listening on {:?}",
Expand All @@ -92,13 +92,13 @@ async fn start_server(
policy,
BallistaCodec::default(),
default_session_builder,
Some(advertise_host),
advertise_host,
),
_ => SchedulerServer::new(
scheduler_name,
config_backend.clone(),
BallistaCodec::default(),
Some(advertise_host),
advertise_host,
),
};

Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/scheduler/src/scheduler_server/mod.rs
Expand Up @@ -54,7 +54,7 @@ pub(crate) type SessionBuilder = fn(SessionConfig) -> SessionState;
#[derive(Clone)]
pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
pub scheduler_name: String,
pub advertise_host: String,
pub advertise_host: Option<String>,
pub(crate) state: Arc<SchedulerState<T, U>>,
pub start_time: u128,
policy: TaskSchedulingPolicy,
Expand Down Expand Up @@ -133,7 +133,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
.as_millis(),
policy,
query_stage_event_loop,
advertise_host: advertise_host.unwrap_or("127.0.0.1".to_string()),
advertise_host,
}
}

Expand Down

0 comments on commit 34f920e

Please sign in to comment.