draft: Fail Unsupported Columns (#318) (#323)
# Fail Unsupported Columns (#318)

Updates the `FileService` and `PrepareService` to fail for unsupported
columns (Decimal) rather than silently dropping them.
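
For illustration, a minimal sketch of the new behavior, assuming the `RawMetadata` and `Error` APIs shown in the diff below (the `sparrow_runtime::RawMetadata` import path is hypothetical): `from_raw_schema` now returns an `error_stack::Result` and rejects Decimal columns instead of silently dropping them, which is what the new unit test and prepare-path error exercise.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
// Hypothetical import path; in this repo the type lives in
// crates/sparrow-runtime/src/metadata/raw_metadata.rs.
use sparrow_runtime::RawMetadata;

fn main() {
    // A raw schema containing a Decimal column, which Sparrow does not support.
    let raw_schema = Arc::new(Schema::new(vec![
        Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new("x", DataType::Decimal128(10, 2), true),
    ]));

    // Before this change the Decimal column was silently dropped from the
    // table schema; now metadata creation fails with `Error::UnsupportedColumn`.
    match RawMetadata::from_raw_schema(raw_schema) {
        Ok(metadata) => println!("table schema: {:?}", metadata.table_schema),
        Err(report) => eprintln!("metadata error: {report:?}"),
    }
}
```
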
kevinjnguyen committed May 9, 2023
2 parents 7433cbb + 99fa46d commit 00c2c6b
Showing 8 changed files with 128 additions and 127 deletions.
7 changes: 5 additions & 2 deletions crates/sparrow-main/src/serve/file_service.rs
@@ -64,7 +64,7 @@ async fn get_metadata(
let file_metadata = match request.source_data {
Some(source_data) => get_source_metadata(&object_store_registry, &source_data)
.await
.or_else(|_| anyhow::bail!("failed getting source metadata")),
.or_else(|e| anyhow::bail!("failed getting source metadata: {}", e)),
None => anyhow::bail!("missing request source"),
}?;

@@ -90,7 +90,10 @@ pub(crate) async fn get_source_metadata(
let metadata = RawMetadata::try_from(source, object_store_registry)
.await
.attach_printable_lazy(|| format!("Source: {:?}", source))
.change_context(Error::Schema("unable to get raw metadata".to_owned()))?;
.change_context(Error::Schema(format!(
"unable to read schema from: {:?}",
source
)))?;
let schema = Schema::try_from(metadata.table_schema.as_ref())
.into_report()
.attach_printable_lazy(|| {
52 changes: 8 additions & 44 deletions crates/sparrow-main/tests/e2e/parquet_tests.rs
@@ -6,6 +6,7 @@ use uuid::Uuid;

use crate::{DataFixture, QueryFixture};

#[tokio::test]
/// Create a table from a Parquet Decimal column backed by a fixed length array.
///
/// This uses multiple files to ensure we trigger merge code which may have
@@ -60,8 +61,8 @@ use crate::{DataFixture, QueryFixture};
/// ```
///
/// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
pub(crate) async fn decimal_fixed_len_multipart_data_fixture() -> DataFixture {
DataFixture::new()
async fn test_decimal_column_fails_prepare() {
let data_fixture = DataFixture::new()
.with_table_from_files(
TableConfig::new(
"Numbers",
@@ -76,48 +77,11 @@ pub(crate) async fn decimal_fixed_len_multipart_data_fixture() -> DataFixture {
"regressions/decimal_fixed_len_part2.parquet",
],
)
.await
.unwrap()
}

#[tokio::test]
async fn test_decimal_fixed_len_ignored() {
// This test shows that a query with decimal columns and multiple files works.
// This requires ignoring the fields because they can't be merged.
insta::assert_snapshot!(QueryFixture::new("Numbers").run_to_csv(&decimal_fixed_len_multipart_data_fixture().await).await.unwrap(), @r###"
_time,_subsort,_key_hash,_key,time,subsort,key,x
1996-12-20T00:39:57.000000000,9223372036854775808,3650215962958587783,A,1996-12-20T00:39:57.000000000,0,A,5.0
1996-12-20T00:39:58.000000000,9223372036854775808,11753611437813598533,B,1996-12-20T00:39:58.000000000,0,B,8.0
1996-12-20T00:39:59.000000000,9223372036854775808,3650215962958587783,A,1996-12-20T00:39:59.000000000,0,A,10.0
1996-12-20T00:40:00.000000000,9223372036854775808,3650215962958587783,A,1996-12-20T00:40:00.000000000,0,A,
1996-12-20T00:40:01.000000000,9223372036854775808,3650215962958587783,A,1996-12-20T00:40:01.000000000,0,A,11.0
1996-12-20T00:40:02.000000000,9223372036854775808,3650215962958587783,A,1996-12-20T00:40:02.000000000,0,A,
1997-12-20T00:39:57.000000000,9223372036854775808,3650215962958587783,A,1997-12-20T00:39:57.000000000,0,A,5.0
1997-12-20T00:39:58.000000000,9223372036854775808,11753611437813598533,B,1997-12-20T00:39:58.000000000,0,B,8.0
1997-12-20T00:39:59.000000000,9223372036854775808,3650215962958587783,A,1997-12-20T00:39:59.000000000,0,A,10.0
1997-12-20T00:40:00.000000000,9223372036854775808,3650215962958587783,A,1997-12-20T00:40:00.000000000,0,A,
1997-12-20T00:40:01.000000000,9223372036854775808,3650215962958587783,A,1997-12-20T00:40:01.000000000,0,A,11.0
1997-12-20T00:40:02.000000000,9223372036854775808,3650215962958587783,A,1997-12-20T00:40:02.000000000,0,A,
"###)
}

#[tokio::test]
async fn test_decimal_fixed_len_unused() {
insta::assert_snapshot!(QueryFixture::new("{ x: Numbers.x, sqrt_x: sqrt(Numbers.x) }").run_to_csv(&decimal_fixed_len_multipart_data_fixture().await).await.unwrap(), @r###"
_time,_subsort,_key_hash,_key,x,sqrt_x
1996-12-20T00:39:57.000000000,9223372036854775808,3650215962958587783,A,5.0,2.23606797749979
1996-12-20T00:39:58.000000000,9223372036854775808,11753611437813598533,B,8.0,2.8284271247461903
1996-12-20T00:39:59.000000000,9223372036854775808,3650215962958587783,A,10.0,3.1622776601683795
1996-12-20T00:40:00.000000000,9223372036854775808,3650215962958587783,A,,
1996-12-20T00:40:01.000000000,9223372036854775808,3650215962958587783,A,11.0,3.3166247903554
1996-12-20T00:40:02.000000000,9223372036854775808,3650215962958587783,A,,
1997-12-20T00:39:57.000000000,9223372036854775808,3650215962958587783,A,5.0,2.23606797749979
1997-12-20T00:39:58.000000000,9223372036854775808,11753611437813598533,B,8.0,2.8284271247461903
1997-12-20T00:39:59.000000000,9223372036854775808,3650215962958587783,A,10.0,3.1622776601683795
1997-12-20T00:40:00.000000000,9223372036854775808,3650215962958587783,A,,
1997-12-20T00:40:01.000000000,9223372036854775808,3650215962958587783,A,11.0,3.3166247903554
1997-12-20T00:40:02.000000000,9223372036854775808,3650215962958587783,A,,
"###)
.await;
assert_eq!(
data_fixture.err().unwrap().to_string(),
"Internal error: invalid schema provided\n"
);
}

#[tokio::test]
128 changes: 75 additions & 53 deletions crates/sparrow-runtime/src/metadata/raw_metadata.rs
@@ -11,7 +11,6 @@ use tempfile::NamedTempFile;
use sparrow_api::kaskada::v1alpha::source_data::{self, Source};

use sparrow_api::kaskada::v1alpha::PulsarConfig;
use tracing::info;

use crate::execute::pulsar_schema;
use crate::metadata::file_from_path;
@@ -32,8 +31,10 @@ pub enum Error {
ReadSchema,
#[display(fmt = "pulsar subscription error")]
PulsarSubscription,
#[display(fmt = "Failed to get pulsar schema: {_0}")]
#[display(fmt = "failed to get pulsar schema: {_0}")]
PulsarSchema(String),
#[display(fmt = "unsupport column detected: '{_0}")]
UnsupportedColumn(String),
}

impl error_stack::Context for Error {}
@@ -85,14 +86,14 @@ impl RawMetadata {
}

/// Create `RawMetadata` from a raw schema.
pub fn from_raw_schema(raw_schema: SchemaRef) -> Self {
pub fn from_raw_schema(raw_schema: SchemaRef) -> error_stack::Result<Self, Error> {
// Convert the raw schema to a table schema.
let table_schema = convert_schema(raw_schema.as_ref());
let table_schema = convert_schema(raw_schema.as_ref())?;

Self {
Ok(Self {
raw_schema,
table_schema,
}
})
}

/// Create a `RawMetadata` from a parquet string path and object store registry
@@ -117,8 +118,6 @@ impl RawMetadata {
let path = format!("/{}", path);
let path = std::path::Path::new(&path);
Self::try_from_parquet_path(path)
.into_report()
.change_context_lazy(|| Error::ReadSchema)
}
_ => {
let download_file = NamedTempFile::new().map_err(|_| Error::Download)?;
@@ -127,8 +126,6 @@ impl RawMetadata {
.await
.change_context_lazy(|| Error::Download)?;
Self::try_from_parquet_path(download_file.path())
.into_report()
.change_context_lazy(|| Error::ReadSchema)
}
}
}
@@ -195,16 +192,20 @@ impl RawMetadata {

Ok(PulsarMetadata {
user_schema: Arc::new(pulsar_schema),
sparrow_metadata: Self::from_raw_schema(Arc::new(Schema::new(new_fields))),
sparrow_metadata: Self::from_raw_schema(Arc::new(Schema::new(new_fields)))?,
})
}

/// Create a `RawMetadata` from a Parquet file path.
fn try_from_parquet_path(path: &std::path::Path) -> anyhow::Result<Self> {
let file = file_from_path(path)?;
let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
fn try_from_parquet_path(path: &std::path::Path) -> error_stack::Result<Self, Error> {
let file = file_from_path(path)
.into_report()
.change_context_lazy(|| Error::LocalFile)?;
let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
.into_report()
.change_context_lazy(|| Error::ReadSchema)?;
let raw_schema = parquet_reader.schema();
Ok(Self::from_raw_schema(raw_schema.clone()))
Self::from_raw_schema(raw_schema.clone())
}

/// Create a `RawMetadata` from a reader of a CSV file or string.
@@ -229,12 +230,20 @@ impl RawMetadata {
.change_context_lazy(|| Error::ReadSchema)?;

let raw_schema = raw_reader.schema();
Ok(Self::from_raw_schema(raw_schema))
Self::from_raw_schema(raw_schema)
}
}

/// Converts the schema to special-case Timestamp fields.
///
/// Converts the schema to a table schema.
fn convert_schema(schema: &Schema) -> error_stack::Result<SchemaRef, Error> {
let fields = schema
.fields()
.iter()
.map(convert_field)
.collect::<Result<Vec<_>, _>>()?;
Ok(Arc::new(Schema::new(fields)))
}

/// Arrow doesn't support time zones very well; it assumes all have a time zone
/// of `None`, which will use system time. Sparrow only operates on
/// [arrow::datatypes::TimestampNanosecondType], and currently cannot pass
@@ -246,36 +255,31 @@
/// Sparrow also does not yet support Decimal types. Rather than silently
/// dropping Decimal columns, we now fail, since they are not supported at
/// query time either.
fn convert_schema(schema: &Schema) -> SchemaRef {
let fields = schema
.fields()
.iter()
.filter_map(|field| {
match field.data_type() {
DataType::Timestamp(time_unit, Some(tz)) => {
// TODO: We discard this because the conversion from an Arrow
// schema to the Schema protobuf currently fails on such timestamp columns.
info!(
"Discarding time zone {:?} on timestamp column '{}'",
tz,
field.name()
);
Some(Field::new(
field.name(),
DataType::Timestamp(time_unit.clone(), None),
field.is_nullable(),
))
}
DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
// TODO: Support decimal columns
info!("Discarding decimal column '{}'", field.name());
None
}
_ => Some(field.clone()),
}
})
.collect();
Arc::new(Schema::new(fields))
fn convert_field(field: &Field) -> error_stack::Result<Field, Error> {
match field.data_type() {
DataType::Timestamp(time_unit, Some(tz)) => {
// TODO: We discard this because the conversion from an Arrow
// schema to the Schema protobuf currently fails on such timestamp columns.
tracing::warn!(
"Time zones are unsupported. Interpreting column '{}' with time zone '{}' as UTC",
field.name(),
tz
);
Ok(Field::new(
field.name(),
DataType::Timestamp(time_unit.clone(), None),
field.is_nullable(),
))
}
DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
tracing::warn!("Decimal columns are unsupported: '{}'", field.name());
error_stack::bail!(Error::UnsupportedColumn(format!(
"Decimal columns are unsupported: {}",
field.name()
)))
}
_ => Ok(field.clone()),
}
}

#[cfg(test)]
@@ -301,7 +305,7 @@ mod tests {
Field::new("c", DataType::Int64, true),
]));

let metadata = RawMetadata::from_raw_schema(raw_schema.clone());
let metadata = RawMetadata::from_raw_schema(raw_schema.clone()).unwrap();
assert_eq!(metadata.raw_schema, raw_schema);
assert_eq!(metadata.table_schema, raw_schema);
}
@@ -316,8 +320,6 @@
DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned())),
false,
),
// Decimal column should be dropped.
Field::new("decimal", DataType::Decimal128(10, 12), false),
Field::new("subsort", DataType::UInt64, false),
Field::new("key", DataType::UInt64, false),
Field::new("a", DataType::Int64, true),
@@ -339,7 +341,7 @@
Field::new("c", DataType::Int64, true),
]));

let metadata = RawMetadata::from_raw_schema(raw_schema.clone());
let metadata = RawMetadata::from_raw_schema(raw_schema.clone()).unwrap();
assert_eq!(metadata.raw_schema, raw_schema);
assert_eq!(metadata.table_schema, converted_schema);
}
@@ -395,8 +397,28 @@
),
]));

let metadata = RawMetadata::from_raw_schema(raw_schema.clone());
let metadata = RawMetadata::from_raw_schema(raw_schema.clone()).unwrap();
assert_eq!(metadata.raw_schema, raw_schema);
assert_eq!(metadata.table_schema, converted_schema);
}

#[test]
fn test_raw_metadata_decimal_errors() {
let raw_schema = Arc::new(Schema::new(vec![Field::new(
"decimal_col",
DataType::Decimal128(0, 0),
false,
)]));

let metadata = RawMetadata::from_raw_schema(raw_schema.clone());
match metadata {
Ok(_) => panic!("should not have succeeded"),
Err(e) => {
assert_eq!(
e.as_error().to_string(),
"unsupport column detected: 'Decimal columns are unsupported: decimal_col"
)
}
}
}
}
6 changes: 4 additions & 2 deletions crates/sparrow-runtime/src/prepare.rs
@@ -426,7 +426,8 @@ fn reader_from_parquet<'a, R: parquet::file::reader::ChunkReader + 'static>(
let batch_size = get_batch_size(num_rows, num_files);
let parquet_reader = parquet_reader.with_batch_size(batch_size);

let raw_metadata = RawMetadata::from_raw_schema(parquet_reader.schema().clone());
let raw_metadata = RawMetadata::from_raw_schema(parquet_reader.schema().clone())
.change_context_lazy(|| Error::ReadSchema)?;
let reader = parquet_reader
.build()
.into_report()
@@ -457,7 +458,8 @@ fn reader_from_csv<'a, R: std::io::Read + std::io::Seek + Send + 'static>(
.build(reader)
.into_report()
.change_context(Error::CreateCsvReader)?;
let raw_metadata = RawMetadata::from_raw_schema(reader.schema());
let raw_metadata =
RawMetadata::from_raw_schema(reader.schema()).change_context_lazy(|| Error::ReadSchema)?;
let stream_reader = futures::stream::iter(reader);

PrepareIter::try_new(stream_reader, config, raw_metadata, prepare_hash, slice)
2 changes: 2 additions & 0 deletions crates/sparrow-runtime/src/prepare/error.rs
@@ -26,6 +26,8 @@ pub enum Error {
SortingBatch,
#[display(fmt = "determine metadata")]
DetermineMetadata,
#[display(fmt = "invalid schema provided")]
ReadSchema,
#[display(fmt = "failed to write Parquet file")]
WriteParquetData,
#[display(fmt = "failed to write metadata file")]
25 changes: 0 additions & 25 deletions wren/compute/manager.go
@@ -18,7 +18,6 @@ import (
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
_ "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/runtime/protoiface"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -690,30 +689,6 @@ func reMapSparrowError(ctx context.Context, err error) error {
return outStatus.Err()
}

// converts diagnostics in non-executable responses into error details to preserve legacy behavior
// TODO: update the python client to be able to handle non-executable responses in the response body
// TODO: remove this and pass back non-executable responses in the response body instead of as an error
func (m *Manager) ReMapAnalysisError(ctx context.Context, analysis *v1alpha.Analysis) error {
subLogger := log.Ctx(ctx).With().Str("method", "manager.reMapAnalysisError").Logger()
if analysis != nil {
if !analysis.CanExecute {
if analysis.FenlDiagnostics != nil {
diagCount := len(analysis.FenlDiagnostics.FenlDiagnostics)
if diagCount > 0 {
outStatus := status.New(codes.InvalidArgument, fmt.Sprintf("%d errors in Fenl statements; see error details", diagCount))
outStatus, err := outStatus.WithDetails(analysis.FenlDiagnostics)
if err != nil {
subLogger.Error().Err(err).Interface("fenl_diagnostics", analysis.FenlDiagnostics).Msg("unable to add diagnostic to re-mapped error details")
}
return outStatus.Err()
}
}
return status.Error(codes.Internal, "internal compute error")
}
}
return nil
}

func (m *Manager) getTablesForCompile(ctx context.Context, owner *ent.Owner) ([]*v1alpha.ComputeTable, error) {
subLogger := log.Ctx(ctx).With().Str("method", "manager.getTablesForCompile").Logger()
computeTables := []*v1alpha.ComputeTable{}
