DataFusion 13.0.0 (#61)
* Update for changes in apache/arrow-rs#2711

* Clippy

* Use Vec conversion

* Update to DF 13 (#59)

* [DataFrame] - Add write_csv/write_parquet/write_json to DataFrame (#58)

* [SessionContext] - Add read_csv/read_parquet/read_avro functions to SessionContext (#57)

Co-authored-by: Francis Du <me@francis.run>

* Remove patch from Cargo.toml

* Add notes on git submodules for test data

Co-authored-by: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
Co-authored-by: Francis Du <me@francis.run>
Committed by 3 people on Oct 13, 2022 · 1 parent 55909a8 · commit fb91556
Showing 10 changed files with 520 additions and 277 deletions.
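Most of the diff below follows from apache/arrow-rs#2711: per the changed signatures, Python⇄Rust Arrow conversions now go through the `PyArrowType<T>` newtype instead of pyo3 trait impls on the Arrow types themselves. A minimal sketch of the pattern (hypothetical functions, not part of this commit):

```rust
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::PyArrowType;
use pyo3::prelude::*;

// Hypothetical pyo3 function (illustration only). An incoming pyarrow.Schema
// is converted by PyArrowType's FromPyObject impl; the Rust value lives in
// the newtype's `.0` field.
#[pyfunction]
fn field_count(schema: PyArrowType<Schema>) -> PyResult<usize> {
    Ok(schema.0.fields().len())
}

// Wrapping a Rust value in PyArrowType on the way out returns it to Python
// as a pyarrow object, as PyDataFrame::schema now does.
#[pyfunction]
fn empty_schema() -> PyArrowType<Schema> {
    PyArrowType(Schema::empty())
}
```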
662 changes: 437 additions & 225 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
@@ -34,9 +34,9 @@ default = ["mimalloc"]
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
rand = "0.7"
pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] }
datafusion = { version = "^12.0.0", features = ["pyarrow", "avro"] }
datafusion-expr = { version = "^12.0.0" }
datafusion-common = { version = "^12.0.0", features = ["pyarrow"] }
datafusion = { version = "^13.0.0", features = ["pyarrow", "avro"] }
datafusion-expr = { version = "^13.0.0" }
datafusion-common = { version = "^13.0.0", features = ["pyarrow"] }
uuid = { version = "0.8", features = ["v4"] }
mimalloc = { version = "*", optional = true, default-features = false }
async-trait = "0.1"
@@ -51,4 +51,4 @@ name = "datafusion._internal"

[profile.release]
lto = true
codegen-units = 1
codegen-units = 1
7 changes: 7 additions & 0 deletions README.md
@@ -163,6 +163,13 @@ python -m pip install -U pip
python -m pip install -r requirements-310.txt
```

The tests rely on test data in git submodules.

```bash
git submodule init
git submodule update
```

Whenever rust code changes (your changes or via `git pull`):

```bash
41 changes: 25 additions & 16 deletions src/context.rs
@@ -24,6 +24,7 @@ use pyo3::exceptions::{PyKeyError, PyValueError};
use pyo3::prelude::*;

use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::datasource::TableProvider;
use datafusion::datasource::MemTable;
@@ -99,9 +100,12 @@ impl PySessionContext {
Ok(PyDataFrame::new(df))
}

fn create_dataframe(&mut self, partitions: Vec<Vec<RecordBatch>>) -> PyResult<PyDataFrame> {
let table = MemTable::try_new(partitions[0][0].schema(), partitions)
.map_err(DataFusionError::from)?;
fn create_dataframe(
&mut self,
partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
) -> PyResult<PyDataFrame> {
let schema = partitions.0[0][0].schema();
let table = MemTable::try_new(schema, partitions.0).map_err(DataFusionError::from)?;

// generate a random (unique) name for this table
// table name cannot start with numeric digit
@@ -136,10 +140,10 @@ impl PySessionContext {
fn register_record_batches(
&mut self,
name: &str,
partitions: Vec<Vec<RecordBatch>>,
partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
) -> PyResult<()> {
let schema = partitions[0][0].schema();
let table = MemTable::try_new(schema, partitions)?;
let schema = partitions.0[0][0].schema();
let table = MemTable::try_new(schema, partitions.0)?;
self.ctx
.register_table(name, Arc::new(table))
.map_err(DataFusionError::from)?;
@@ -182,7 +186,7 @@ impl PySessionContext {
&mut self,
name: &str,
path: PathBuf,
schema: Option<Schema>,
schema: Option<PyArrowType<Schema>>,
has_header: bool,
delimiter: &str,
schema_infer_max_records: usize,
@@ -204,7 +208,7 @@ impl PySessionContext {
.delimiter(delimiter[0])
.schema_infer_max_records(schema_infer_max_records)
.file_extension(file_extension);
options.schema = schema.as_ref();
options.schema = schema.as_ref().map(|x| &x.0);

let result = self.ctx.register_csv(name, path, options);
wait_for_future(py, result).map_err(DataFusionError::from)?;
@@ -277,7 +281,7 @@ impl PySessionContext {
fn read_csv(
&self,
path: PathBuf,
schema: Option<Schema>,
schema: Option<PyArrowType<Schema>>,
has_header: bool,
delimiter: &str,
schema_infer_max_records: usize,
@@ -302,12 +306,17 @@ impl PySessionContext {
.schema_infer_max_records(schema_infer_max_records)
.file_extension(file_extension)
.table_partition_cols(table_partition_cols);
options.schema = schema.as_ref();

let result = self.ctx.read_csv(path, options);
let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);

Ok(df)
if let Some(py_schema) = schema {
options.schema = Some(&py_schema.0);
let result = self.ctx.read_csv(path, options);
let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
Ok(df)
} else {
let result = self.ctx.read_csv(path, options);
let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
Ok(df)
}
}

#[allow(clippy::too_many_arguments)]
@@ -346,14 +355,14 @@ impl PySessionContext {
fn read_avro(
&self,
path: &str,
schema: Option<Schema>,
schema: Option<PyArrowType<Schema>>,
table_partition_cols: Vec<String>,
file_extension: &str,
py: Python,
) -> PyResult<PyDataFrame> {
let mut options = AvroReadOptions::default().table_partition_cols(table_partition_cols);
options.file_extension = file_extension;
options.schema = schema.map(Arc::new);
options.schema = schema.map(|s| Arc::new(s.0));

let result = self.ctx.read_avro(path, options);
let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
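One detail worth noting in `register_csv` and `read_csv`: `CsvReadOptions` only borrows the schema (the diff assigns an `Option<&Schema>` to `options.schema`), so the owned `Schema` unwrapped from `PyArrowType` has to outlive the options. `register_csv` does this with `as_ref().map(|x| &x.0)`, while `read_csv` branches on the option. A stripped-down sketch of that borrow relationship (hypothetical helper, assuming only what the diff shows):

```rust
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::prelude::CsvReadOptions;

// Hypothetical helper: keep the owned Schema in a local binding so that the
// CsvReadOptions, which only borrow it, stay valid while they are used.
fn options_with_schema(schema: Option<PyArrowType<Schema>>) {
    let owned: Option<Schema> = schema.map(|s| s.0);
    let mut options = CsvReadOptions::new();
    options.schema = owned.as_ref();
    // `options` could now be handed to SessionContext::read_csv while
    // `owned` is still in scope.
    let _ = options;
}
```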
10 changes: 5 additions & 5 deletions src/dataframe.rs
@@ -18,7 +18,7 @@
use crate::utils::wait_for_future;
use crate::{errors::DataFusionError, expression::PyExpr};
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::PyArrowConvert;
use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowException, PyArrowType};
use datafusion::arrow::util::pretty;
use datafusion::dataframe::DataFrame;
use datafusion::prelude::*;
@@ -65,8 +65,8 @@ impl PyDataFrame {
}

/// Returns the schema from the logical plan
fn schema(&self) -> Schema {
self.df.schema().into()
fn schema(&self) -> PyArrowType<Schema> {
PyArrowType(self.df.schema().into())
}

#[args(args = "*")]
@@ -144,7 +144,7 @@ impl PyDataFrame {
fn show(&self, py: Python, num: usize) -> PyResult<()> {
let df = self.df.limit(0, Some(num))?;
let batches = wait_for_future(py, df.collect())?;
Ok(pretty::print_batches(&batches)?)
pretty::print_batches(&batches).map_err(|err| PyArrowException::new_err(err.to_string()))
}

/// Filter out duplicate rows
@@ -186,7 +186,7 @@ impl PyDataFrame {
fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyResult<()> {
let df = self.df.explain(verbose, analyze)?;
let batches = wait_for_future(py, df.collect())?;
Ok(pretty::print_batches(&batches)?)
pretty::print_batches(&batches).map_err(|err| PyArrowException::new_err(err.to_string()))
}

/// Repartition a `DataFrame` based on a logical partitioning scheme.
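The `show` and `explain` changes suggest that `ArrowError` no longer converts into a `PyErr` via `?`, so Arrow failures are now mapped onto pyarrow's ArrowException explicitly. A minimal sketch using only items the diff already imports (the helper names are made up):

```rust
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::pyarrow::PyArrowException;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty;
use pyo3::prelude::*;

// Map an Arrow error into pyarrow's ArrowException on the Python side.
fn to_py_err(err: ArrowError) -> PyErr {
    PyArrowException::new_err(err.to_string())
}

// Same shape as PyDataFrame::show / PyDataFrame::explain above.
fn print_or_raise(batches: &[RecordBatch]) -> PyResult<()> {
    pretty::print_batches(batches).map_err(to_py_err)
}
```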
10 changes: 9 additions & 1 deletion src/dataset.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
use async_trait::async_trait;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::datasource::datasource::TableProviderFilterPushDown;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::{DataFusionError, Result as DFResult};
@@ -74,7 +75,14 @@ impl TableProvider for Dataset {
Python::with_gil(|py| {
let dataset = self.dataset.as_ref(py);
// This can panic but since we checked that self.dataset is a pyarrow.dataset.Dataset it should never
Arc::new(dataset.getattr("schema").unwrap().extract().unwrap())
Arc::new(
dataset
.getattr("schema")
.unwrap()
.extract::<PyArrowType<_>>()
.unwrap()
.0,
)
})
}

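The same extraction idiom appears again in `dataset_exec.rs` below: a pyarrow object fetched from Python is converted by extracting `PyArrowType<_>` and taking its `.0` field. A compact sketch (hypothetical helper name; only calls that appear in the diff):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::pyarrow::PyArrowType;
use pyo3::prelude::*;

// Hypothetical helper: read a pyarrow.Schema attribute off a Python object
// and unwrap the PyArrowType newtype into a Rust SchemaRef.
fn schema_from_attr(obj: &PyAny, attr: &str) -> PyResult<SchemaRef> {
    let schema = obj.getattr(attr)?.extract::<PyArrowType<Schema>>()?.0;
    Ok(Arc::new(schema))
}
```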
12 changes: 9 additions & 3 deletions src/dataset_exec.rs
@@ -28,6 +28,7 @@ use futures::stream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFResult};
use datafusion::execution::context::TaskContext;
@@ -54,7 +55,7 @@ impl Iterator for PyArrowBatchesAdapter {
Some(
batches
.next()?
.and_then(|batch| batch.extract())
.and_then(|batch| Ok(batch.extract::<PyArrowType<_>>()?.0))
.map_err(|err| ArrowError::ExternalError(Box::new(err))),
)
})
@@ -109,7 +110,12 @@ impl DatasetExec {

let scanner = dataset.call_method("scanner", (), Some(kwargs))?;

let schema = Arc::new(scanner.getattr("projected_schema")?.extract()?);
let schema = Arc::new(
scanner
.getattr("projected_schema")?
.extract::<PyArrowType<_>>()?
.0,
);

let builtins = Python::import(py, "builtins")?;
let pylist = builtins.getattr("list")?;
@@ -211,7 +217,7 @@ impl ExecutionPlan for DatasetExec {
let schema: SchemaRef = Arc::new(
scanner
.getattr("projected_schema")
.and_then(|schema| schema.extract())
.and_then(|schema| Ok(schema.extract::<PyArrowType<_>>()?.0))
.map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
);
let record_batches: &PyIterator = scanner
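For record batches streaming out of the Python scanner the pattern is the same, except the pyo3 error has to be adapted into Arrow's error type so it fits the iterator's `ArrowResult` items. A reduced sketch of what `PyArrowBatchesAdapter::next` does per batch (hypothetical function name):

```rust
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use pyo3::prelude::*;

// Convert one pyarrow.RecordBatch into a Rust RecordBatch, boxing any pyo3
// error into ArrowError::ExternalError so it can flow through Arrow results.
fn batch_from_py(obj: &PyAny) -> Result<RecordBatch, ArrowError> {
    obj.extract::<PyArrowType<RecordBatch>>()
        .map(|batch| batch.0)
        .map_err(|err| ArrowError::ExternalError(Box::new(err)))
}
```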
5 changes: 3 additions & 2 deletions src/expression.rs
@@ -19,6 +19,7 @@ use pyo3::{basic::CompareOp, prelude::*};
use std::convert::{From, Into};

use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion_expr::{col, lit, Expr};

use datafusion::scalar::ScalarValue;
@@ -125,12 +126,12 @@ impl PyExpr {
self.expr.clone().is_null().into()
}

pub fn cast(&self, to: DataType) -> PyExpr {
pub fn cast(&self, to: PyArrowType<DataType>) -> PyExpr {
// self.expr.cast_to() requires DFSchema to validate that the cast
// is supported, omit that for now
let expr = Expr::Cast {
expr: Box::new(self.expr.clone()),
data_type: to,
data_type: to.0,
};
expr.into()
}
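`PyExpr::cast` builds the cast node directly instead of calling `Expr::cast_to`, which would need a `DFSchema` for validation. Roughly (hypothetical helpers, fixed target type for brevity):

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion_expr::{col, Expr};

// Build an unvalidated cast expression, the same shape PyExpr::cast produces.
fn cast_to_float64(input: Expr) -> Expr {
    Expr::Cast {
        expr: Box::new(input),
        data_type: DataType::Float64,
    }
}

// e.g. cast column "a" to Float64
fn cast_a() -> Expr {
    cast_to_float64(col("a"))
}
```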
17 changes: 9 additions & 8 deletions src/udaf.rs
@@ -19,9 +19,9 @@ use std::sync::Arc;

use pyo3::{prelude::*, types::PyTuple};

use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::array::{Array, ArrayRef};
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::pyarrow::PyArrowConvert;
use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
use datafusion::common::ScalarValue;
use datafusion::error::{DataFusionError, Result};
use datafusion_expr::{
@@ -82,6 +82,7 @@ impl Accumulator for RustAccumulator {

// 1. cast states to Pyarrow array
let state = state
.data()
.to_pyarrow(py)
.map_err(|e| DataFusionError::Execution(format!("{}", e)))?;

@@ -120,18 +121,18 @@ impl PyAggregateUDF {
fn new(
name: &str,
accumulator: PyObject,
input_type: DataType,
return_type: DataType,
state_type: Vec<DataType>,
input_type: PyArrowType<DataType>,
return_type: PyArrowType<DataType>,
state_type: PyArrowType<Vec<DataType>>,
volatility: &str,
) -> PyResult<Self> {
let function = create_udaf(
name,
input_type,
Arc::new(return_type),
input_type.0,
Arc::new(return_type.0),
parse_volatility(volatility)?,
to_rust_accumulator(accumulator),
Arc::new(state_type),
Arc::new(state_type.0),
);
Ok(Self { function })
}
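The extra `.data()` in `merge_batch` reflects that an `ArrayRef` is apparently no longer converted to pyarrow directly; its underlying `ArrayData` is. A minimal sketch (hypothetical helper, using only the traits this file imports):

```rust
use datafusion::arrow::array::{Array, ArrayRef};
use datafusion::arrow::pyarrow::PyArrowConvert;
use pyo3::prelude::*;

// Hand an Arrow array to Python by converting its ArrayData to pyarrow.
fn array_to_py(py: Python<'_>, array: &ArrayRef) -> PyResult<PyObject> {
    array.data().to_pyarrow(py)
}
```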
25 changes: 12 additions & 13 deletions src/udf.rs
@@ -19,9 +19,9 @@ use std::sync::Arc;

use pyo3::{prelude::*, types::PyTuple};

use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::array::{make_array, Array, ArrayData, ArrayRef};
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::pyarrow::PyArrowConvert;
use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
use datafusion::error::DataFusionError;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::physical_plan::udf::ScalarUDF;
@@ -46,15 +46,14 @@ fn to_rust_function(func: PyObject) -> ScalarFunctionImplementation {
let py_args = PyTuple::new(py, py_args);

// 2. call function
let value = func.as_ref(py).call(py_args, None);
let value = match value {
Ok(n) => Ok(n),
Err(error) => Err(DataFusionError::Execution(format!("{:?}", error))),
}?;
let value = func
.as_ref(py)
.call(py_args, None)
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

// 3. cast to arrow::array::Array
let array = ArrayRef::from_pyarrow(value).unwrap();
Ok(array)
let array_data = ArrayData::from_pyarrow(value).unwrap();
Ok(make_array(array_data))
})
},
)
@@ -73,14 +72,14 @@ impl PyScalarUDF {
fn new(
name: &str,
func: PyObject,
input_types: Vec<DataType>,
return_type: DataType,
input_types: PyArrowType<Vec<DataType>>,
return_type: PyArrowType<DataType>,
volatility: &str,
) -> PyResult<Self> {
let function = create_udf(
name,
input_types,
Arc::new(return_type),
input_types.0,
Arc::new(return_type.0),
parse_volatility(volatility)?,
to_rust_function(func),
);
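And the reverse direction in `to_rust_function`: the Python UDF's result comes back as `ArrayData` and is promoted to a dynamically typed `ArrayRef` with `make_array`. A reduced sketch (hypothetical helper name):

```rust
use datafusion::arrow::array::{make_array, ArrayData, ArrayRef};
use datafusion::arrow::pyarrow::PyArrowConvert;
use pyo3::prelude::*;

// Convert a pyarrow array returned from Python into an Arrow ArrayRef.
fn array_from_py(value: &PyAny) -> PyResult<ArrayRef> {
    let data = ArrayData::from_pyarrow(value)?;
    Ok(make_array(data))
}
```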
