Skip to content

Commit

Permalink
Fix parsing struct stats after schema evolution (#901)
Browse files Browse the repository at this point in the history
# Description
When a delta table's schema is evolved, the struct stat schemas in
checkpoints are also evolved. Since the struct stats are stored in a
columnar way adding a single file with the new columns will cause nulls
to appear in the struct stats for all other files. This is a significant
difference compared to the json stats.

Unfortunately I overlooked this in
#656 for both nullCounts and
min/max values. This caused parsed struct stats to have extra columns
full of nulls. I don't know if this was actually an issue at all but it
should be fixed even if just for the sake of the warnings spam.
```
[2022-10-24T22:13:22Z WARN  deltalake::action::parquet_read] Expect type of nullCount field to be struct or int64, got: null
[2022-10-24T22:13:22Z WARN  deltalake::action::parquet_read] Expect type of nullCount field to be struct or int64, got: null
[2022-10-24T22:13:22Z WARN  deltalake::action::parquet_read] Expect type of nullCount field to be struct or int64, got: null
[2022-10-24T22:13:22Z WARN  deltalake::action::parquet_read] Expect type of nullCount field to be struct or int64, got: null
[2022-10-24T22:13:22Z WARN  deltalake::action::parquet_read] Expect type of nullCount field to be struct or int64, got: null
``` 

# Related Issue(s)
- Relates to #653, but for the
most part it's an already solved issue.

# Changes:
- Replace the test data with similar test data that includes a schema
evolution.
- Add error handling for min/max values to ensure warnings will be
logged for other unexpected types (there probably shouldn't be any). As
a total rust noob I originally filled with nulls but I think that was a
mistake.
- Ignore nulls for min/max stats and null count stats since these are
expected after schema evolution and should be ignored without logging a
warning.

Usual disclaimer on a PR from me: I don't know what I'm doing writing
rust code. (thanks to wjones for tidying up my dodgy rust code 🙂)

Co-authored-by: Will Jones <willjones127@gmail.com>
  • Loading branch information
Tom-Newton and wjones127 committed Nov 3, 2022
1 parent ab19515 commit fae50cc
Show file tree
Hide file tree
Showing 42 changed files with 121 additions and 112 deletions.
163 changes: 79 additions & 84 deletions rust/src/action/parquet_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,50 +142,43 @@ impl Add {

for (i, (name, _)) in record.get_column_iter().enumerate() {
match name.as_str() {
"numRecords" => match record.get_long(i) {
Ok(v) => {
"numRecords" => if let Ok(v) = record.get_long(i) {
stats.num_records = v;
}
_ => {
} else {
log::error!("Expect type of stats_parsed field numRecords to be long, got: {}", record);
}
}
"minValues" => match record.get_group(i) {
Ok(row) => {
for (name, field) in row.get_column_iter() {
stats.min_values.insert(name.clone(), field.into());
"minValues" => if let Ok(row) = record.get_group(i) {
for (name, field) in row.get_column_iter() {
if !matches!(field, Field::Null) {
if let Some(values) = field_to_value_stat(field, name) {
stats.min_values.insert(name.clone(), values);
}
}
}
_ => {
log::error!("Expect type of stats_parsed field minRecords to be struct, got: {}", record);
}
} else {
log::error!("Expect type of stats_parsed field minRecords to be struct, got: {}", record);
}
"maxValues" => match record.get_group(i) {
Ok(row) => {
for (name, field) in row.get_column_iter() {
stats.max_values.insert(name.clone(), field.into());
"maxValues" => if let Ok(row) = record.get_group(i) {
for (name, field) in row.get_column_iter() {
if !matches!(field, Field::Null) {
if let Some(values) = field_to_value_stat(field, name) {
stats.max_values.insert(name.clone(), values);
}
}
}
_ => {
log::error!("Expect type of stats_parsed field maxRecords to be struct, got: {}", record);
}
} else {
log::error!("Expect type of stats_parsed field maxRecords to be struct, got: {}", record);
}
"nullCount" => match record.get_group(i) {
Ok(row) => {
for (name, field) in row.get_column_iter() {
match field.try_into() {
Ok(count) => {
stats.null_count.insert(name.clone(), count);
},
_ => {
log::warn!("Expect type of nullCount field to be struct or int64, got: {}", field);
},
};
"nullCount" => if let Ok(row) = record.get_group(i) {
for (name, field) in row.get_column_iter() {
if !matches!(field, Field::Null) {
if let Some(count) = field_to_count_stat(field, name) {
stats.null_count.insert(name.clone(), count);
}
}
}
_ => {
log::error!("Expect type of stats_parsed field nullCount to be struct, got: {}", record);
}
} else {
log::error!("Expect type of stats_parsed field nullCount to be struct, got: {}", record);
}
_ => {
log::warn!(
Expand All @@ -202,68 +195,70 @@ impl Add {
}
}

impl From<&Field> for ColumnValueStat {
fn from(field: &Field) -> Self {
match field {
Field::Group(group) => ColumnValueStat::Column(HashMap::from_iter(
group
.get_column_iter()
.map(|(field_name, field)| (field_name.clone(), field.into())),
)),
_ => ColumnValueStat::Value(primitive_parquet_field_to_json_value(field)),
fn field_to_value_stat(field: &Field, field_name: &str) -> Option<ColumnValueStat> {
match field {
Field::Group(group) => {
let values = group.get_column_iter().filter_map(|(name, sub_field)| {
field_to_value_stat(sub_field, name).map(|val| (name.clone(), val))
});
Some(ColumnValueStat::Column(HashMap::from_iter(values)))
}
_ => {
if let Ok(val) = primitive_parquet_field_to_json_value(field) {
Some(ColumnValueStat::Value(val))
} else {
log::warn!(
"Unexpected type when parsing min/max values for {}. Found {}",
field_name,
field
);
None
}
}
}
}

impl TryFrom<&Field> for ColumnCountStat {
type Error = &'static str;

fn try_from(field: &Field) -> Result<Self, Self::Error> {
match field {
Field::Group(group) => Ok(ColumnCountStat::Column(HashMap::from_iter(
group
.get_column_iter()
.filter_map(|(field_name, field)| match field.try_into() {
Ok(value) => Some((field_name.clone(), value)),
_ => {
log::warn!(
"Unexpected type when parsing nullCounts for {}. Found {}",
field_name,
field
);
None
}
}),
))),
Field::Long(value) => Ok(ColumnCountStat::Value(*value)),
_ => Err("Invalid type for nullCounts"),
fn field_to_count_stat(field: &Field, field_name: &str) -> Option<ColumnCountStat> {
match field {
Field::Group(group) => {
let counts = group.get_column_iter().filter_map(|(name, sub_field)| {
field_to_count_stat(sub_field, name).map(|count| (name.clone(), count))
});
Some(ColumnCountStat::Column(HashMap::from_iter(counts)))
}
Field::Long(value) => Some(ColumnCountStat::Value(*value)),
_ => {
log::warn!(
"Unexpected type when parsing nullCounts for {}. Found {}",
field_name,
field
);
None
}
}
}

fn primitive_parquet_field_to_json_value(field: &Field) -> serde_json::Value {
fn primitive_parquet_field_to_json_value(field: &Field) -> Result<serde_json::Value, &'static str> {
match field {
Field::Null => serde_json::Value::Null,
Field::Bool(value) => json!(value),
Field::Byte(value) => json!(value),
Field::Short(value) => json!(value),
Field::Int(value) => json!(value),
Field::Long(value) => json!(value),
Field::Float(value) => json!(value),
Field::Double(value) => json!(value),
Field::Str(value) => json!(value),
Field::Bool(value) => Ok(json!(value)),
Field::Byte(value) => Ok(json!(value)),
Field::Short(value) => Ok(json!(value)),
Field::Int(value) => Ok(json!(value)),
Field::Long(value) => Ok(json!(value)),
Field::Float(value) => Ok(json!(value)),
Field::Double(value) => Ok(json!(value)),
Field::Str(value) => Ok(json!(value)),
Field::Decimal(decimal) => match BigInt::from_signed_bytes_be(decimal.data()).to_f64() {
Some(int) => json!(int / (10_i64.pow((decimal.scale()).try_into().unwrap()) as f64)),
_ => serde_json::Value::Null,
Some(int) => Ok(json!(
int / (10_i64.pow((decimal.scale()).try_into().unwrap()) as f64)
)),
_ => Err("Invalid type for min/max values."),
},
Field::TimestampMillis(timestamp) => {
serde_json::Value::String(convert_timestamp_millis_to_string(*timestamp))
}
Field::Date(date) => serde_json::Value::String(convert_date_to_string(*date)),
_ => {
log::warn!("Unexpected field type {:?}", field,);
serde_json::Value::Null
}
Field::TimestampMillis(timestamp) => Ok(serde_json::Value::String(
convert_timestamp_millis_to_string(*timestamp),
)),
Field::Date(date) => Ok(serde_json::Value::String(convert_date_to_string(*date))),
_ => Err("Invalid type for min/max values."),
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"tableSizeBytes":0,"numFiles":0,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":2},"metadata":{"id":"8d3d2b8a-f091-4d7d-8a37-432a9beaf17b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"integer\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsJson":"false","delta.checkpoint.writeStatsAsStruct":"true"},"createdTime":1666652369483},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"commitInfo":{"timestamp":1666652369577,"userId":"6114986638742036","userName":"dummy_username","operation":"CREATE OR REPLACE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.checkpoint.writeStatsAsJson\":\"false\",\"delta.checkpoint.writeStatsAsStruct\":\"true\"}"},"notebook":{"notebookId":"1829280694121074"},"clusterId":"1007-161845-fa2h8e50","isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"a8510a45-92dc-4e9f-9f7a-42bbcc9b752d"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"9c4df48c-6085-4dcf-b73e-13147a5a405e","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"integer\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsJson":"false","delta.checkpoint.writeStatsAsStruct":"true"},"createdTime":1656252116073}}
{"commitInfo":{"timestamp":1656252116149,"userId":"6114986638742036","userName":"tomnewton@wayve.ai","operation":"CREATE OR REPLACE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.checkpoint.writeStatsAsJson\":\"false\",\"delta.checkpoint.writeStatsAsStruct\":\"true\"}"},"notebook":{"notebookId":"1829280694121074"},"clusterId":"0622-151429-s7rz8ws","isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"af1b716b-5ec8-41f6-9cc2-bb89e010f943"}}
{"metaData":{"id":"8d3d2b8a-f091-4d7d-8a37-432a9beaf17b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"integer\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsJson":"false","delta.checkpoint.writeStatsAsStruct":"true"},"createdTime":1666652369483}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"tableSizeBytes":5489,"numFiles":1,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":2},"metadata":{"id":"8d3d2b8a-f091-4d7d-8a37-432a9beaf17b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"integer\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"null\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"boolean\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal\",\"type\":\"decimal(8,5)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"binary\",\"type\":\"binary\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"timestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"map\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"string\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"array\",\"type\":{\"type\":\"array\",\"elementType\":\"string\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"nested_struct_element\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_of_array_of_map\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"string\",\"valueContainsNull\":true},\"c
ontainsNull\":true},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsJson":"false","delta.checkpoint.writeStatsAsStruct":"true"},"createdTime":1666652369483},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[5489,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}}

0 comments on commit fae50cc

Please sign in to comment.