Skip to content

Commit

Permalink
Make DataFusion Core compile (#3)
Browse files Browse the repository at this point in the history
* wip

* more

* Make scalar.rs compile

* Fix various compilation error due to API difference

* Make datafusion core compile

* fmt

* wip
  • Loading branch information
yjshen committed Sep 17, 2021
1 parent 428efa8 commit 478f606
Show file tree
Hide file tree
Showing 23 changed files with 467 additions and 166 deletions.
43 changes: 20 additions & 23 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
Expand Up @@ -34,14 +34,11 @@ use crate::utils;
use crate::serde::protobuf::ShuffleWritePartition;
use crate::serde::scheduler::{PartitionLocation, PartitionStats};
use async_trait::async_trait;
use datafusion::arrow::array::{
Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
UInt64Builder,
};
use datafusion::arrow::array::*;
use datafusion::arrow::compute::take;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::ipc::reader::FileReader;
use datafusion::arrow::ipc::writer::FileWriter;
use datafusion::arrow::io::ipc::read::FileReader;
use datafusion::arrow::io::ipc::write::FileWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::hash_utils::create_hashes;
Expand Down Expand Up @@ -244,7 +241,7 @@ impl ShuffleWriterExec {
.collect::<Result<Vec<Arc<dyn Array>>>>()?;

let output_batch =
RecordBatch::try_new(input_batch.schema(), columns)?;
RecordBatch::try_new(input_batch.schema().clone(), columns)?;

// write non-empty batch out

Expand Down Expand Up @@ -356,18 +353,18 @@ impl ExecutionPlan for ShuffleWriterExec {

// build metadata result batch
let num_writers = part_loc.len();
let mut partition_builder = UInt32Builder::new(num_writers);
let mut path_builder = StringBuilder::new(num_writers);
let mut num_rows_builder = UInt64Builder::new(num_writers);
let mut num_batches_builder = UInt64Builder::new(num_writers);
let mut num_bytes_builder = UInt64Builder::new(num_writers);
let mut partition_builder = UInt32Vec::with_capacity(num_writers);
let mut path_builder = MutableUtf8Array::with_capacity(num_writers);
let mut num_rows_builder = UInt64Vec::with_capacity(num_writers);
let mut num_batches_builder = UInt64Vec::with_capacity(num_writers);
let mut num_bytes_builder = UInt64Vec::with_capacity(num_writers);

for loc in &part_loc {
path_builder.append_value(loc.path.clone())?;
partition_builder.append_value(loc.partition_id as u32)?;
num_rows_builder.append_value(loc.num_rows)?;
num_batches_builder.append_value(loc.num_batches)?;
num_bytes_builder.append_value(loc.num_bytes)?;
path_builder.push(Some(loc.path.clone()));
partition_builder.push(Some(loc.partition_id as u32));
num_rows_builder.push(Some(loc.num_rows));
num_batches_builder.push(Some(loc.num_batches));
num_bytes_builder.push(Some(loc.num_bytes));
}

// build arrays
Expand Down Expand Up @@ -428,17 +425,17 @@ fn result_schema() -> SchemaRef {
]))
}

struct ShuffleWriter {
struct ShuffleWriter<'a> {
path: String,
writer: FileWriter<File>,
writer: FileWriter<'a, File>,
num_batches: u64,
num_rows: u64,
num_bytes: u64,
}

impl ShuffleWriter {
impl<'a> ShuffleWriter<'a> {
fn new(path: &str, schema: &Schema) -> Result<Self> {
let file = File::create(path)
let mut file = File::create(path)
.map_err(|e| {
BallistaError::General(format!(
"Failed to create partition file at {}: {:?}",
Expand All @@ -451,7 +448,7 @@ impl ShuffleWriter {
num_rows: 0,
num_bytes: 0,
path: path.to_owned(),
writer: FileWriter::try_new(file, schema)?,
writer: FileWriter::try_new(&mut file, schema)?,
})
}

Expand Down Expand Up @@ -480,7 +477,7 @@ impl ShuffleWriter {
#[cfg(test)]
mod tests {
use super::*;
use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
use datafusion::arrow::array::{Utf8Array, StructArray, UInt32Array, UInt64Array};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::limit::GlobalLimitExec;
Expand Down
2 changes: 0 additions & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Expand Up @@ -61,7 +61,6 @@ use datafusion::physical_plan::{
expressions::{
col, Avg, BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr,
IsNullExpr, Literal, NegativeExpr, NotExpr, PhysicalSortExpr, TryCastExpr,
DEFAULT_DATAFUSION_CAST_OPTIONS,
},
filter::FilterExec,
functions::{self, BuiltinScalarFunction, ScalarFunctionExpr},
Expand Down Expand Up @@ -620,7 +619,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
ExprType::Cast(e) => Arc::new(CastExpr::new(
convert_box_required!(e.expr)?,
convert_required!(e.arrow_type)?,
DEFAULT_DATAFUSION_CAST_OPTIONS,
)),
ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
convert_box_required!(e.expr)?,
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/utils.rs
Expand Up @@ -31,6 +31,7 @@ use crate::serde::scheduler::PartitionStats;

use crate::config::BallistaConfig;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::{
array::*,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/benches/physical_plan.rs
Expand Up @@ -51,7 +51,7 @@ fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {

let exec = MemoryExec::try_new(
&batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
schema,
schema.clone(),
None,
)
.unwrap();
Expand Down

0 comments on commit 478f606

Please sign in to comment.