Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace &Option<T> with Option<&T> #4446

Merged
merged 4 commits into from Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions datafusion-examples/examples/custom_datasource.rs
Expand Up @@ -118,7 +118,7 @@ impl Debug for CustomDataSource {
impl CustomDataSource {
pub(crate) async fn create_physical_plan(
&self,
projections: &Option<Vec<usize>>,
projections: Option<&Vec<usize>>,
schema: SchemaRef,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(CustomExec::new(projections, schema, self.clone())))
Expand Down Expand Up @@ -177,7 +177,7 @@ impl TableProvider for CustomDataSource {
async fn scan(
&self,
_state: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
_filters: &[Expr],
_limit: Option<usize>,
Expand All @@ -194,11 +194,11 @@ struct CustomExec {

impl CustomExec {
fn new(
projections: &Option<Vec<usize>>,
projections: Option<&Vec<usize>>,
schema: SchemaRef,
db: CustomDataSource,
) -> Self {
let projected_schema = project_schema(&schema, projections.as_ref()).unwrap();
let projected_schema = project_schema(&schema, projections).unwrap();
Self {
db,
projected_schema,
Expand Down
12 changes: 9 additions & 3 deletions datafusion/common/src/scalar.rs
Expand Up @@ -2160,7 +2160,7 @@ impl ScalarValue {
fn eq_array_decimal(
array: &ArrayRef,
index: usize,
value: &Option<i128>,
value: Option<&i128>,
precision: u8,
scale: i8,
) -> Result<bool> {
Expand Down Expand Up @@ -2196,8 +2196,14 @@ impl ScalarValue {
pub fn eq_array(&self, array: &ArrayRef, index: usize) -> bool {
match self {
ScalarValue::Decimal128(v, precision, scale) => {
ScalarValue::eq_array_decimal(array, index, v, *precision, *scale)
.unwrap()
ScalarValue::eq_array_decimal(
array,
index,
v.as_ref(),
*precision,
*scale,
)
.unwrap()
}
ScalarValue::Boolean(val) => {
eq_array_primitive!(array, index, BooleanArray, val)
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/dataframe.rs
Expand Up @@ -795,12 +795,11 @@ impl TableProvider for DataFrame {
async fn scan(
&self,
_ctx: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut expr = projection
.as_ref()
// construct projections
.map_or_else(
|| {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/datasource.rs
Expand Up @@ -61,7 +61,7 @@ pub trait TableProvider: Sync + Send {
async fn scan(
&self,
ctx: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
// limit can be used to reduce the amount scanned
// from the datasource as a performance optimization.
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/empty.rs
Expand Up @@ -69,12 +69,12 @@ impl TableProvider for EmptyTable {
async fn scan(
&self,
_ctx: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// even though there is no data, projections apply
let projected_schema = project_schema(&self.schema, projection.as_ref())?;
let projected_schema = project_schema(&self.schema, projection)?;
Ok(Arc::new(
EmptyExec::new(false, projected_schema).with_partitions(self.partitions),
))
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/listing/table.rs
Expand Up @@ -523,7 +523,7 @@ impl TableProvider for ListingTable {
async fn scan(
&self,
ctx: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand All @@ -533,7 +533,7 @@ impl TableProvider for ListingTable {
// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection.as_ref())?;
let projected_schema = project_schema(&schema, projection)?;
return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
}

Expand Down Expand Up @@ -562,7 +562,7 @@ impl TableProvider for ListingTable {
file_schema: Arc::clone(&self.file_schema),
file_groups: partitioned_file_lists,
statistics,
projection: projection.clone(),
projection: projection.cloned(),
limit,
output_ordering: self.try_create_output_ordering()?,
table_partition_cols,
Expand Down Expand Up @@ -686,7 +686,7 @@ mod tests {
let table = load_table(&ctx, "alltypes_plain.parquet").await?;
let projection = None;
let exec = table
.scan(&ctx.state(), &projection, &[], None)
.scan(&ctx.state(), projection, &[], None)
.await
.expect("Scan table");

Expand Down Expand Up @@ -716,7 +716,7 @@ mod tests {
.with_schema(schema);
let table = ListingTable::try_new(config)?;

let exec = table.scan(&state, &None, &[], None).await?;
let exec = table.scan(&state, None, &[], None).await?;
assert_eq!(exec.statistics().num_rows, Some(8));
assert_eq!(exec.statistics().total_byte_size, Some(671));

Expand Down Expand Up @@ -855,7 +855,7 @@ mod tests {
let filter = Expr::not_eq(col("p1"), lit("v1"));

let scan = table
.scan(&ctx.state(), &None, &[filter], None)
.scan(&ctx.state(), None, &[filter], None)
.await
.expect("Empty execution plan");

Expand Down
18 changes: 7 additions & 11 deletions datafusion/core/src/datasource/memory.rs
Expand Up @@ -69,7 +69,7 @@ impl MemTable {
ctx: &SessionState,
) -> Result<Self> {
let schema = t.schema();
let exec = t.scan(ctx, &None, &[], None).await?;
let exec = t.scan(ctx, None, &[], None).await?;
let partition_count = exec.output_partitioning().partition_count();

let tasks = (0..partition_count)
Expand Down Expand Up @@ -136,14 +136,14 @@ impl TableProvider for MemTable {
async fn scan(
&self,
_ctx: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(MemoryExec::try_new(
&self.batches.clone(),
self.schema(),
projection.clone(),
projection.cloned(),
)?))
}
}
Expand Down Expand Up @@ -184,7 +184,7 @@ mod tests {

// scan with projection
let exec = provider
.scan(&session_ctx.state(), &Some(vec![2, 1]), &[], None)
.scan(&session_ctx.state(), Some(&vec![2, 1]), &[], None)
.await?;

let mut it = exec.execute(0, task_ctx)?;
Expand Down Expand Up @@ -218,9 +218,7 @@ mod tests {

let provider = MemTable::try_new(schema, vec![vec![batch]])?;

let exec = provider
.scan(&session_ctx.state(), &None, &[], None)
.await?;
let exec = provider.scan(&session_ctx.state(), None, &[], None).await?;
let mut it = exec.execute(0, task_ctx)?;
let batch1 = it.next().await.unwrap()?;
assert_eq!(3, batch1.schema().fields().len());
Expand Down Expand Up @@ -253,7 +251,7 @@ mod tests {
let projection: Vec<usize> = vec![0, 4];

match provider
.scan(&session_ctx.state(), &Some(projection), &[], None)
.scan(&session_ctx.state(), Some(&projection), &[], None)
.await
{
Err(DataFusionError::ArrowError(ArrowError::SchemaError(e))) => {
Expand Down Expand Up @@ -381,9 +379,7 @@ mod tests {
let provider =
MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;

let exec = provider
.scan(&session_ctx.state(), &None, &[], None)
.await?;
let exec = provider.scan(&session_ctx.state(), None, &[], None).await?;
let mut it = exec.execute(0, task_ctx)?;
let batch1 = it.next().await.unwrap()?;
assert_eq!(3, batch1.schema().fields().len());
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/view.rs
Expand Up @@ -61,8 +61,8 @@ impl ViewTable {
}

/// Get definition ref
pub fn definition(&self) -> &Option<String> {
&self.definition
pub fn definition(&self) -> Option<&String> {
self.definition.as_ref()
}

/// Get logical_plan ref
Expand Down Expand Up @@ -104,7 +104,7 @@ impl TableProvider for ViewTable {
async fn scan(
&self,
state: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Expand Up @@ -181,8 +181,8 @@ fn swap_reverting_projection(
}

/// Swaps join sides for filter column indices and produces new JoinFilter
fn swap_join_filter(filter: &Option<JoinFilter>) -> Option<JoinFilter> {
filter.as_ref().map(|filter| {
fn swap_join_filter(filter: Option<&JoinFilter>) -> Option<JoinFilter> {
filter.map(|filter| {
let column_indices = filter
.column_indices()
.iter()
Expand Down Expand Up @@ -334,7 +334,7 @@ fn try_collect_left(
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
hash_join.filter().clone(),
hash_join.filter().cloned(),
hash_join.join_type(),
PartitionMode::CollectLeft,
hash_join.null_equals_null(),
Expand All @@ -345,7 +345,7 @@ fn try_collect_left(
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
hash_join.filter().clone(),
hash_join.filter().cloned(),
hash_join.join_type(),
PartitionMode::CollectLeft,
hash_join.null_equals_null(),
Expand Down Expand Up @@ -377,7 +377,7 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPl
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
hash_join.filter().clone(),
hash_join.filter().cloned(),
hash_join.join_type(),
PartitionMode::Partitioned,
hash_join.null_equals_null(),
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
Expand Up @@ -248,8 +248,8 @@ impl HashJoinExec {
}

/// Filters applied before join output
pub fn filter(&self) -> &Option<JoinFilter> {
&self.filter
pub fn filter(&self) -> Option<&JoinFilter> {
self.filter.as_ref()
}

/// How the join is performed
Expand Down Expand Up @@ -698,7 +698,7 @@ fn build_join_indices(
left_data: &JoinLeftData,
on_left: &[Column],
on_right: &[Column],
filter: &Option<JoinFilter>,
filter: Option<&JoinFilter>,
random_state: &RandomState,
null_equals_null: &bool,
) -> Result<(UInt64Array, UInt32Array)> {
Expand Down Expand Up @@ -1363,7 +1363,7 @@ impl HashJoinStream {
left_data,
&self.on_left,
&self.on_right,
&self.filter,
self.filter.as_ref(),
&self.random_state,
&self.null_equals_null,
);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/planner.rs
Expand Up @@ -480,7 +480,7 @@ impl DefaultPhysicalPlanner {
// referred to in the query
let filters = unnormalize_cols(filters.iter().cloned());
let unaliased: Vec<Expr> = filters.into_iter().map(unalias).collect();
source.scan(session_state, projection, &unaliased, *fetch).await
source.scan(session_state, projection.as_ref(), &unaliased, *fetch).await
}
LogicalPlan::Values(Values {
values,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/test_util.rs
Expand Up @@ -321,7 +321,7 @@ impl TableProvider for TestTableProvider {
async fn scan(
&self,
_ctx: &SessionState,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/custom_sources.rs
Expand Up @@ -193,12 +193,12 @@ impl TableProvider for CustomTableProvider {
async fn scan(
&self,
_state: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(CustomExecutionPlan {
projection: projection.clone(),
projection: projection.cloned(),
}))
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/provider_filter_pushdown.rs
Expand Up @@ -143,7 +143,7 @@ impl TableProvider for CustomProvider {
async fn scan(
&self,
_state: &SessionState,
_: &Option<Vec<usize>>,
_: Option<&Vec<usize>>,
filters: &[Expr],
_: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/row.rs
Expand Up @@ -34,7 +34,7 @@ async fn test_with_parquet() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
let exec = get_exec("alltypes_plain.parquet", projection.as_ref(), None).await?;
let schema = exec.schema().clone();

let batches = collect(exec, task_ctx).await?;
Expand All @@ -55,7 +55,7 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
let exec = get_exec("alltypes_plain.parquet", projection.as_ref(), None).await?;
let schema = exec.schema().clone();

let batches = collect(exec, task_ctx).await?;
Expand All @@ -73,7 +73,7 @@ async fn test_with_parquet_word_aligned() -> Result<()> {

async fn get_exec(
file_name: &str,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let testdata = datafusion::test_util::parquet_test_data();
Expand Down Expand Up @@ -103,7 +103,7 @@ async fn get_exec(
file_schema,
file_groups,
statistics,
projection: projection.clone(),
projection: projection.cloned(),
limit,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/information_schema.rs
Expand Up @@ -191,7 +191,7 @@ async fn information_schema_tables_table_types() {
async fn scan(
&self,
_ctx: &SessionState,
_: &Option<Vec<usize>>,
_: Option<&Vec<usize>>,
_: &[Expr],
_: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down