Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure the row count is preserved when coalescing over empty records #3439

Merged
merged 2 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::execution::context::TaskContext;
use arrow::compute::kernels::concat::concat;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use futures::stream::{Stream, StreamExt};
use log::trace;

Expand Down Expand Up @@ -291,7 +291,11 @@ pub fn concat_batches(
batches.len(),
row_count
);
RecordBatch::try_new(schema.clone(), arrays)

let mut options = RecordBatchOptions::default();
options.row_count = Some(row_count);
Comment on lines +295 to +296
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an FYI (not important) you can write this same pattern in what is likely more idiomatic rust (avoid mut) like:

Suggested change
let mut options = RecordBatchOptions::default();
options.row_count = Some(row_count);
let options = RecordBatchOptions{
row_count:Some(row_count),
..Default::default()
};

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to fail since RecordBatchOptions is non-exhaustive and comes directly from arrow-rs. I am not really familiar with this part of Rust semantics, but there seems to be an existing issue about it.

error[E0639]: cannot create non-exhaustive struct using struct expression
   --> /home/isidentical/projects/arrow-datafusion/datafusion/core/src/physical_plan/coalesce_batches.rs:295:19
    |
295 |       let options = RecordBatchOptions {
    |  ___________________^
296 | |         row_count: Some(row_count),
297 | |         ..Default::default()
298 | |     };
    | |_____^

For more information about this error, try `rustc --explain E0639`.

Is there any way to achieve it? The only other example of RecordBatchOptions construction present in DF is the following which uses the same pattern as the code present in this PR but I'd be happy to refactor both if there is a more idiomatic way.
https://github.com/apache/arrow-datafusion/blob/9956f80f197550051db7debae15d5c706afc22a3/datafusion/core/src/physical_plan/file_format/mod.rs#L286-L288

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for trying @isidentical -- I will file a follow on in arrow-rs to make this API more ergonomic

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed apache/arrow-rs#2728 as a follow on (I also hit the same thing in #3454 FWIW)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI apache/arrow-rs#2729 fixed this issue and #3483 includes using the new API.


RecordBatch::try_new_with_options(schema.clone(), arrays, &options)
}

#[cfg(test)]
Expand Down
14 changes: 11 additions & 3 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,14 +765,22 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch>
.unwrap();
let logical_schema = plan.schema();

// We are not really interested in the direct output of optimized_logical_plan
// since the physical plan construction already optimizes the given logical plan
// and we want to avoid double-optimization as a consequence. So we just construct
// it here to make sure that it doesn't fail at this step and get the optimized
// schema (to assert later that the logical and optimized schemas are the same).
let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
let plan = ctx
let optimized_logical_plan = ctx
.optimize(&plan)
.map_err(|e| format!("{:?} at {}", e, msg))
.unwrap();
let optimized_logical_schema = plan.schema();
let optimized_logical_schema = optimized_logical_plan.schema();

let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
let msg = format!(
"Creating physical plan for '{}': {:?}",
sql, optimized_logical_plan
);
let plan = ctx
isidentical marked this conversation as resolved.
Show resolved Hide resolved
.create_physical_plan(&plan)
.await
Expand Down