DataFusion 37.1 upgrade
gruuya committed Apr 29, 2024
1 parent 2755bb8 commit 97f07db
Showing 15 changed files with 821 additions and 815 deletions.
1,449 changes: 735 additions & 714 deletions Cargo.lock

Large diffs are not rendered by default.

52 changes: 18 additions & 34 deletions Cargo.toml
@@ -2,20 +2,20 @@
members = ["clade"]

[workspace.dependencies]
arrow = "50.0.0"
arrow-buffer = "50.0.0"
arrow-csv = "50.0.0"
arrow-flight = "50.0.0"
arrow = "51.0.0"
arrow-buffer = "51.0.0"
arrow-csv = "51.0.0"
arrow-flight = "51.0.0"
# For the JSON format support
# https://github.com/apache/arrow-rs/pull/2868
# https://github.com/apache/arrow-rs/pull/2724
arrow-integration-test = "50.0.0"
arrow-schema = "50.0.0"
arrow-integration-test = "51.0.0"
arrow-schema = "51.0.0"
async-trait = "0.1.64"

datafusion = "36.0.0"
datafusion-common = "36.0.0"
datafusion-expr = "36.0.0"
datafusion = "37.1.0"
datafusion-common = "37.1.0"
datafusion-expr = "37.1.0"

itertools = ">=0.10.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
@@ -50,22 +50,6 @@ object-store-gcs = ["object_store/gcp"]
object-store-s3 = ["object_store/aws"]
remote-tables = ["dep:datafusion-remote-tables"]

[patch.crates-io]
# Pick up https://github.com/apache/arrow-rs/pull/5282
arrow-arith = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-array = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-buffer = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-cast = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-csv = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-data = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-ipc = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-json = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-ord = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-row = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-schema = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-select = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-string = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }

[dependencies]
arrow = { workspace = true }
arrow-buffer = { workspace = true }
@@ -80,14 +64,14 @@ async-trait = { workspace = true }
base64 = "0.21.0"

bytes = "1.4.0"
chrono = { version = "0.4", default_features = false }
chrono = { version = "0.4", default-features = false }
clade = { path = "clade" }
clap = { version = "3.2.19", features = [ "derive" ] }
config = "0.13.3"

# PG wire protocol support
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-36-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-36-upgrade", optional = true }
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-37.1-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-37.1-upgrade", optional = true }

dashmap = "5.4.0"

@@ -97,16 +81,16 @@ datafusion-expr = { workspace = true }

datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "c223bb66dd518fe2f7a6d5ba29e67267aaf95876", features = ["datafusion"] }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "28ad3950d90573fa8ff413c336b657b8561e1d41", features = ["datafusion"] }

futures = "0.3"
hex = ">=0.4.0"
itertools = { workspace = true }
lazy_static = ">=1.4.0"
metrics = { version = "0.22.1" }
metrics-exporter-prometheus = { version = "0.13.1" }
moka = { version = "0.12.5", default_features = false, features = ["future", "atomic64", "quanta"] }
object_store = "0.9"
moka = { version = "0.12.5", default-features = false, features = ["future", "atomic64", "quanta"] }
object_store = "0.9.1"
percent-encoding = "2.2.0"
prost = "0.12.1"

@@ -122,15 +106,15 @@ rustyline = "13.0"
serde = "1.0.156"
serde_json = "1.0.93"
sha2 = ">=0.10.1"
sqlparser = { version = "0.43", features = ["visitor"] }
sqlparser = { version = "0.44", features = ["visitor"] }
sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
strum = ">=0.24"
strum_macros = ">=0.24"
tempfile = "3"
thiserror = "1"
tokio = { workspace = true }
tokio-graceful-shutdown = { version = "0.14" }
tonic = { version = "0.10.0", optional = true }
tonic = { version = "0.11.0", optional = true }
tower = "0.4"
tracing = { workspace = true }
tracing-log = "0.2"
@@ -152,7 +136,7 @@ aws-credential-types = { version = "1.1.5", features = ["hardcoded-credentials"]
aws-sdk-sts = { version = "1.3.1", features = ["behavior-version-latest"] }
rstest = "*"
serial_test = "2"
tonic-reflection = "0.10"
tonic-reflection = "0.11"
wiremock = "0.5"

[build-dependencies]
2 changes: 1 addition & 1 deletion clade/Cargo.toml
@@ -5,7 +5,7 @@ edition = "2021"

[dependencies]
prost = "0.12"
tonic = "0.10"
tonic = "0.11"

[build-dependencies]
tonic-build = "0.10"
2 changes: 1 addition & 1 deletion datafusion_remote_tables/Cargo.toml
@@ -19,7 +19,7 @@ arrow-schema = { workspace = true }
async-trait = { workspace = true }

# Remote query execution for a variety of DBs
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-36-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-37.1-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = { workspace = true }
datafusion-common = { workspace = true }
12 changes: 6 additions & 6 deletions datafusion_remote_tables/src/filter_pushdown.rs
@@ -5,7 +5,7 @@ use arrow::temporal_conversions::{
use datafusion::common::{Column, DataFusionError};
use datafusion::error::Result;
use datafusion::scalar::ScalarValue;
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_expr::expr::InList;
use datafusion_expr::{BinaryExpr, Expr, Operator};
use itertools::Itertools;
@@ -109,9 +109,9 @@ pub trait FilterPushdownConverter {
}

impl<T: FilterPushdownConverter> TreeNodeVisitor for FilterPushdownVisitor<T> {
type N = Expr;
type Node = Expr;

fn pre_visit(&mut self, expr: &Expr) -> Result<VisitRecursion> {
fn f_down(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
match expr {
Expr::Column(_)
| Expr::Literal(_)
@@ -140,10 +140,10 @@ impl<T: FilterPushdownConverter> TreeNodeVisitor for FilterPushdownVisitor<T> {
)));
}
};
Ok(VisitRecursion::Continue)
Ok(TreeNodeRecursion::Continue)
}

fn post_visit(&mut self, expr: &Expr) -> Result<VisitRecursion> {
fn f_up(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
match expr {
// Column and Literal are the only two leaf nodes atm - they don't depend on any SQL
// expression being on the stack.
@@ -235,7 +235,7 @@ impl<T: FilterPushdownConverter> TreeNodeVisitor for FilterPushdownVisitor<T> {
}
_ => {}
};
Ok(VisitRecursion::Continue)
Ok(TreeNodeRecursion::Continue)
}
}

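For context on the visitor changes above (not part of this commit): DataFusion 37 renames TreeNodeVisitor::pre_visit/post_visit to f_down/f_up, the associated type N to Node, and VisitRecursion to TreeNodeRecursion. Below is a minimal standalone sketch of the new API, assuming datafusion-common/datafusion-expr 37.1; the ColumnCounter visitor is hypothetical.

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::Result;
use datafusion_expr::{col, lit, Expr};

// Hypothetical visitor that counts column references, using the DataFusion 37
// names: f_down/f_up instead of pre_visit/post_visit, TreeNodeRecursion
// instead of VisitRecursion.
struct ColumnCounter {
    columns: usize,
}

impl TreeNodeVisitor for ColumnCounter {
    type Node = Expr;

    fn f_down(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
        if matches!(expr, Expr::Column(_)) {
            self.columns += 1;
        }
        Ok(TreeNodeRecursion::Continue)
    }

    fn f_up(&mut self, _expr: &Expr) -> Result<TreeNodeRecursion> {
        Ok(TreeNodeRecursion::Continue)
    }
}

fn main() -> Result<()> {
    // a = 1 AND b > 2 references two columns
    let expr = col("a").eq(lit(1)).and(col("b").gt(lit(2)));
    let mut counter = ColumnCounter { columns: 0 };
    expr.visit(&mut counter)?;
    assert_eq!(counter.columns, 2);
    Ok(())
}

The same rename is applied to ETagBuilderVisitor in src/frontend/http.rs further down.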
3 changes: 2 additions & 1 deletion src/context/delta.rs
@@ -13,7 +13,7 @@ use datafusion::{
error::DataFusionError,
execution::context::TaskContext,
parquet::{arrow::ArrowWriter, file::properties::WriterProperties},
physical_plan::ExecutionPlan,
physical_plan::{ExecutionPlan, ExecutionPlanProperties},
sql::TableReference,
};
use deltalake::kernel::{Action, Add, Schema as DeltaSchema};
@@ -29,6 +29,7 @@ use object_store::path::Path;
use std::fs::File;
use std::sync::Arc;
use tempfile::{NamedTempFile, TempPath};

use tokio::fs::File as AsyncFile;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::Semaphore;
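Side note, not part of the diff: in DataFusion 37, accessors such as output_partitioning() and output_ordering() moved off ExecutionPlan onto the new ExecutionPlanProperties trait, which is why it now has to be imported. A minimal sketch under that assumption; the partition_count helper and the EmptyExec plan are illustrative only.

use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

// Illustrative helper: output_partitioning() is now provided by the
// ExecutionPlanProperties trait, so the trait must be in scope.
fn partition_count(plan: &Arc<dyn ExecutionPlan>) -> usize {
    plan.output_partitioning().partition_count()
}

fn main() {
    let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    println!("partitions: {}", partition_count(&plan));
}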
1 change: 1 addition & 0 deletions src/context/logical.rs
@@ -301,6 +301,7 @@ impl SeafowlContext {
source: CopyToSource::Relation(table_name),
target,
options,
..
}) if options.contains(&CONVERT_TO_DELTA) => {
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(SeafowlExtensionNode::ConvertTable(ConvertTable {
37 changes: 21 additions & 16 deletions src/context/physical.rs
@@ -38,7 +38,7 @@ use datafusion::{
physical_plan::{ExecutionPlan, SendableRecordBatchStream},
sql::TableReference,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column as ColumnExpr, ResolvedTableReference, SchemaReference};
use datafusion_expr::logical_plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
@@ -279,21 +279,26 @@ impl SeafowlContext {
// are qualified thanks to https://github.com/apache/arrow-datafusion/pull/7316
//
// This leads to a panic unless we strip out the qualifier first.
filter.push(predicate.clone().transform(&|expr| {
Ok(
if let Expr::Column(ColumnExpr {
relation: Some(_),
name,
}) = &expr
{
Transformed::Yes(Expr::Column(
ColumnExpr::new_unqualified(name),
))
} else {
Transformed::No(expr)
},
)
})?);
filter.push(
predicate
.clone()
.transform(&|expr| {
Ok(
if let Expr::Column(ColumnExpr {
relation: Some(_),
name,
}) = &expr
{
Transformed::yes(Expr::Column(
ColumnExpr::new_unqualified(name),
))
} else {
Transformed::no(expr)
},
)
})
.data()?,
);

// A WHERE clause has been used; employ it to prune the update down to only
// a subset of files, while inheriting the rest from the previous version
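For context on the rewrite above (not part of the commit): DataFusion 37 replaces the Transformed::Yes/Transformed::No variants with the Transformed::yes/Transformed::no constructors, and the Result<Transformed<_>> returned by transform is unwrapped with TransformedResult::data(). A minimal standalone sketch assuming datafusion-common/datafusion-expr 37.1; strip_qualifiers is a hypothetical helper mirroring the logic above.

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column, Result};
use datafusion_expr::{col, lit, Expr};

// Hypothetical helper: rewrite qualified column references (t.a) into
// unqualified ones (a), using the DataFusion 37 transform API.
fn strip_qualifiers(expr: Expr) -> Result<Expr> {
    expr.transform(&|e| {
        Ok(match e {
            Expr::Column(Column {
                relation: Some(_),
                name,
            }) => Transformed::yes(Expr::Column(Column::new_unqualified(name))),
            other => Transformed::no(other),
        })
    })
    .data()
}

fn main() -> Result<()> {
    let expr = col("t.a").eq(lit(1));
    println!("{}", strip_qualifiers(expr)?);
    Ok(())
}

TransformedResult lives in datafusion_common::tree_node, which is why the use statement at the top of this file gains it.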
7 changes: 7 additions & 0 deletions src/datafusion/parser.rs
@@ -182,6 +182,10 @@ impl<'a> DFParser<'a> {
source: CopyToSource::Relation(table_name),
target: location,
options: vec![CONVERT_TO_DELTA.clone()],
partitioned_by: vec![],
has_header: false,

stored_as: None,
}))
}

@@ -238,6 +242,9 @@ impl<'a> DFParser<'a> {
source,
target,
options,
partitioned_by: vec![],
has_header: false,
stored_as: None,
}))
}

28 changes: 13 additions & 15 deletions src/frontend/http.rs
@@ -6,7 +6,7 @@ use std::time::Instant;
use std::{net::SocketAddr, sync::Arc};
use warp::{hyper, Rejection};

use arrow::json::writer::record_batches_to_json_rows;
use arrow::json::writer::{LineDelimited, WriterBuilder};
use arrow::record_batch::RecordBatch;
#[cfg(feature = "frontend-arrow-flight")]
use arrow_flight::flight_service_client::FlightServiceClient;
@@ -18,7 +18,7 @@ use bytes::Buf;
use datafusion::datasource::DefaultTableSource;

use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::FileType;
use datafusion_expr::logical_plan::{LogicalPlan, TableScan};
use deltalake::parquet::data_type::AsBytes;
@@ -68,12 +68,12 @@ struct ETagBuilderVisitor {
}

impl TreeNodeVisitor for ETagBuilderVisitor {
type N = LogicalPlan;
type Node = LogicalPlan;

fn pre_visit(
fn f_down(
&mut self,
plan: &LogicalPlan,
) -> Result<VisitRecursion, DataFusionError> {
) -> Result<TreeNodeRecursion, DataFusionError> {
if let LogicalPlan::TableScan(TableScan { source, .. }) = plan {
// TODO handle external Parquet tables too
if let Some(default_table_source) =
@@ -91,7 +91,7 @@ impl TreeNodeVisitor for ETagBuilderVisitor {
}
}
}
Ok(VisitRecursion::Continue)
Ok(TreeNodeRecursion::Continue)
}
}

@@ -130,14 +130,12 @@ struct QueryBody {
fn batch_to_json(
maybe_batch: Result<RecordBatch, DataFusionError>,
) -> Result<Vec<u8>, ArrowError> {
let batch = maybe_batch?;
let mut buf = Vec::new();
for row in record_batches_to_json_rows(&[&maybe_batch?])? {
buf.extend(
serde_json::to_vec(&row)
.map_err(|error| ArrowError::JsonError(error.to_string()))?,
);
buf.push(b'\n');
}
let mut writer = WriterBuilder::new()
.with_explicit_nulls(true)
.build::<_, LineDelimited>(&mut buf);
writer.write(&batch)?;
Ok(buf)
}

@@ -1220,7 +1218,7 @@ pub mod tests {
let error_msg = String::from_utf8_lossy(resp.body());
assert_eq!(
error_msg,
"Json error: data type Decimal128(38, 10) not supported in nested map for json writer"
"Invalid argument error: JSON Writer does not support data type: Decimal128(38, 10)"
);
}

@@ -1681,7 +1679,7 @@
SELECT
assert_eq!(
resp.body(),
&Bytes::from(
r#"{"bigint_val":1000000000,"bool_val":true,"char_val":"c","date_val":"2022-01-01","double_val":12.345678910111213,"float_val":12.345,"int_array_val":[1,2,3,4,5],"integer_val":1000000,"real_val":12.345,"smallint_val":1000,"string_val":"string","text_array_val":["one","two"],"text_val":"text","timestamp_val":"2022-01-01T12:03:11.123456","tinyint_val":1,"varchar_val":"varchar"}
r#"{"tinyint_val":1,"smallint_val":1000,"integer_val":1000000,"bigint_val":1000000000,"char_val":"c","varchar_val":"varchar","text_val":"text","string_val":"string","float_val":12.345,"real_val":12.345,"double_val":12.345678910111213,"bool_val":true,"date_val":"2022-01-01","timestamp_val":"2022-01-01T12:03:11.123456","int_array_val":[1,2,3,4,5],"text_array_val":["one","two"]}
"#
)
);
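For context on the batch_to_json change above (not part of the commit): arrow 51 removes record_batches_to_json_rows, and the replacement is the streaming JSON Writer built via WriterBuilder with the LineDelimited format. with_explicit_nulls(true) keeps null fields in the output, and the new writer emits fields in schema order rather than alphabetically, which is reflected in the updated test expectations above. A minimal standalone sketch under those assumptions; the sample batch is made up.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::error::ArrowError;
use arrow::json::writer::{LineDelimited, WriterBuilder};
use arrow::record_batch::RecordBatch;

// Serialize a RecordBatch to newline-delimited JSON, writing nulls explicitly.
fn batch_to_ndjson(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
    let mut buf = Vec::new();
    let mut writer = WriterBuilder::new()
        .with_explicit_nulls(true)
        .build::<_, LineDelimited>(&mut buf);
    writer.write(batch)?;
    writer.finish()?;
    Ok(buf)
}

fn main() -> Result<(), ArrowError> {
    let ids: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None]));
    let names: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), None]));
    let batch = RecordBatch::try_from_iter(vec![("id", ids), ("name", names)])?;
    println!("{}", String::from_utf8_lossy(&batch_to_ndjson(&batch)?));
    Ok(())
}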
