From 34f920edebd548305155ac2981fd75ce8ca44f0a Mon Sep 17 00:00:00 2001 From: Dalton Modlin Date: Tue, 4 Oct 2022 11:49:32 -0600 Subject: [PATCH] Add flag to select which advertise IP for arrow Flight SQL #2 - Update logic in job_to_fetch_part to use advertise-host flag when it exists - Remove default from advertise_host in scheduler_config_spec.toml - Wrap scheduler_server advertise_host variable in Option - Update scheduler's main.rs to reflect advertise_host being wrapped in Option --- .../rust/scheduler/scheduler_config_spec.toml | 3 +- ballista/rust/scheduler/src/flight_sql.rs | 35 +++++++++++++------ ballista/rust/scheduler/src/main.rs | 6 ++-- .../scheduler/src/scheduler_server/mod.rs | 4 +-- 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/ballista/rust/scheduler/scheduler_config_spec.toml b/ballista/rust/scheduler/scheduler_config_spec.toml index 8e44689e6..1350b9382 100644 --- a/ballista/rust/scheduler/scheduler_config_spec.toml +++ b/ballista/rust/scheduler/scheduler_config_spec.toml @@ -27,8 +27,7 @@ doc = "Print version of this executable" [[param]] name = "advertise_host" type = "String" -default = "std::string::String::from(\"127.0.0.1\")" -doc = "IP address for proxying flight results via scheduler." +doc = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT'" [[param]] abbr = "b" diff --git a/ballista/rust/scheduler/src/flight_sql.rs b/ballista/rust/scheduler/src/flight_sql.rs index ede191081..7ab6fb9bd 100644 --- a/ballista/rust/scheduler/src/flight_sql.rs +++ b/ballista/rust/scheduler/src/flight_sql.rs @@ -196,13 +196,28 @@ impl FlightSqlServiceImpl { ) -> Result, Status> { let mut fieps: Vec<_> = vec![]; for loc in completed.partition_location.iter() { - let (host, port) = if let Some(ref md) = loc.executor_meta { - (md.host.clone(), md.port) - } else { - Err(Status::internal( - "Invalid partition location, missing executor metadata".to_string(), - ))? + let (host, port) = match self.server.advertise_host { + Some(_) => { + let advertise_host_flag: Vec<&str> = self + .server + .advertise_host + .as_ref() + .unwrap() + .split(":") + .collect(); + (advertise_host_flag[0].to_string(), advertise_host_flag[1].parse().unwrap()) + } + None => { + if let Some(ref md) = loc.executor_meta { + (md.host.clone(), md.port) + } else { + Err(Status::internal( + "Invalid partition location, missing executor metadata and advertise_host flag is undefined.".to_string(), + ))? + } + } }; + let fetch = if let Some(ref id) = loc.partition_id { let fetch = protobuf::FetchPartition { job_id: id.job_id.clone(), @@ -213,9 +228,7 @@ impl FlightSqlServiceImpl { port, }; protobuf::Action { - action_type: Some(protobuf::action::ActionType::FetchPartition( - fetch, - )), + action_type: Some(FetchPartition(fetch)), settings: vec![], } } else { @@ -457,9 +470,11 @@ impl FlightSqlService for FlightSqlServiceImpl { let stream = flight_client .do_get(request) .await - .map_err(|e| Status::internal(format!("{:?}", e)))? + .map_err(|e| Status::internal(format!("Error from within flight_client.do_get(): {:?}\n", e)))? .into_inner(); return Ok(Response::new(Box::pin(stream))); + } else { + println!("Empty type url! type url: {}", message.type_url); } Err(Status::unimplemented(format!( diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs index 2c60d1a23..95aab18bd 100644 --- a/ballista/rust/scheduler/src/main.rs +++ b/ballista/rust/scheduler/src/main.rs @@ -72,7 +72,7 @@ async fn start_server( config_backend: Arc, addr: SocketAddr, policy: TaskSchedulingPolicy, - advertise_host: String, + advertise_host: Option, ) -> Result<()> { info!( "Ballista v{} Scheduler listening on {:?}", @@ -92,13 +92,13 @@ async fn start_server( policy, BallistaCodec::default(), default_session_builder, - Some(advertise_host), + advertise_host, ), _ => SchedulerServer::new( scheduler_name, config_backend.clone(), BallistaCodec::default(), - Some(advertise_host), + advertise_host, ), }; diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs index 462e4ec25..be186da9c 100644 --- a/ballista/rust/scheduler/src/scheduler_server/mod.rs +++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs @@ -54,7 +54,7 @@ pub(crate) type SessionBuilder = fn(SessionConfig) -> SessionState; #[derive(Clone)] pub struct SchedulerServer { pub scheduler_name: String, - pub advertise_host: String, + pub advertise_host: Option, pub(crate) state: Arc>, pub start_time: u128, policy: TaskSchedulingPolicy, @@ -133,7 +133,7 @@ impl SchedulerServer