Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom / Dynamic table provider factories #3311

Merged
merged 2 commits into from
Sep 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,9 @@ jobs:
rustup default stable
rustup component add rustfmt
- name: Run
run: ci/scripts/rust_fmt.sh
run: |
echo '' > datafusion/proto/src/generated/datafusion.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a left over from #3333 -- but I also don't think it hurts to leave it in this PR

ci/scripts/rust_fmt.sh

coverage:
name: coverage
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ url = "2.2"
uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
async-trait = "0.1.53"
criterion = "0.3"
csv = "1.1.6"
ctor = "0.1.22"
Expand Down
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,12 @@ pub trait TableProvider: Sync + Send {
Ok(TableProviderFilterPushDown::Unsupported)
}
}

/// A factory which creates [`TableProvider`]s at runtime given a URL.
///
/// For example, this can be used to create a table "on the fly"
/// from a directory of files only when that name is referenced.
pub trait TableProviderFactory: Sync + Send {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should create be async?

The TableProvider will have to know the Schema which will most likely have to be inferred by reading from the ObjectStore (or involve a network call otherwise if there is some sort of metastore involved).

/// Create a TableProvider given name and url
fn create(&self, name: &str, url: &str) -> Arc<dyn TableProvider>;
}
162 changes: 98 additions & 64 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
provider_as_source, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan,
CreateMemoryTable, CreateView, DropTable, FunctionRegistry, LogicalPlan,
LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
Expand All @@ -90,6 +90,7 @@ use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::datasource::datasource::TableProviderFactory;
use crate::execution::runtime_env::RuntimeEnv;
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
Expand Down Expand Up @@ -175,6 +176,8 @@ pub struct SessionContext {
pub session_start_time: DateTime<Utc>,
/// Shared session state for the session
pub state: Arc<RwLock<SessionState>>,
/// Dynamic table providers
pub table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
}

impl Default for SessionContext {
Expand Down Expand Up @@ -202,6 +205,7 @@ impl SessionContext {
session_id: state.session_id.clone(),
session_start_time: chrono::Utc::now(),
state: Arc::new(RwLock::new(state)),
table_factories: HashMap::default(),
}
}

Expand All @@ -211,9 +215,19 @@ impl SessionContext {
session_id: state.session_id.clone(),
session_start_time: chrono::Utc::now(),
state: Arc::new(RwLock::new(state)),
table_factories: HashMap::default(),
}
}

/// Register a `TableProviderFactory` for a given `file_type` identifier
pub fn register_table_factory(
&mut self,
file_type: &str,
factory: Arc<dyn TableProviderFactory>,
) {
self.table_factories.insert(file_type.to_string(), factory);
}

/// Return the [RuntimeEnv] used to run queries with this [SessionContext]
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
self.state.read().runtime_env.clone()
Expand All @@ -236,70 +250,12 @@ impl SessionContext {
pub async fn sql(&self, sql: &str) -> Result<Arc<DataFrame>> {
let plan = self.create_logical_plan(sql)?;
match plan {
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
ref has_header,
ref delimiter,
ref table_partition_cols,
ref if_not_exists,
}) => {
let (file_format, file_extension) = match file_type {
FileType::CSV => (
Arc::new(
CsvFormat::default()
.with_has_header(*has_header)
.with_delimiter(*delimiter as u8),
) as Arc<dyn FileFormat>,
DEFAULT_CSV_EXTENSION,
),
FileType::Parquet => (
Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_PARQUET_EXTENSION,
),
FileType::Avro => (
Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_AVRO_EXTENSION,
),
FileType::NdJson => (
Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_JSON_EXTENSION,
),
};
let table = self.table(name.as_str());
match (if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
(_, Err(_)) => {
// TODO make schema in CreateExternalTable optional instead of empty
let provided_schema = if schema.fields().is_empty() {
None
} else {
Some(Arc::new(schema.as_ref().to_owned().into()))
};
let options = ListingOptions {
format: file_format,
collect_stat: false,
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
table_partition_cols: table_partition_cols.clone(),
};
self.register_listing_table(
name,
location,
options,
provided_schema,
)
.await?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
name
))),
LogicalPlan::CreateExternalTable(cmd) => match cmd.file_type.as_str() {
"PARQUET" | "CSV" | "JSON" | "AVRO" => {
self.create_listing_table(&cmd).await
}
}
_ => self.create_custom_table(&cmd).await,
},

LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name,
Expand Down Expand Up @@ -480,6 +436,84 @@ impl SessionContext {
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}

async fn create_custom_table(
&self,
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
let factory = &self.table_factories.get(&cmd.file_type).ok_or_else(|| {
DataFusionError::Execution(format!(
"Unable to find factory for {}",
cmd.file_type
))
})?;
let table = (*factory).create(cmd.name.as_str(), cmd.location.as_str());
self.register_table(cmd.name.as_str(), table)?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}

async fn create_listing_table(
&self,
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
let (file_format, file_extension) = match cmd.file_type.as_str() {
"CSV" => (
Arc::new(
CsvFormat::default()
.with_has_header(cmd.has_header)
.with_delimiter(cmd.delimiter as u8),
) as Arc<dyn FileFormat>,
DEFAULT_CSV_EXTENSION,
),
"PARQUET" => (
Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_PARQUET_EXTENSION,
),
"AVRO" => (
Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_AVRO_EXTENSION,
),
"JSON" => (
Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_JSON_EXTENSION,
),
_ => Err(DataFusionError::Execution(
"Only known FileTypes can be ListingTables!".to_string(),
))?,
};
let table = self.table(cmd.name.as_str());
match (cmd.if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
(_, Err(_)) => {
// TODO make schema in CreateExternalTable optional instead of empty
let provided_schema = if cmd.schema.fields().is_empty() {
None
} else {
Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
};
let options = ListingOptions {
format: file_format,
collect_stat: false,
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
table_partition_cols: cmd.table_partition_cols.clone(),
};
self.register_listing_table(
cmd.name.as_str(),
cmd.location.clone(),
options,
provided_schema,
)
.await?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
cmd.name
))),
}
}

fn find_and_deregister<'a>(
&self,
table_ref: impl Into<TableReference<'a>>,
Expand Down
7 changes: 3 additions & 4 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ pub use datafusion_expr::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
},
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint,
JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
CreateView, CrossJoin, DropTable, EmptyRelation, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan,
Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
},
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, nullif,
octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ pub use datafusion_expr::{
display::{GraphvizVisitor, IndentVisitor},
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, EmptyRelation,
Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values, Window,
},
Expand Down
75 changes: 75 additions & 0 deletions datafusion/core/tests/sql/create_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use async_trait::async_trait;
use std::any::Any;
use std::io::Write;

use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::execution::context::SessionState;
use datafusion_expr::TableType;
use tempfile::TempDir;

use super::*;
Expand Down Expand Up @@ -360,6 +365,76 @@ async fn create_pipe_delimited_csv_table() -> Result<()> {
Ok(())
}

struct TestTableProvider {}

impl TestTableProvider {}

#[async_trait]
impl TableProvider for TestTableProvider {
fn as_any(&self) -> &dyn Any {
unimplemented!("TestTableProvider is a stub for testing.")
}

fn schema(&self) -> SchemaRef {
unimplemented!("TestTableProvider is a stub for testing.")
}

fn table_type(&self) -> TableType {
unimplemented!("TestTableProvider is a stub for testing.")
}

async fn scan(
&self,
_ctx: &SessionState,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!("TestTableProvider is a stub for testing.")
}
}

struct TestTableFactory {}

impl TableProviderFactory for TestTableFactory {
fn create(&self, _name: &str, _path: &str) -> Arc<dyn TableProvider> {
Arc::new(TestTableProvider {})
}
}

#[tokio::test]
async fn create_custom_table() -> Result<()> {
let mut ctx = SessionContext::new();
ctx.register_table_factory("DELTATABLE", Arc::new(TestTableFactory {}));

let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';";
ctx.sql(sql).await.unwrap();

Comment on lines +408 to +412
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is very cool -- I like it a lot 💯

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, affirmation of general direction was what I was looking for, I'll clean it up and get it submitted, ty!

let cat = ctx.catalog("datafusion").unwrap();
let schema = cat.schema("public").unwrap();
let exists = schema.table_exist("dt");
assert!(exists, "Table should have been created!");

Ok(())
}

#[tokio::test]
async fn create_bad_custom_table() {
let ctx = SessionContext::new();

let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';";
let res = ctx.sql(sql).await;
match res {
Ok(_) => panic!("Registration of tables without factories should fail"),
Err(e) => {
assert!(
e.to_string().contains("Unable to find factory for"),
"Registration of tables without factories should throw correct error"
)
}
}
}

#[tokio::test]
async fn create_csv_table_empty_file() -> Result<()> {
let ctx =
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,7 @@ async fn timestamp_sub_interval_days() -> Result<()> {
}

#[tokio::test]
#[ignore] // https://github.com/apache/arrow-datafusion/issues/3327
async fn timestamp_add_interval_months() -> Result<()> {
let ctx = SessionContext::new();

Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ pub use builder::{table_scan, LogicalPlanBuilder};
pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
EmptyRelation, Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType,
Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition,
Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
Values, Window,
};

Expand Down