Merge remote-tracking branch 'apache/master' into like-expr
andygrove committed Sep 6, 2022
2 parents 8143e6e + 191d8b7 commit 7483e3e
Showing 25 changed files with 506 additions and 354 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
@@ -280,7 +280,9 @@ jobs:
rustup default stable
rustup component add rustfmt
- name: Run
run: ci/scripts/rust_fmt.sh
run: |
echo '' > datafusion/proto/src/generated/datafusion.rs
ci/scripts/rust_fmt.sh
coverage:
name: coverage
4 changes: 4 additions & 0 deletions datafusion-examples/examples/README.md
@@ -19,6 +19,10 @@

# DataFusion Examples

Prerequisites:

Run `git submodule update --init` to initialize the test files.

## Single Process

The examples `csv_sql.rs` and `parquet_sql.rs` demonstrate building a query plan from a SQL statement and then executing the query plan against local CSV and Parquet files, respectively.
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -93,6 +93,7 @@ url = "2.2"
uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
async-trait = "0.1.53"
criterion = "0.3"
csv = "1.1.6"
ctor = "0.1.22"
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/datasource.rs
@@ -72,3 +72,12 @@ pub trait TableProvider: Sync + Send {
Ok(TableProviderFilterPushDown::Unsupported)
}
}

/// A factory which creates [`TableProvider`]s at runtime given a URL.
///
/// For example, this can be used to create a table "on the fly"
/// from a directory of files only when that name is referenced.
pub trait TableProviderFactory: Sync + Send {
/// Create a TableProvider given name and url
fn create(&self, name: &str, url: &str) -> Arc<dyn TableProvider>;
}
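As a worked illustration of the new trait, here is a minimal sketch of a factory; the factory name, the fixed schema, and the use of MemTable are assumptions for the example, not part of this commit:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::{MemTable, TableProvider};

/// Hypothetical factory that ignores the URL and returns an empty
/// in-memory table with a fixed single-column schema.
struct EmptyTableFactory;

impl TableProviderFactory for EmptyTableFactory {
    fn create(&self, _name: &str, _url: &str) -> Arc<dyn TableProvider> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "id",
            DataType::Int64,
            false,
        )]));
        // An empty partition list yields a valid, empty MemTable.
        Arc::new(MemTable::try_new(schema, vec![]).expect("valid schema"))
    }
}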
176 changes: 104 additions & 72 deletions datafusion/core/src/execution/context.rs
@@ -68,7 +68,7 @@ use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
provider_as_source, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan,
CreateMemoryTable, CreateView, DropTable, FunctionRegistry, LogicalPlan,
LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
@@ -90,6 +90,7 @@ use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::datasource::datasource::TableProviderFactory;
use crate::execution::runtime_env::RuntimeEnv;
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
@@ -176,6 +177,8 @@ pub struct SessionContext {
pub session_start_time: DateTime<Utc>,
/// Shared session state for the session
pub state: Arc<RwLock<SessionState>>,
/// Dynamic table providers
pub table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
}

impl Default for SessionContext {
@@ -203,6 +206,7 @@ impl SessionContext {
session_id: state.session_id.clone(),
session_start_time: chrono::Utc::now(),
state: Arc::new(RwLock::new(state)),
table_factories: HashMap::default(),
}
}

@@ -212,9 +216,19 @@ impl SessionContext {
session_id: state.session_id.clone(),
session_start_time: chrono::Utc::now(),
state: Arc::new(RwLock::new(state)),
table_factories: HashMap::default(),
}
}

/// Register a `TableProviderFactory` for a given `file_type` identifier
pub fn register_table_factory(
&mut self,
file_type: &str,
factory: Arc<dyn TableProviderFactory>,
) {
self.table_factories.insert(file_type.to_string(), factory);
}

/// Return the [RuntimeEnv] used to run queries with this [SessionContext]
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
self.state.read().runtime_env.clone()
@@ -237,70 +251,12 @@ impl SessionContext {
pub async fn sql(&self, sql: &str) -> Result<Arc<DataFrame>> {
let plan = self.create_logical_plan(sql)?;
match plan {
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
ref has_header,
ref delimiter,
ref table_partition_cols,
ref if_not_exists,
}) => {
let (file_format, file_extension) = match file_type {
FileType::CSV => (
Arc::new(
CsvFormat::default()
.with_has_header(*has_header)
.with_delimiter(*delimiter as u8),
) as Arc<dyn FileFormat>,
DEFAULT_CSV_EXTENSION,
),
FileType::Parquet => (
Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_PARQUET_EXTENSION,
),
FileType::Avro => (
Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_AVRO_EXTENSION,
),
FileType::NdJson => (
Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_JSON_EXTENSION,
),
};
let table = self.table(name.as_str());
match (if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
(_, Err(_)) => {
// TODO make schema in CreateExternalTable optional instead of empty
let provided_schema = if schema.fields().is_empty() {
None
} else {
Some(Arc::new(schema.as_ref().to_owned().into()))
};
let options = ListingOptions {
format: file_format,
collect_stat: false,
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
table_partition_cols: table_partition_cols.clone(),
};
self.register_listing_table(
name,
location,
options,
provided_schema,
)
.await?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
name
))),
LogicalPlan::CreateExternalTable(cmd) => match cmd.file_type.as_str() {
"PARQUET" | "CSV" | "JSON" | "AVRO" => {
self.create_listing_table(&cmd).await
}
}
_ => self.create_custom_table(&cmd).await,
},

LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name,
@@ -314,35 +270,33 @@ impl SessionContext {
(true, false, Ok(_)) => self.return_empty_dataframe(),
(false, true, Ok(_)) => {
self.deregister_table(name.as_str())?;
let plan = self.optimize(&input)?;
let physical =
Arc::new(DataFrame::new(self.state.clone(), &plan));
Arc::new(DataFrame::new(self.state.clone(), &input));

let batches: Vec<_> = physical.collect_partitioned().await?;
let table = Arc::new(MemTable::try_new(
Arc::new(plan.schema().as_ref().into()),
Arc::new(input.schema().as_ref().into()),
batches,
)?);

self.register_table(name.as_str(), table)?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
self.return_empty_dataframe()
}
(true, true, Ok(_)) => Err(DataFusionError::Internal(
"'IF NOT EXISTS' cannot coexist with 'REPLACE'".to_string(),
)),
(_, _, Err(_)) => {
let plan = self.optimize(&input)?;
let physical =
Arc::new(DataFrame::new(self.state.clone(), &plan));
Arc::new(DataFrame::new(self.state.clone(), &input));

let batches: Vec<_> = physical.collect_partitioned().await?;
let table = Arc::new(MemTable::try_new(
Arc::new(plan.schema().as_ref().into()),
Arc::new(input.schema().as_ref().into()),
batches,
)?);

self.register_table(name.as_str(), table)?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
self.return_empty_dataframe()
}
(false, false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
@@ -481,6 +435,84 @@ impl SessionContext {
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}

async fn create_custom_table(
&self,
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
let factory = &self.table_factories.get(&cmd.file_type).ok_or_else(|| {
DataFusionError::Execution(format!(
"Unable to find factory for {}",
cmd.file_type
))
})?;
let table = (*factory).create(cmd.name.as_str(), cmd.location.as_str());
self.register_table(cmd.name.as_str(), table)?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}

async fn create_listing_table(
&self,
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
let (file_format, file_extension) = match cmd.file_type.as_str() {
"CSV" => (
Arc::new(
CsvFormat::default()
.with_has_header(cmd.has_header)
.with_delimiter(cmd.delimiter as u8),
) as Arc<dyn FileFormat>,
DEFAULT_CSV_EXTENSION,
),
"PARQUET" => (
Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_PARQUET_EXTENSION,
),
"AVRO" => (
Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_AVRO_EXTENSION,
),
"JSON" => (
Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_JSON_EXTENSION,
),
_ => Err(DataFusionError::Execution(
"Only known FileTypes can be ListingTables!".to_string(),
))?,
};
let table = self.table(cmd.name.as_str());
match (cmd.if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
(_, Err(_)) => {
// TODO make schema in CreateExternalTable optional instead of empty
let provided_schema = if cmd.schema.fields().is_empty() {
None
} else {
Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
};
let options = ListingOptions {
format: file_format,
collect_stat: false,
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
table_partition_cols: cmd.table_partition_cols.clone(),
};
self.register_listing_table(
cmd.name.as_str(),
cmd.location.clone(),
options,
provided_schema,
)
.await?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
cmd.name
))),
}
}

fn find_and_deregister<'a>(
&self,
table_ref: impl Into<TableReference<'a>>,
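Putting the two new pieces together, a hedged end-to-end sketch reusing the hypothetical EmptyTableFactory from above; the EMPTYTABLE identifier, and the assumption that the SQL parser in this version accepts it after STORED AS, are illustrative rather than guaranteed by this commit:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // register_table_factory takes &mut self, so the context must be mutable.
    let mut ctx = SessionContext::new();
    ctx.register_table_factory("EMPTYTABLE", Arc::new(EmptyTableFactory));

    // File types other than PARQUET/CSV/JSON/AVRO are routed to
    // create_custom_table, which looks up the registered factory.
    ctx.sql("CREATE EXTERNAL TABLE t STORED AS EMPTYTABLE LOCATION 'ignored://'")
        .await?;

    let df = ctx.sql("SELECT * FROM t").await?;
    df.show().await?;
    Ok(())
}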
7 changes: 3 additions & 4 deletions datafusion/core/src/logical_plan/mod.rs
@@ -45,10 +45,9 @@ pub use datafusion_expr::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
},
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint,
JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
CreateView, CrossJoin, DropTable, EmptyRelation, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan,
Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
},
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, nullif,
octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace,
4 changes: 2 additions & 2 deletions datafusion/core/src/logical_plan/plan.rs
@@ -23,8 +23,8 @@ pub use datafusion_expr::{
display::{GraphvizVisitor, IndentVisitor},
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, EmptyRelation,
Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values, Window,
},
15 changes: 7 additions & 8 deletions datafusion/core/src/prelude.rs
@@ -30,12 +30,11 @@ pub use crate::execution::context::{SessionConfig, SessionContext};
pub use crate::execution::options::{
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
};
pub use crate::logical_plan::{
approx_percentile_cont, array, ascii, avg, bit_length, btrim, cast, character_length,
chr, coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc,
digest, exists, from_unixtime, in_list, in_subquery, initcap, left, length, lit,
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, octet_length,
random, regexp_match, regexp_replace, repeat, replace, reverse, right, rpad, rtrim,
scalar_subquery, sha224, sha256, sha384, sha512, split_part, starts_with, strpos,
substr, sum, to_hex, translate, trim, upper, Column, Expr, JoinType, Partitioning,

pub use datafusion_common::Column;
pub use datafusion_expr::{
expr_fn::*,
lit,
logical_plan::{JoinType, Partitioning},
Expr,
};

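The net effect of the prelude change is that the expression builders now come from datafusion_expr::expr_fn via re-export rather than being listed one by one; a small sketch of downstream usage that should compile unchanged (assuming the usual expr_fn surface):

use datafusion::prelude::*;

// col and lit now resolve through the datafusion_expr re-exports.
fn predicate() -> Expr {
    col("a").gt(lit(5)).and(col("b").is_not_null())
}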