
Commit

Merge remote-tracking branch 'upstream/master' into only_support_decimal_scalar
liukun4515 committed Dec 5, 2021
2 parents 76c224c + d047900 commit bbc0a8f
Showing 23 changed files with 53 additions and 69 deletions.
2 changes: 1 addition & 1 deletion ballista-examples/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
keywords = [ "arrow", "distributed", "query", "sql" ]
edition = "2021"
publish = false
- rust-version = "1.56"
+ rust-version = "1.57"

[dependencies]
datafusion = { path = "../datafusion" }
2 changes: 1 addition & 1 deletion ballista/rust/client/Cargo.toml
@@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
edition = "2021"
- rust-version = "1.56"
+ rust-version = "1.57"

[dependencies]
ballista-core = { path = "../core", version = "0.6.0" }
12 changes: 6 additions & 6 deletions ballista/rust/core/src/config.rs
@@ -31,22 +31,22 @@ pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitio
#[derive(Debug, Clone)]
pub struct ConfigEntry {
name: String,
- description: String,
- data_type: DataType,
+ _description: String,
+ _data_type: DataType,
default_value: Option<String>,
}

impl ConfigEntry {
fn new(
name: String,
- description: String,
- data_type: DataType,
+ _description: String,
+ _data_type: DataType,
default_value: Option<String>,
) -> Self {
Self {
name,
- description,
- data_type,
+ _description,
+ _data_type,
default_value,
}
}
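
The underscore-prefixed names follow the usual Rust convention: identifiers starting with `_` are exempt from the `dead_code` lint, so fields that are stored but never read (as these presumably are) stop producing warnings while staying available for construction. A minimal sketch of the effect, with an illustrative struct that is not from this codebase:

// `_comment` is written at construction time but never read; the leading
// underscore tells rustc this is intentional, so no dead_code warning fires.
struct Settings {
    name: String,
    _comment: String,
}

fn main() {
    let s = Settings { name: "shuffle.partitions".into(), _comment: "unused".into() };
    println!("{}", s.name);
}
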
12 changes: 1 addition & 11 deletions ballista/rust/core/src/serde/scheduler/mod.rs
@@ -101,23 +101,13 @@ impl From<protobuf::ExecutorMetadata> for ExecutorMeta {
}

/// Summary of executed partition
- #[derive(Debug, Copy, Clone)]
+ #[derive(Debug, Copy, Clone, Default)]
pub struct PartitionStats {
pub(crate) num_rows: Option<u64>,
pub(crate) num_batches: Option<u64>,
pub(crate) num_bytes: Option<u64>,
}

- impl Default for PartitionStats {
- fn default() -> Self {
- Self {
- num_rows: None,
- num_batches: None,
- num_bytes: None,
- }
- }
- }
-
impl fmt::Display for PartitionStats {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
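
Deriving `Default` here is equivalent to the hand-written impl that was removed: the derived implementation calls `Default::default()` for every field, and `Option<T>::default()` is `None`. A quick, self-contained check of that equivalence:

#[derive(Debug, Default, PartialEq)]
struct Stats {
    num_rows: Option<u64>,
    num_batches: Option<u64>,
    num_bytes: Option<u64>,
}

fn main() {
    // The derived Default fills each field with its own default, i.e. None.
    assert_eq!(
        Stats::default(),
        Stats { num_rows: None, num_batches: None, num_bytes: None }
    );
}
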
6 changes: 3 additions & 3 deletions ballista/rust/executor/src/flight_service.rs
@@ -54,12 +54,12 @@ type FlightDataReceiver = Receiver<Result<FlightData, Status>>;
#[derive(Clone)]
pub struct BallistaFlightService {
/// Executor
- executor: Arc<Executor>,
+ _executor: Arc<Executor>,
}

impl BallistaFlightService {
- pub fn new(executor: Arc<Executor>) -> Self {
- Self { executor }
+ pub fn new(_executor: Arc<Executor>) -> Self {
+ Self { _executor }
}
}

2 changes: 1 addition & 1 deletion ballista/rust/scheduler/src/state/mod.rs
@@ -567,7 +567,7 @@ fn find_unresolved_shuffles(
Ok(plan
.children()
.iter()
- .map(|child| find_unresolved_shuffles(child))
+ .map(find_unresolved_shuffles)
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
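
Passing the function item directly instead of wrapping it in a closure is the usual fix for clippy's `redundant_closure` lint; the two forms behave identically. A small stand-alone illustration with a plain function:

fn double(x: &i32) -> i32 {
    x * 2
}

fn main() {
    let xs = vec![1, 2, 3];
    // `.map(|x| double(x))` would trip clippy::redundant_closure;
    // the function item can be handed to `map` directly.
    let doubled: Vec<i32> = xs.iter().map(double).collect();
    assert_eq!(doubled, vec![2, 4, 6]);
}
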
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
@@ -25,7 +25,7 @@ homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
license = "Apache-2.0"
publish = false
- rust-version = "1.56"
+ rust-version = "1.57"

[features]
simd = ["datafusion/simd"]
14 changes: 6 additions & 8 deletions benchmarks/src/bin/tpch.rs
@@ -75,10 +75,9 @@ struct BallistaBenchmarkOpt {
#[structopt(short = "i", long = "iterations", default_value = "3")]
iterations: usize,

- /// Batch size when reading CSV or Parquet files
- #[structopt(short = "s", long = "batch-size", default_value = "8192")]
- batch_size: usize,
-
+ // /// Batch size when reading CSV or Parquet files
+ // #[structopt(short = "s", long = "batch-size", default_value = "8192")]
+ // batch_size: usize,
/// Path to data files
#[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
path: PathBuf,
@@ -87,10 +86,9 @@ struct BallistaBenchmarkOpt {
#[structopt(short = "f", long = "format", default_value = "csv")]
file_format: String,

- /// Load the data into a MemTable before executing the query
- #[structopt(short = "m", long = "mem-table")]
- mem_table: bool,
-
+ // /// Load the data into a MemTable before executing the query
+ // #[structopt(short = "m", long = "mem-table")]
+ // mem_table: bool,
/// Number of partitions to process in parallel
#[structopt(short = "p", long = "partitions", default_value = "2")]
partitions: usize,
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -24,7 +24,7 @@ keywords = [ "arrow", "datafusion", "ballista", "query", "sql" ]
license = "Apache-2.0"
homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
- rust-version = "1.56"
+ rust-version = "1.57"

[dependencies]
clap = "2.33"
2 changes: 1 addition & 1 deletion datafusion-cli/Dockerfile
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

- FROM rust:1.56 as builder
+ FROM rust:1.57 as builder

COPY ./datafusion /usr/src/datafusion

2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
keywords = [ "arrow", "query", "sql" ]
edition = "2021"
publish = false
- rust-version = "1.56"
+ rust-version = "1.57"

[[example]]
name = "avro_sql"
2 changes: 1 addition & 1 deletion datafusion/Cargo.toml
@@ -31,7 +31,7 @@ include = [
"Cargo.toml",
]
edition = "2021"
- rust-version = "1.56"
+ rust-version = "1.57"

[lib]
name = "datafusion"
6 changes: 5 additions & 1 deletion datafusion/src/physical_plan/datetime_expressions.rs
@@ -42,6 +42,7 @@ use arrow::{
};
use chrono::prelude::*;
use chrono::Duration;
+ use std::borrow::Borrow;

/// given a function `op` that maps a `&str` to a Result of an arrow native type,
/// returns a `PrimitiveArray` after the application
@@ -77,7 +78,10 @@
})?;

// first map is the iterator, second is for the `Option<_>`
- array.iter().map(|x| x.map(|x| op(x)).transpose()).collect()
+ array
+ .iter()
+ .map(|x| x.map(op.borrow()).transpose())
+ .collect()
}

// given an function that maps a `&str` to a arrow native type,
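
Replacing the inner closure with `op.borrow()` presumably silences clippy's `redundant_closure` warning without moving `op`: `Borrow::borrow` yields a `&F`, and a shared reference to a closure is itself callable (`&F: Fn(...)` whenever `F: Fn(...)`), so it can be handed to `map` on every iteration. A stand-alone sketch of the same pattern, using toy types rather than the DataFusion signatures:

use std::borrow::Borrow;

// Apply `op` to every Some(_) element; `op` is only borrowed via
// Borrow::borrow, so it is not moved into the inner `map` on each
// iteration and does not need to be Copy or Clone.
fn map_some<F>(values: Vec<Option<&str>>, op: F) -> Vec<Option<usize>>
where
    F: Fn(&str) -> usize,
{
    values.into_iter().map(|v| v.map(op.borrow())).collect()
}

fn main() {
    let out = map_some(vec![Some("abc"), None, Some("de")], |s| s.len());
    assert_eq!(out, vec![Some(3), None, Some(2)]);
}
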
9 changes: 5 additions & 4 deletions datafusion/src/physical_plan/expressions/average.rs
@@ -37,8 +37,6 @@ use super::{format_state_name, sum};
#[derive(Debug)]
pub struct Avg {
name: String,
- data_type: DataType,
- nullable: bool,
expr: Arc<dyn PhysicalExpr>,
}

@@ -69,11 +67,14 @@ impl Avg {
name: impl Into<String>,
data_type: DataType,
) -> Self {
+ // Average is always Float64, but Avg::new() has a data_type
+ // parameter to keep a consistent signature with the other
+ // Aggregate expressions.
+ assert_eq!(data_type, DataType::Float64);
+
Self {
name: name.into(),
expr,
- data_type,
- nullable: true,
}
}
}
6 changes: 3 additions & 3 deletions datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -346,7 +346,7 @@ struct SortPreservingMergeStream {
receivers: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,

/// Drop helper for tasks feeding the [`receivers`](Self::receivers)
- drop_helper: AbortOnDropMany<()>,
+ _drop_helper: AbortOnDropMany<()>,

/// For each input stream maintain a dequeue of SortKeyCursor
///
@@ -379,7 +379,7 @@ struct SortPreservingMergeStream {
impl SortPreservingMergeStream {
fn new(
receivers: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
- drop_helper: AbortOnDropMany<()>,
+ _drop_helper: AbortOnDropMany<()>,
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
target_batch_size: usize,
@@ -394,7 +394,7 @@ impl SortPreservingMergeStream {
schema,
cursors,
receivers,
- drop_helper,
+ _drop_helper,
column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
sort_options: expressions.iter().map(|x| x.options).collect(),
target_batch_size,
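
`_drop_helper` is never read; per its doc comment it exists only so that dropping the stream also drops the `AbortOnDropMany` guard and cleans up the tasks feeding the receivers. The leading underscore keeps that intent explicit while silencing the unused-field lint. A minimal sketch of the pattern with a hypothetical guard type:

// A guard whose only job is its Drop impl.
struct AbortGuard;

impl Drop for AbortGuard {
    fn drop(&mut self) {
        // In the real code this would abort the spawned background tasks.
        println!("cleaning up background work");
    }
}

struct Stream {
    // Never read directly; kept alive so cleanup runs when Stream is dropped.
    _guard: AbortGuard,
}

fn main() {
    let s = Stream { _guard: AbortGuard };
    drop(s); // prints the cleanup message
}
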
5 changes: 1 addition & 4 deletions datafusion/src/physical_plan/string_expressions.rs
@@ -117,10 +117,7 @@
let string_array = downcast_string_arg!(args[0], "string", T);

// first map is the iterator, second is for the `Option<_>`
- Ok(string_array
- .iter()
- .map(|string| string.map(|s| op(s)))
- .collect())
+ Ok(string_array.iter().map(|string| string.map(&op)).collect())
}

fn handle<'a, F, R>(args: &'a [ColumnarValue], op: F, name: &str) -> Result<ColumnarValue>
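
Here the same idea is spelled with a plain reference: `&F` implements `Fn` whenever `F` does, so `&op` can be passed to `map` repeatedly without moving the closure. For instance, with toy types:

fn apply_all<F>(values: Vec<Option<&str>>, op: F) -> Vec<Option<String>>
where
    F: Fn(&str) -> String,
{
    // `&op` is a shared borrow, so `op` itself is never moved into `map`.
    values.into_iter().map(|v| v.map(&op)).collect()
}

fn main() {
    let out = apply_all(vec![Some("ab"), None], |s| s.to_uppercase());
    assert_eq!(out, vec![Some("AB".to_string()), None]);
}
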
10 changes: 1 addition & 9 deletions datafusion/src/physical_plan/windows/built_in.rs
@@ -18,10 +18,8 @@
//! Physical exec for built-in window function expressions.

use crate::error::{DataFusionError, Result};
- use crate::logical_plan::window_frames::WindowFrame;
use crate::physical_plan::{
- expressions::PhysicalSortExpr,
- window_functions::{BuiltInWindowFunction, BuiltInWindowFunctionExpr},
+ expressions::PhysicalSortExpr, window_functions::BuiltInWindowFunctionExpr,
PhysicalExpr, WindowExpr,
};
use arrow::compute::concat;
@@ -33,28 +31,22 @@ use std::sync::Arc;
/// A window expr that takes the form of a built in window function
#[derive(Debug)]
pub struct BuiltInWindowExpr {
- fun: BuiltInWindowFunction,
expr: Arc<dyn BuiltInWindowFunctionExpr>,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: Vec<PhysicalSortExpr>,
- window_frame: Option<WindowFrame>,
}

impl BuiltInWindowExpr {
/// create a new built-in window function expression
pub(super) fn new(
- fun: BuiltInWindowFunction,
expr: Arc<dyn BuiltInWindowFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
- window_frame: Option<WindowFrame>,
) -> Self {
Self {
- fun,
expr,
partition_by: partition_by.to_vec(),
order_by: order_by.to_vec(),
- window_frame,
}
}
}
2 changes: 0 additions & 2 deletions datafusion/src/physical_plan/windows/mod.rs
@@ -64,11 +64,9 @@ pub fn create_window_expr(
window_frame,
)),
WindowFunction::BuiltInWindowFunction(fun) => Arc::new(BuiltInWindowExpr::new(
- fun.clone(),
create_built_in_window_expr(fun, args, input_schema, name)?,
partition_by,
order_by,
- window_frame,
)),
})
}
4 changes: 2 additions & 2 deletions datafusion/src/scalar.rs
@@ -70,7 +70,7 @@ pub enum ScalarValue {
/// large binary
LargeBinary(Option<Vec<u8>>),
/// list of nested ScalarValue (boxed to reduce size_of(ScalarValue))
- #[allow(clippy::box_vec)]
+ #[allow(clippy::box_collection)]
List(Option<Box<Vec<ScalarValue>>>, Box<DataType>),
/// Date stored as a signed 32bit int
Date32(Option<i32>),
@@ -89,7 +89,7 @@ pub enum ScalarValue {
/// Interval with DayTime unit
IntervalDayTime(Option<i64>),
/// struct of nested ScalarValue (boxed to reduce size_of(ScalarValue))
- #[allow(clippy::box_vec)]
+ #[allow(clippy::box_collection)]
Struct(Option<Box<Vec<ScalarValue>>>, Box<Vec<Field>>),
}

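
The attribute changes because newer clippy releases renamed the `box_vec` lint to `box_collection`; the boxing itself stays, since (as the doc comments note) it keeps the large `Vec` payload behind a pointer and reduces `size_of(ScalarValue)`. A rough illustration of that size effect with a toy enum (exact numbers are platform-dependent):

use std::mem::size_of;

// Boxing the Vec keeps the variant pointer-sized instead of carrying the
// whole Vec (ptr + len + cap) inline; clippy flags Box<Vec<_>> via
// `box_collection` (formerly `box_vec`), hence the #[allow].
#[allow(dead_code, clippy::box_collection)]
enum Boxed {
    Int(i64),
    List(Option<Box<Vec<i64>>>),
}

#[allow(dead_code)]
enum Unboxed {
    Int(i64),
    List(Option<Vec<i64>>),
}

fn main() {
    // On a typical 64-bit target the boxed version is smaller.
    println!("boxed:   {} bytes", size_of::<Boxed>());
    println!("unboxed: {} bytes", size_of::<Unboxed>());
}
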
12 changes: 8 additions & 4 deletions datafusion/src/sql/parser.rs
@@ -85,7 +85,7 @@ pub struct CreateExternalTable {
#[derive(Debug, Clone, PartialEq)]
pub enum Statement {
/// ANSI SQL AST node
- Statement(SQLStatement),
+ Statement(Box<SQLStatement>),
/// Extension: `CREATE EXTERNAL TABLE`
CreateExternalTable(CreateExternalTable),
}
@@ -167,13 +167,17 @@ impl<'a> DFParser<'a> {
}
_ => {
// use the native parser
- Ok(Statement::Statement(self.parser.parse_statement()?))
+ Ok(Statement::Statement(Box::from(
+ self.parser.parse_statement()?,
+ )))
}
}
}
_ => {
// use the native parser
- Ok(Statement::Statement(self.parser.parse_statement()?))
+ Ok(Statement::Statement(Box::from(
+ self.parser.parse_statement()?,
+ )))
}
}
}
@@ -183,7 +187,7 @@
if self.parser.parse_keyword(Keyword::EXTERNAL) {
self.parse_create_external_table()
} else {
- Ok(Statement::Statement(self.parser.parse_create()?))
+ Ok(Statement::Statement(Box::from(self.parser.parse_create()?)))
}
}

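
Boxing the `SQLStatement` payload keeps the `Statement` enum itself small: an enum is at least as large as its biggest variant, so a large AST node stored inline would inflate every `Statement` value (this is what clippy's `large_enum_variant` lint usually complains about; whether that lint motivated the change here is an assumption). A toy-sized illustration with hypothetical types, not the sqlparser AST:

use std::mem::size_of;

// `Big` plays the role of a large parsed statement.
#[allow(dead_code)]
struct Big([u8; 256]);

#[allow(dead_code)]
enum Inline {
    Stmt(Big), // the whole 256-byte payload is stored inline in the enum
    Other,
}

#[allow(dead_code)]
enum Boxed {
    Stmt(Box<Big>), // only a pointer is stored inline
    Other,
}

fn main() {
    // Boxing the big payload shrinks every value of the type,
    // at the cost of one heap allocation per statement.
    println!("inline: {} bytes", size_of::<Inline>());
    println!("boxed:  {} bytes", size_of::<Boxed>());
}
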
4 changes: 2 additions & 2 deletions datafusion/src/test/exec.rs
@@ -549,7 +549,7 @@ impl ExecutionPlan for BlockingExec {
async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(BlockingStream {
schema: Arc::clone(&self.schema),
- refs: Arc::clone(&self.refs),
+ _refs: Arc::clone(&self.refs),
}))
}

@@ -577,7 +577,7 @@ pub struct BlockingStream {
schema: SchemaRef,

/// Ref-counting helper to check if the stream are still in memory.
- refs: Arc<()>,
+ _refs: Arc<()>,
}

impl Stream for BlockingStream {
2 changes: 1 addition & 1 deletion dev/docker/ballista-base.dockerfile
@@ -23,7 +23,7 @@


# Base image extends debian:buster-slim
- FROM rust:1.56.0-buster AS builder
+ FROM rust:1.57.0-buster AS builder

RUN apt update && apt -y install musl musl-dev musl-tools libssl-dev openssl

2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -25,7 +25,7 @@ description = "Build and run queries against data"
readme = "README.md"
license = "Apache-2.0"
edition = "2021"
- rust-version = "1.56"
+ rust-version = "1.57"

[dependencies]
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
