
Commit dde2b4f

Merge remote-tracking branch 'apache/master' into alamb/update_arrow_19.0.0
alamb committed Jul 23, 2022
2 parents: 924a40d + 7b0f2f8
Showing 25 changed files with 2,957 additions and 18 deletions.
2 changes: 1 addition & 1 deletion benchmarks/queries/q20.sql
@@ -28,7 +28,7 @@ where
l_partkey = ps_partkey
and l_suppkey = ps_suppkey
and l_shipdate >= date '1994-01-01'
- and l_shipdate < 'date 1994-01-01' + interval '1' year
+ and l_shipdate < date '1994-01-01' + interval '1' year
)
)
and s_nationkey = n_nationkey
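
The fix moves a misplaced quote: 'date 1994-01-01' was a plain string literal, so the intended DATE literal (and the interval arithmetic on it) was lost; date '1994-01-01' restores date-plus-interval addition. A minimal sketch (not part of this commit) exercising the corrected expression through DataFusion's public API:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // A DATE literal plus an interval is valid date arithmetic after the fix.
    let df = ctx
        .sql("SELECT date '1994-01-01' + interval '1' year AS cutoff")
        .await?;
    df.show().await?;
    Ok(())
}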
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
@@ -40,7 +40,7 @@ pyarrow = ["pyo3"]
[dependencies]
arrow = { version = "19.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.85.0", optional = true }
cranelift-module = { version = "0.86.1", optional = true }
object_store = { version = "0.3", optional = true }
ordered-float = "3.0"
parquet = { version = "19.0.0", features = ["arrow"], optional = true }
27 changes: 27 additions & 0 deletions datafusion/common/src/error.rs
@@ -83,6 +83,30 @@ pub enum DataFusionError {
#[cfg(feature = "jit")]
/// Error occurs during code generation
JITError(ModuleError),
/// Error with additional context
Context(String, Box<DataFusionError>),
}

#[macro_export]
macro_rules! context {
($desc:expr, $err:expr) => {
datafusion_common::DataFusionError::Context(
format!("{} at {}:{}", $desc, file!(), line!()),
Box::new($err),
)
};
}

#[macro_export]
macro_rules! plan_err {
($desc:expr) => {
Err(datafusion_common::DataFusionError::Plan(format!(
"{} at {}:{}",
$desc,
file!(),
line!()
)))
};
}

/// Schema-related errors
@@ -285,6 +309,9 @@ impl Display for DataFusionError {
DataFusionError::ObjectStore(ref desc) => {
write!(f, "Object Store error: {}", desc)
}
DataFusionError::Context(ref desc, ref err) => {
write!(f, "{}\ncaused by\n{}", desc, *err)
}
}
}
}
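
The new Context variant wraps an inner error with a location-stamped description; context! and plan_err! capture file!() and line!() automatically. A hedged sketch of a call site (the function and messages here are illustrative, not from this commit):

use datafusion_common::{context, plan_err, DataFusionError};

fn resolve_table(name: &str) -> Result<(), DataFusionError> {
    if name.is_empty() {
        // Expands to Err(DataFusionError::Plan("empty table name at <file>:<line>"))
        return plan_err!("empty table name");
    }
    let inner = DataFusionError::Plan(format!("table {} not found", name));
    // Wraps the inner error; per the Display impl above, this prints
    // "<desc> at <file>:<line>", then "caused by", then the inner error.
    Err(context!("while resolving FROM clause", inner))
}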
2 changes: 2 additions & 0 deletions datafusion/core/Cargo.toml
@@ -94,6 +94,8 @@ uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
criterion = "0.3"
csv = "1.1.6"
ctor = "0.1.22"
doc-comment = "0.3"
env_logger = "0.9"
fuzz-utils = { path = "fuzz-utils" }
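
csv backs the new register_tpch_csv_data test helper further down; ctor runs code before the test harness starts, a common home for one-time logger setup. A hypothetical use (this commit's actual call sites may differ):

// Runs once per test binary, before any #[test]:
#[ctor::ctor]
fn init() {
    let _ = env_logger::builder().is_test(true).try_init();
}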
6 changes: 6 additions & 0 deletions datafusion/core/src/execution/context.rs
@@ -102,6 +102,9 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_common::ScalarValue;
use datafusion_expr::TableSource;
use datafusion_optimizer::decorrelate_scalar_subquery::DecorrelateScalarSubquery;
use datafusion_optimizer::decorrelate_where_exists::DecorrelateWhereExists;
use datafusion_optimizer::decorrelate_where_in::DecorrelateWhereIn;
use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
use datafusion_sql::{
parser::DFParser,
@@ -1356,6 +1359,9 @@ impl SessionState {
// Simplify expressions first to maximize the chance
// of applying other optimizations
Arc::new(SimplifyExpressions::new()),
Arc::new(DecorrelateWhereExists::new()),
Arc::new(DecorrelateWhereIn::new()),
Arc::new(DecorrelateScalarSubquery::new()),
Arc::new(SubqueryFilterToJoin::new()),
Arc::new(EliminateFilter::new()),
Arc::new(CommonSubexprEliminate::new()),
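
The three new rules run early so correlated subqueries are rewritten into joins before later passes see the plan. A sketch of the query shape DecorrelateWhereExists targets (schema from the TPC-H tables registered in the tests below; not code from this commit):

use datafusion::prelude::*;

async fn correlated_exists(ctx: &SessionContext) -> datafusion::error::Result<()> {
    // ps_suppkey = s_suppkey references the outer table, so without
    // decorrelation the subquery would be evaluated per outer row;
    // the new rule plans it as a semi-join instead.
    let df = ctx
        .sql(
            "SELECT s_name FROM supplier \
             WHERE EXISTS (SELECT 1 FROM partsupp \
                           WHERE ps_suppkey = s_suppkey AND ps_availqty > 0)",
        )
        .await?;
    df.show().await?;
    Ok(())
}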
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -35,7 +35,7 @@ use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use futures::stream::{Stream, StreamExt};
- use log::debug;
+ use log::trace;

use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, MetricsSet};
@@ -286,7 +286,7 @@ pub fn concat_batches(
)?;
arrays.push(array);
}
- debug!(
+ trace!(
"Combined {} batches containing {} rows",
batches.len(),
row_count
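
Demoting the per-batch message from debug! to trace! keeps RUST_LOG=debug output usable on hot paths; the message remains available one level down. One way to surface it (an assumption, not shown in this commit):

fn enable_coalesce_tracing() {
    // Scope trace output to this module to avoid a firehose:
    std::env::set_var(
        "RUST_LOG",
        "datafusion::physical_plan::coalesce_batches=trace",
    );
    env_logger::init();
}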
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/file_format/mod.rs
@@ -26,6 +26,8 @@ mod file_stream;
mod json;
mod parquet;

+ pub(crate) use self::csv::plan_to_csv;
+ pub use self::csv::CsvExec;
pub(crate) use self::parquet::plan_to_parquet;
pub use self::parquet::ParquetExec;
use arrow::{
@@ -36,8 +38,6 @@
record_batch::RecordBatch,
};
pub use avro::AvroExec;
- pub(crate) use csv::plan_to_csv;
- pub use csv::CsvExec;
pub(crate) use json::plan_to_json;
pub use json::NdJsonExec;

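
Switching the re-exports from csv::... to self::csv::... is likely forced by the csv crate added to dev-dependencies above: when the crate is compiled for tests, a bare "csv" path becomes ambiguous between the local module and the external crate. A self-contained sketch of the disambiguation (hypothetical layout mirroring this file):

mod csv {
    pub fn plan_to_csv() {} // stands in for the local file_format::csv module
}
pub(crate) use self::csv::plan_to_csv; // self:: pins the local module

fn open_reader() -> ::csv::Reader<std::io::Empty> {
    ::csv::ReaderBuilder::new().from_reader(std::io::empty()) // ::csv pins the external crate
}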
111 changes: 110 additions & 1 deletion datafusion/core/tests/sql/mod.rs
@@ -49,6 +49,7 @@ use datafusion_expr::Volatility;
use object_store::path::Path;
use std::fs::File;
use std::io::Write;
use std::ops::Sub;
use std::path::PathBuf;
use tempfile::TempDir;

@@ -108,6 +109,7 @@
mod idenfifers;
pub mod information_schema;
mod partitioned_csv;
mod subqueries;
#[cfg(feature = "unicode_expressions")]
pub mod unicode;

@@ -483,7 +485,43 @@ fn get_tpch_table_schema(table: &str) -> Schema {
Field::new("n_comment", DataType::Utf8, false),
]),

- _ => unimplemented!(),
"supplier" => Schema::new(vec![
Field::new("s_suppkey", DataType::Int64, false),
Field::new("s_name", DataType::Utf8, false),
Field::new("s_address", DataType::Utf8, false),
Field::new("s_nationkey", DataType::Int64, false),
Field::new("s_phone", DataType::Utf8, false),
Field::new("s_acctbal", DataType::Float64, false),
Field::new("s_comment", DataType::Utf8, false),
]),

"partsupp" => Schema::new(vec![
Field::new("ps_partkey", DataType::Int64, false),
Field::new("ps_suppkey", DataType::Int64, false),
Field::new("ps_availqty", DataType::Int32, false),
Field::new("ps_supplycost", DataType::Float64, false),
Field::new("ps_comment", DataType::Utf8, false),
]),

"part" => Schema::new(vec![
Field::new("p_partkey", DataType::Int64, false),
Field::new("p_name", DataType::Utf8, false),
Field::new("p_mfgr", DataType::Utf8, false),
Field::new("p_brand", DataType::Utf8, false),
Field::new("p_type", DataType::Utf8, false),
Field::new("p_size", DataType::Int32, false),
Field::new("p_container", DataType::Utf8, false),
Field::new("p_retailprice", DataType::Float64, false),
Field::new("p_comment", DataType::Utf8, false),
]),

"region" => Schema::new(vec![
Field::new("r_regionkey", DataType::Int64, false),
Field::new("r_name", DataType::Utf8, false),
Field::new("r_comment", DataType::Utf8, false),
]),

_ => unimplemented!("Table: {}", table),
}
}

@@ -499,6 +537,77 @@ async fn register_tpch_csv(ctx: &SessionContext, table: &str) -> Result<()> {
Ok(())
}

async fn register_tpch_csv_data(
ctx: &SessionContext,
table_name: &str,
data: &str,
) -> Result<()> {
let schema = Arc::new(get_tpch_table_schema(table_name));

let mut reader = ::csv::ReaderBuilder::new()
.has_headers(false)
.from_reader(data.as_bytes());
let records: Vec<_> = reader.records().map(|it| it.unwrap()).collect();

let mut cols: Vec<Box<dyn ArrayBuilder>> = vec![];
for field in schema.fields().iter() {
match field.data_type() {
DataType::Utf8 => cols.push(Box::new(StringBuilder::new(records.len()))),
DataType::Date32 => cols.push(Box::new(Date32Builder::new(records.len()))),
DataType::Int32 => cols.push(Box::new(Int32Builder::new(records.len()))),
DataType::Int64 => cols.push(Box::new(Int64Builder::new(records.len()))),
DataType::Float64 => cols.push(Box::new(Float64Builder::new(records.len()))),
_ => {
let msg = format!("Not implemented: {}", field.data_type());
Err(DataFusionError::Plan(msg))?
}
}
}

for record in records.iter() {
for (idx, val) in record.iter().enumerate() {
let col = cols.get_mut(idx).unwrap();
let field = schema.field(idx);
match field.data_type() {
DataType::Utf8 => {
let sb = col.as_any_mut().downcast_mut::<StringBuilder>().unwrap();
sb.append_value(val)?;
}
DataType::Date32 => {
let sb = col.as_any_mut().downcast_mut::<Date32Builder>().unwrap();
let dt = NaiveDate::parse_from_str(val.trim(), "%Y-%m-%d").unwrap();
let dt = dt.sub(NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32;
sb.append_value(dt)?;
}
DataType::Int32 => {
let sb = col.as_any_mut().downcast_mut::<Int32Builder>().unwrap();
sb.append_value(val.trim().parse().unwrap())?;
}
DataType::Int64 => {
let sb = col.as_any_mut().downcast_mut::<Int64Builder>().unwrap();
sb.append_value(val.trim().parse().unwrap())?;
}
DataType::Float64 => {
let sb = col.as_any_mut().downcast_mut::<Float64Builder>().unwrap();
sb.append_value(val.trim().parse().unwrap())?;
}
_ => Err(DataFusionError::Plan(format!(
"Not implemented: {}",
field.data_type()
)))?,
}
}
}
let cols: Vec<ArrayRef> = cols.iter_mut().map(|it| it.finish()).collect();

let batch = RecordBatch::try_new(Arc::clone(&schema), cols)?;

let table = Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?);
let _ = ctx.register_table(table_name, table).unwrap();

Ok(())
}

async fn register_aggregate_csv_by_sql(ctx: &SessionContext) {
let testdata = datafusion::test_util::arrow_test_data();

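
A usage sketch for the new register_tpch_csv_data helper (test-style; the inline rows are illustrative only):

async fn query_region(ctx: &SessionContext) -> Result<()> {
    // Three columns to match the "region" schema above: Int64, Utf8, Utf8.
    register_tpch_csv_data(
        ctx,
        "region",
        "0,AFRICA,placeholder comment\n1,AMERICA,placeholder comment\n",
    )
    .await?;
    let df = ctx
        .sql("SELECT r_name FROM region ORDER BY r_regionkey")
        .await?;
    df.show().await?; // two rows: AFRICA, AMERICA
    Ok(())
}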
