Skip to content

Commit

Permalink
Ensure the row count is preserved when coalescing over empty records
Browse files Browse the repository at this point in the history
  • Loading branch information
isidentical committed Sep 10, 2022
1 parent 4d22076 commit 471f730
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
8 changes: 6 additions & 2 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Expand Up @@ -33,7 +33,7 @@ use crate::execution::context::TaskContext;
use arrow::compute::kernels::concat::concat;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
-use arrow::record_batch::RecordBatch;
+use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use futures::stream::{Stream, StreamExt};
use log::trace;

Expand Down Expand Up @@ -291,7 +291,11 @@ pub fn concat_batches(
batches.len(),
row_count
);
-RecordBatch::try_new(schema.clone(), arrays)
+
+let mut options = RecordBatchOptions::default();
+options.row_count = Some(row_count);
+
+RecordBatch::try_new_with_options(schema.clone(), arrays, &options)
}

#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/sql/mod.rs
Expand Up @@ -766,13 +766,13 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch>
let logical_schema = plan.schema();

let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
-let plan = ctx
+let optimized_logical_plan = ctx
.optimize(&plan)
.map_err(|e| format!("{:?} at {}", e, msg))
.unwrap();
-let optimized_logical_schema = plan.schema();
+let optimized_logical_schema = optimized_logical_plan.schema();

-let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
+let msg = format!("Creating physical plan for '{}': {:?}", sql, optimized_logical_plan);
let plan = ctx
.create_physical_plan(&plan)
.await
Expand Down

0 comments on commit 471f730

Please sign in to comment.