Skip to content

Commit

Permalink
feat: ignore timezone info when copying from external files
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Apr 6, 2023
1 parent f8b6a6b commit ab4c0e8
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,13 @@ pub enum Error {
},

#[snafu(display(
"File Schema mismatch, expected table schema: {} but found :{}",
"File schema mismatch at index {}, expected table schema: {} but found :{}",
index,
table_schema,
file_schema
))]
InvalidSchema {
index: usize,
table_schema: String,
file_schema: String,
},
Expand Down
145 changes: 137 additions & 8 deletions src/datanode/src/sql/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::error::DataTypesSnafu;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datatypes::arrow::datatypes::{DataType, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::vectors::{Helper, VectorRef};
use futures_util::TryStreamExt;
use regex::Regex;
use snafu::{ensure, ResultExt};
use snafu::ResultExt;
use table::engine::TableReference;
use table::requests::{CopyTableRequest, InsertRequest};
use tokio::io::BufReader;
Expand Down Expand Up @@ -80,13 +81,7 @@ impl SqlHandler {
.await
.context(error::ReadParquetSnafu)?;

ensure!(
builder.schema() == table.schema().arrow_schema(),
error::InvalidSchemaSnafu {
table_schema: table.schema().arrow_schema().to_string(),
file_schema: (*(builder.schema())).to_string()
}
);
ensure_schema_matches_ignore_timezone(builder.schema(), table.schema().arrow_schema())?;

let stream = builder
.build()
Expand Down Expand Up @@ -143,3 +138,137 @@ impl SqlHandler {
Ok(Output::AffectedRows(result.iter().sum()))
}
}

/// Validates that the schema of a copied file matches the destination table's
/// schema column-by-column, ignoring only the timezone part of timestamp
/// types (see [`data_type_equals_ignore_timezone`]).
///
/// The caller passes the file schema first and the table schema second
/// (`ensure_schema_matches_ignore_timezone(builder.schema(), table.schema().arrow_schema())`).
///
/// # Errors
/// Returns `error::Error::InvalidSchema` carrying the index of the first
/// mismatching column together with both schemas rendered as strings.
fn ensure_schema_matches_ignore_timezone(
    file_schema: &SchemaRef,
    table_schema: &SchemaRef,
) -> Result<()> {
    // Build the error with the arguments mapped to the correct labels: the
    // caller passes the file schema first and the table schema second, but
    // the previous code filled `table_schema` from the first argument, so
    // the two schemas were swapped in the error message.
    let invalid_schema = |index: usize| {
        error::InvalidSchemaSnafu {
            index,
            table_schema: table_schema.to_string(),
            file_schema: file_schema.to_string(),
        }
        .fail()
    };

    // `zip` silently truncates to the shorter side, so differing column
    // counts must be rejected explicitly or extra/missing columns in the
    // file would pass the check unnoticed.
    if file_schema.fields.len() != table_schema.fields.len() {
        // Index of the first column that has no counterpart on the other side.
        return invalid_schema(file_schema.fields.len().min(table_schema.fields.len()));
    }

    match file_schema
        .fields
        .iter()
        .zip(table_schema.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !data_type_equals_ignore_timezone(l, r))
    {
        Some((index, _)) => invalid_schema(index),
        None => Ok(()),
    }
}

/// Compares two arrow [`DataType`]s for equality while ignoring the timezone
/// part of timestamp types, recursing into nested container types
/// (lists, fixed-size lists, structs and maps).
///
/// Field names of nested item fields are deliberately not compared — only
/// nullability and (timezone-insensitive) item data types.
fn data_type_equals_ignore_timezone(l: &DataType, r: &DataType) -> bool {
    match (l, r) {
        (DataType::List(a), DataType::List(b))
        | (DataType::LargeList(a), DataType::LargeList(b)) => {
            a.is_nullable() == b.is_nullable()
                && data_type_equals_ignore_timezone(a.data_type(), b.data_type())
        }
        (DataType::FixedSizeList(a, a_size), DataType::FixedSizeList(b, b_size)) => {
            a_size == b_size
                && a.is_nullable() == b.is_nullable()
                && data_type_equals_ignore_timezone(a.data_type(), b.data_type())
        }
        (DataType::Struct(a), DataType::Struct(b)) => {
            a.len() == b.len()
                && a.iter().zip(b).all(|(a, b)| {
                    a.is_nullable() == b.is_nullable()
                        && data_type_equals_ignore_timezone(a.data_type(), b.data_type())
                })
        }
        (DataType::Map(a_field, a_is_sorted), DataType::Map(b_field, b_is_sorted)) => {
            // Recurse into the entries field like the other nested variants so
            // timestamps inside map keys/values also have their timezones
            // ignored. (The previous code compared the fields with `==`,
            // which made Map the only nested type where timezones were NOT
            // ignored — inconsistent with the function's purpose.)
            a_is_sorted == b_is_sorted
                && a_field.is_nullable() == b_field.is_nullable()
                && data_type_equals_ignore_timezone(a_field.data_type(), b_field.data_type())
        }
        // Timestamps compare by unit only; the timezone is intentionally ignored.
        (DataType::Timestamp(l_unit, _), DataType::Timestamp(r_unit, _)) => l_unit == r_unit,
        // All remaining types fall back to exact structural equality.
        _ => l == r,
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{Field, Schema, TimeUnit};

    use super::*;

    /// Builds two single-column schemas from `(data_type, nullable)` specs
    /// and asserts whether the timezone-insensitive comparison accepts them.
    fn test_schema_matches(l: (DataType, bool), r: (DataType, bool), matches: bool) {
        let left = Arc::new(Schema::new(vec![Field::new("col", l.0, l.1)]));
        let right = Arc::new(Schema::new(vec![Field::new("col", r.0, r.1)]));
        assert_eq!(
            matches,
            ensure_schema_matches_ignore_timezone(&left, &right).is_ok()
        )
    }

    /// Shorthand for building a timestamp data type with an optional timezone.
    fn ts(unit: TimeUnit, tz: Option<&str>) -> DataType {
        DataType::Timestamp(unit, tz.map(ToString::to_string))
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        // Identical timestamp types match.
        test_schema_matches(
            (ts(TimeUnit::Second, None), true),
            (ts(TimeUnit::Second, None), true),
            true,
        );

        // A timezone on one side only is ignored.
        test_schema_matches(
            (ts(TimeUnit::Second, Some("UTC")), true),
            (ts(TimeUnit::Second, None), true),
            true,
        );

        // Two different timezones are also ignored.
        test_schema_matches(
            (ts(TimeUnit::Second, Some("UTC")), true),
            (ts(TimeUnit::Second, Some("PDT")), true),
            true,
        );

        // Different time units must not match, even with equal timezones.
        test_schema_matches(
            (ts(TimeUnit::Second, Some("UTC")), true),
            (ts(TimeUnit::Millisecond, Some("UTC")), true),
            false,
        );

        // Non-timestamp types keep plain equality semantics.
        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);
        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), false);
    }
}
1 change: 1 addition & 0 deletions src/datatypes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ test = []

[dependencies]
arrow.workspace = true
arrow-array = "36"
arrow-schema.workspace = true
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
Expand Down
22 changes: 12 additions & 10 deletions src/datatypes/src/vectors/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,18 @@ impl Helper {
ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?),
ArrowDataType::List(_) => Arc::new(ListVector::try_from_arrow_array(array)?),
ArrowDataType::Timestamp(unit, _) => match unit {
TimeUnit::Second => Arc::new(TimestampSecondVector::try_from_arrow_array(array)?),
TimeUnit::Millisecond => {
Arc::new(TimestampMillisecondVector::try_from_arrow_array(array)?)
}
TimeUnit::Microsecond => {
Arc::new(TimestampMicrosecondVector::try_from_arrow_array(array)?)
}
TimeUnit::Nanosecond => {
Arc::new(TimestampNanosecondVector::try_from_arrow_array(array)?)
}
TimeUnit::Second => Arc::new(
TimestampSecondVector::try_from_arrow_timestamp_array(array)?,
),
TimeUnit::Millisecond => Arc::new(
TimestampMillisecondVector::try_from_arrow_timestamp_array(array)?,
),
TimeUnit::Microsecond => Arc::new(
TimestampMicrosecondVector::try_from_arrow_timestamp_array(array)?,
),
TimeUnit::Nanosecond => Arc::new(
TimestampNanosecondVector::try_from_arrow_timestamp_array(array)?,
),
},
ArrowDataType::Float16
| ArrowDataType::Time32(_)
Expand Down
45 changes: 45 additions & 0 deletions src/datatypes/src/vectors/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use std::sync::Arc;

use arrow::array::{
Array, ArrayBuilder, ArrayData, ArrayIter, ArrayRef, PrimitiveArray, PrimitiveBuilder,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow_schema::DataType;
use serde_json::Value as JsonValue;
use snafu::OptionExt;

Expand Down Expand Up @@ -70,6 +73,48 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
Ok(Self::new(concrete_array))
}

/// Converts an arrow timestamp array into a vector, discarding any timezone
/// metadata carried by the array's data type.
///
/// # Panics
/// Reaches `unreachable!()` if `array` is not a `DataType::Timestamp` array;
/// callers must only pass timestamp-typed arrays.
pub fn try_from_arrow_timestamp_array(array: impl AsRef<dyn Array>) -> Result<Self> {
    let array = array.as_ref();
    // The four time units differ only in the concrete array type they
    // downcast to; a local macro keeps the timezone-stripping logic
    // (`with_timezone_opt(None)`) in one place.
    macro_rules! erase_timezone {
        ($concrete:ty) => {
            array
                .as_any()
                .downcast_ref::<$concrete>()
                .unwrap()
                .with_timezone_opt(None)
                .data()
                .clone()
        };
    }
    let array_data = match array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow_schema::TimeUnit::Second => erase_timezone!(TimestampSecondArray),
            arrow_schema::TimeUnit::Millisecond => erase_timezone!(TimestampMillisecondArray),
            arrow_schema::TimeUnit::Microsecond => erase_timezone!(TimestampMicrosecondArray),
            arrow_schema::TimeUnit::Nanosecond => erase_timezone!(TimestampNanosecondArray),
        },
        // Contract: only timestamp arrays are passed in (see the panic note).
        _ => {
            unreachable!()
        }
    };
    // Rebuild the concrete primitive array from the timezone-free data.
    let concrete_array = PrimitiveArray::<T::ArrowPrimitive>::from(array_data);
    Ok(Self::new(concrete_array))
}

pub fn from_slice<P: AsRef<[T::Native]>>(slice: P) -> Self {
let iter = slice.as_ref().iter().copied();
Self {
Expand Down

0 comments on commit ab4c0e8

Please sign in to comment.