Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
add avro record names when converting arrow schema to avro
Browse files Browse the repository at this point in the history
  • Loading branch information
Samrose-Ahmed committed Nov 13, 2022
1 parent c23d813 commit 1982195
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 10 deletions.
26 changes: 16 additions & 10 deletions src/io/avro/write/schema.rs
Expand Up @@ -8,10 +8,11 @@ use crate::error::{Error, Result};

/// Converts a [`Schema`] to an Avro [`Record`].
pub fn to_record(schema: &Schema) -> Result<Record> {
let mut name_counter: i32 = 0;
let fields = schema
.fields
.iter()
.map(field_to_field)
.map(|f| field_to_field(&f, &mut name_counter))
.collect::<Result<_>>()?;
Ok(Record {
name: "".to_string(),
Expand All @@ -22,20 +23,25 @@ pub fn to_record(schema: &Schema) -> Result<Record> {
})
}

fn field_to_field(field: &Field) -> Result<AvroField> {
let schema = type_to_schema(field.data_type(), field.is_nullable)?;
fn field_to_field(field: &Field, name_counter: &mut i32) -> Result<AvroField> {
let schema = type_to_schema(field.data_type(), field.is_nullable, name_counter)?;
Ok(AvroField::new(&field.name, schema))
}

fn type_to_schema(data_type: &DataType, is_nullable: bool) -> Result<AvroSchema> {
fn type_to_schema(data_type: &DataType, is_nullable: bool, name_counter: &mut i32) -> Result<AvroSchema> {
Ok(if is_nullable {
AvroSchema::Union(vec![AvroSchema::Null, _type_to_schema(data_type)?])
AvroSchema::Union(vec![AvroSchema::Null, _type_to_schema(data_type, name_counter)?])
} else {
_type_to_schema(data_type)?
_type_to_schema(data_type, name_counter)?
})
}

fn _type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
/// Advances `name_counter` by one and returns the generated name `r<N>`,
/// where `N` is the counter's value after the increment.
///
/// Successive calls sharing one counter therefore yield `r1`, `r2`, ... when
/// the counter starts at zero.
fn _get_field_name(name_counter: &mut i32) -> String {
    let next = *name_counter + 1;
    *name_counter = next;
    format!("r{}", next)
}

fn _type_to_schema(data_type: &DataType, name_counter: &mut i32) -> Result<AvroSchema> {
Ok(match data_type.to_logical_type() {
DataType::Null => AvroSchema::Null,
DataType::Boolean => AvroSchema::Boolean,
Expand All @@ -48,13 +54,13 @@ fn _type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
DataType::Utf8 => AvroSchema::String(None),
DataType::LargeUtf8 => AvroSchema::String(None),
DataType::LargeList(inner) | DataType::List(inner) => AvroSchema::Array(Box::new(
type_to_schema(&inner.data_type, inner.is_nullable)?,
type_to_schema(&inner.data_type, inner.is_nullable, name_counter)?,
)),
DataType::Struct(fields) => AvroSchema::Record(Record::new(
"",
_get_field_name(name_counter),
fields
.iter()
.map(field_to_field)
.map(|f| field_to_field(&f, name_counter))
.collect::<Result<Vec<_>>>()?,
)),
DataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)),
Expand Down
85 changes: 85 additions & 0 deletions tests/it/io/avro/write.rs
Expand Up @@ -6,6 +6,7 @@ use arrow2::io::avro::avro_schema::file::{Block, CompressedBlock, Compression};
use arrow2::io::avro::avro_schema::write::{compress, write_block, write_metadata};
use arrow2::io::avro::write;
use arrow2::types::months_days_ns;
use avro_schema::schema::{ Schema as AvroSchema, Record, Field as AvroField, };

use super::read::read_avro;

Expand Down Expand Up @@ -284,6 +285,90 @@ fn struct_data() -> Chunk<Box<dyn Array>> {
])
}

fn avro_record() -> Record {
Record {
name: "".to_string(),
namespace: None,
doc: None,
aliases: vec![],
fields: vec![
AvroField {
name: "struct".to_string(),
doc: None,
schema: AvroSchema::Record(Record {
name: "r1".to_string(),
namespace: None,
doc: None,
aliases: vec![],
fields: vec![
AvroField {
name: "item1".to_string(),
doc: None,
schema: AvroSchema::Int(None),
default: None,
order: None,
aliases: vec![],
},
AvroField {
name: "item2".to_string(),
doc: None,
schema: AvroSchema::Union(vec![AvroSchema::Null, AvroSchema::Int(None)]),
default: None,
order: None,
aliases: vec![],
},
],
}),
default: None,
order: None,
aliases: vec![],
},
AvroField {
name: "struct nullable".to_string(),
doc: None,
schema: AvroSchema::Union(vec![
AvroSchema::Null,
AvroSchema::Record(Record {
name: "r2".to_string(),
namespace: None,
doc: None,
aliases: vec![],
fields: vec![
AvroField {
name: "item1".to_string(),
doc: None,
schema: AvroSchema::Int(None),
default: None,
order: None,
aliases: vec![],
},
AvroField {
name: "item2".to_string(),
doc: None,
schema: AvroSchema::Union(vec![AvroSchema::Null, AvroSchema::Int(None)]),
default: None,
order: None,
aliases: vec![],
},
],
}),
]),
default: None,
order: None,
aliases: vec![],
},
],
}
}

/// Converting the struct test schema with `write::to_record` must yield the
/// record returned by `avro_record()`.
#[test]
fn avro_record_schema() -> Result<()> {
    let actual = write::to_record(&struct_schema())?;
    let expected = avro_record();
    assert_eq!(actual, expected);
    Ok(())
}

#[test]
fn struct_() -> Result<()> {
let write_schema = struct_schema();
Expand Down

0 comments on commit 1982195

Please sign in to comment.