Skip to content

Commit

Permalink
Rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
Brent Gardner committed Sep 5, 2022
1 parent a40d4e4 commit 761f384
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 73 deletions.
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ pub trait TableProvider: Sync + Send {
}

/// A factory which creates [`TableProvider`]s at runtime given a URL.
///
/// For example, this can be used to create a table "on the fly"
///
/// For example, this can be used to create a table "on the fly"
/// from a directory of files only when that name is referenced.
pub trait TableProviderFactory: Sync + Send {
/// Create a TableProvider given name and url
Expand Down
162 changes: 98 additions & 64 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
provider_as_source, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan,
CreateMemoryTable, CreateView, DropTable, FunctionRegistry, LogicalPlan,
LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
Expand All @@ -90,6 +90,7 @@ use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::datasource::datasource::TableProviderFactory;
use crate::execution::runtime_env::RuntimeEnv;
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
Expand Down Expand Up @@ -175,6 +176,8 @@ pub struct SessionContext {
pub session_start_time: DateTime<Utc>,
/// Shared session state for the session
pub state: Arc<RwLock<SessionState>>,
/// Dynamic table providers
pub table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
}

impl Default for SessionContext {
Expand Down Expand Up @@ -202,6 +205,7 @@ impl SessionContext {
session_id: state.session_id.clone(),
session_start_time: chrono::Utc::now(),
state: Arc::new(RwLock::new(state)),
table_factories: HashMap::default(),
}
}

Expand All @@ -211,9 +215,19 @@ impl SessionContext {
session_id: state.session_id.clone(),
session_start_time: chrono::Utc::now(),
state: Arc::new(RwLock::new(state)),
table_factories: HashMap::default(),
}
}

/// Register a `TableProviderFactory` for a given `file_type` identifier
pub fn register_table_factory(
&mut self,
file_type: &str,
factory: Arc<dyn TableProviderFactory>,
) {
self.table_factories.insert(file_type.to_string(), factory);
}

/// Return the [RuntimeEnv] used to run queries with this [SessionContext]
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
self.state.read().runtime_env.clone()
Expand All @@ -236,70 +250,12 @@ impl SessionContext {
pub async fn sql(&self, sql: &str) -> Result<Arc<DataFrame>> {
let plan = self.create_logical_plan(sql)?;
match plan {
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
ref has_header,
ref delimiter,
ref table_partition_cols,
ref if_not_exists,
}) => {
let (file_format, file_extension) = match file_type {
FileType::CSV => (
Arc::new(
CsvFormat::default()
.with_has_header(*has_header)
.with_delimiter(*delimiter as u8),
) as Arc<dyn FileFormat>,
DEFAULT_CSV_EXTENSION,
),
FileType::Parquet => (
Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_PARQUET_EXTENSION,
),
FileType::Avro => (
Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_AVRO_EXTENSION,
),
FileType::NdJson => (
Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_JSON_EXTENSION,
),
};
let table = self.table(name.as_str());
match (if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
(_, Err(_)) => {
// TODO make schema in CreateExternalTable optional instead of empty
let provided_schema = if schema.fields().is_empty() {
None
} else {
Some(Arc::new(schema.as_ref().to_owned().into()))
};
let options = ListingOptions {
format: file_format,
collect_stat: false,
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
table_partition_cols: table_partition_cols.clone(),
};
self.register_listing_table(
name,
location,
options,
provided_schema,
)
.await?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
name
))),
LogicalPlan::CreateExternalTable(cmd) => match cmd.file_type.as_str() {
"PARQUET" | "CSV" | "JSON" | "AVRO" => {
self.create_listing_table(&cmd).await
}
}
_ => self.create_custom_table(&cmd).await,
},

LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name,
Expand Down Expand Up @@ -480,6 +436,84 @@ impl SessionContext {
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}

async fn create_custom_table(
&self,
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
let factory = &self.table_factories.get(&cmd.file_type).ok_or_else(|| {
DataFusionError::Execution(format!(
"Unable to find factory for {}",
cmd.file_type
))
})?;
let table = (*factory).create(cmd.name.as_str(), cmd.location.as_str());
self.register_table(cmd.name.as_str(), table)?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}

async fn create_listing_table(
&self,
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
let (file_format, file_extension) = match cmd.file_type.as_str() {
"CSV" => (
Arc::new(
CsvFormat::default()
.with_has_header(cmd.has_header)
.with_delimiter(cmd.delimiter as u8),
) as Arc<dyn FileFormat>,
DEFAULT_CSV_EXTENSION,
),
"PARQUET" => (
Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_PARQUET_EXTENSION,
),
"AVRO" => (
Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_AVRO_EXTENSION,
),
"JSON" => (
Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_JSON_EXTENSION,
),
_ => Err(DataFusionError::Execution(
"Only known FileTypes can be ListingTables!".to_string(),
))?,
};
let table = self.table(cmd.name.as_str());
match (cmd.if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
(_, Err(_)) => {
// TODO make schema in CreateExternalTable optional instead of empty
let provided_schema = if cmd.schema.fields().is_empty() {
None
} else {
Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
};
let options = ListingOptions {
format: file_format,
collect_stat: false,
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
table_partition_cols: cmd.table_partition_cols.clone(),
};
self.register_listing_table(
cmd.name.as_str(),
cmd.location.clone(),
options,
provided_schema,
)
.await?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
cmd.name
))),
}
}

fn find_and_deregister<'a>(
&self,
table_ref: impl Into<TableReference<'a>>,
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/tests/sql/create_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,10 @@ async fn create_bad_custom_table() {
match res {
Ok(_) => panic!("Registration of tables without factories should fail"),
Err(e) => {
assert!(e.to_string().contains("Unable to find factory for"), "Registration of tables without factories should throw correct error")
}
assert!(
e.to_string().contains("Unable to find factory for"),
"Registration of tables without factories should throw correct error"
)
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ pub use builder::{table_scan, LogicalPlanBuilder};
pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort, StringifiedPlan,
Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values, Window,
EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
Values, Window,
};

pub use display::display_schema;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
use datafusion_expr::logical_plan::{
Analyze, CreateCatalog, CreateCatalogSchema,
CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView,
DropTable, DropView, Explain, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning,
PlanType, ToStringifiedPlan,
DropTable, DropView, Explain, JoinType, LogicalPlan, LogicalPlanBuilder,
Partitioning, PlanType, ToStringifiedPlan,
};
use datafusion_expr::utils::{
can_hash, expand_qualified_wildcard, expand_wildcard, expr_as_column_expr,
Expand Down

0 comments on commit 761f384

Please sign in to comment.