Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter/adapt): add NormalizeDescriptor
Browse files Browse the repository at this point in the history
This functionality supports the "bring your own proto" case for writing
data.

Towards: googleapis#4366
  • Loading branch information
shollyman committed Aug 26, 2021
1 parent 08169ba commit 410b2a6
Show file tree
Hide file tree
Showing 4 changed files with 1,089 additions and 0 deletions.
136 changes: 136 additions & 0 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
Expand Up @@ -279,3 +279,139 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
}, nil
}

// ConvertToProtoSchema builds a self-contained DescriptorProto suitable for communicating schema
// information with the BigQuery Storage write API. It's primarily used for cases where users are
// interested in sending data using a predefined protocol buffer message.
//
// The storage API accepts a single DescriptorProto for decoding message data. However, the nature
// of protocol buffers allows one to define complex messages using independent messages, which normally would
// then require passing of either multiple DescriptorProto messages, or something like a FileDescriptorProto.
//
// DescriptorProto allows one to communicate the structure of a single message, but does contain a mechanism
// for encoding inner nested messages. This function leverages that to encode the DescriptorProto by transforming
// external messages to nested messages, as well as handling other tasks like fully qualifying name references.
//
// Other details of note:
// * Well known types are not normalized.
// * Enums are encapsulated in an outer struct.
//
// Issues:
// Syntax isn't part of DescriptorProto or MessageOptions - still need that option in ProtoSchema.
func NormalizeDescriptor(in protoreflect.MessageDescriptor) (*descriptorpb.DescriptorProto, error) {
return normalizeDescriptorInternal(in, newStringSet(), newStringSet(), newStringSet(), nil)
}

func normalizeDescriptorInternal(in protoreflect.MessageDescriptor, visitedTypes, enumTypes, structTypes *stringSet, root *descriptorpb.DescriptorProto) (*descriptorpb.DescriptorProto, error) {
if in == nil {
return nil, fmt.Errorf("no messagedescriptor provided")
}
resultDP := &descriptorpb.DescriptorProto{}
if root == nil {
root = resultDP
}
fullProtoName := string(in.FullName())
normalizedProtoName := normalizeName(fullProtoName)
resultDP.Name = proto.String(normalizedProtoName)
visitedTypes.add(fullProtoName)
for i := 0; i < in.Fields().Len(); i++ {
inField := in.Fields().Get(i)
resultFDP := protodesc.ToFieldDescriptorProto(inField)
if inField.Kind() == protoreflect.MessageKind || inField.Kind() == protoreflect.GroupKind {
// Handle fields that reference messages.
// Groups are a proto2-ism which predated nested messages.
msgFullName := string(inField.Message().FullName())
if !skipNormalization(msgFullName) {
// for everything but well known types, normalize.
normName := normalizeName(string(msgFullName))
if structTypes.contains(msgFullName) {
resultFDP.TypeName = proto.String(normName)
} else {
if visitedTypes.contains(msgFullName) {
return nil, fmt.Errorf("recursize type not supported: %s", inField.FullName())
}
visitedTypes.add(msgFullName)
dp, err := normalizeDescriptorInternal(inField.Message(), visitedTypes, enumTypes, structTypes, root)
if err != nil {
return nil, fmt.Errorf("error converting message %s: %v", inField.FullName(), err)
}
root.NestedType = append(root.NestedType, dp)
visitedTypes.delete(msgFullName)
lastNested := root.GetNestedType()[len(root.GetNestedType())-1].GetName()
resultFDP.TypeName = proto.String(lastNested)
}
}
}
if inField.Kind() == protoreflect.EnumKind {
// Deal with enum types. BigQuery doesn't support enum types directly.
enumFullName := string(inField.Enum().FullName())
enclosingTypeName := normalizeName(enumFullName) + "_E"
enumName := string(inField.Enum().Name())
actualName := fmt.Sprintf("%s.%s", enclosingTypeName, enumName)
if enumTypes.contains(enumFullName) {
resultFDP.TypeName = proto.String(actualName)
} else {
enumDP := protodesc.ToEnumDescriptorProto(inField.Enum())
enumDP.Name = proto.String(enumName)
resultDP.NestedType = append(resultDP.NestedType, &descriptorpb.DescriptorProto{
Name: proto.String(enclosingTypeName),
EnumType: []*descriptorpb.EnumDescriptorProto{enumDP},
})
resultFDP.TypeName = proto.String(actualName)
enumTypes.add(enumFullName)
}
}
resultDP.Field = append(resultDP.Field, resultFDP)
}
structTypes.add(fullProtoName)
return resultDP, nil
}

type stringSet struct {
m map[string]struct{}
}

func (s *stringSet) contains(k string) bool {
_, ok := s.m[k]
return ok
}

func (s *stringSet) add(k string) {
s.m[k] = struct{}{}
}

func (s *stringSet) delete(k string) {
delete(s.m, k)
}

func newStringSet() *stringSet {
return &stringSet{
m: make(map[string]struct{}),
}
}

func normalizeName(in string) string {
return strings.ReplaceAll(in, ".", "_")
}

// these types don't get normalized into the fully-contained structure.
var normalizationSkipList = []string{
".google.protobuf.DoubleValue",
".google.protobuf.FloatValue",
".google.protobuf.Int64Value",
".google.protobuf.UInt64Value",
".google.protobuf.Int32Value",
".google.protobuf.Uint32Value",
".google.protobuf.BoolValue",
".google.protobuf.StringValue",
".google.protobuf.BytesValue",
}

func skipNormalization(fullName string) bool {
for _, v := range normalizationSkipList {
if v == fullName {
return true
}
}
return false
}
187 changes: 187 additions & 0 deletions bigquery/storage/managedwriter/adapt/protoconversion_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"reflect"
"testing"

"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
"github.com/google/go-cmp/cmp"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -350,3 +351,189 @@ func TestProtoJSONSerialization(t *testing.T) {
}

}

func TestNormalizeDescriptor(t *testing.T) {
testCases := []struct {
description string
in protoreflect.MessageDescriptor
wantErr bool
want *descriptorpb.DescriptorProto
}{
{
description: "nil",
in: nil,
wantErr: true,
},
{
description: "AllSupportedTypes",
in: (&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor(),
want: &descriptorpb.DescriptorProto{
Name: proto.String("testdata_AllSupportedTypes"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("int32_value"),
JsonName: proto.String("int32Value"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("int64_value"),
JsonName: proto.String("int64Value"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("uint32_value"),
JsonName: proto.String("uint32Value"),
Number: proto.Int32(3),
Type: descriptorpb.FieldDescriptorProto_TYPE_UINT32.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("uint64_value"),
JsonName: proto.String("uint64Value"),
Number: proto.Int32(4),
Type: descriptorpb.FieldDescriptorProto_TYPE_UINT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("float_value"),
JsonName: proto.String("floatValue"),
Number: proto.Int32(5),
Type: descriptorpb.FieldDescriptorProto_TYPE_FLOAT.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("double_value"),
JsonName: proto.String("doubleValue"),
Number: proto.Int32(6),
Type: descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("bool_value"),
JsonName: proto.String("boolValue"),
Number: proto.Int32(7),
Type: descriptorpb.FieldDescriptorProto_TYPE_BOOL.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("enum_value"),
JsonName: proto.String("enumValue"),
TypeName: proto.String("testdata_TestEnum_E.TestEnum"),
Number: proto.Int32(8),
Type: descriptorpb.FieldDescriptorProto_TYPE_ENUM.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("string_value"),
JsonName: proto.String("stringValue"),
Number: proto.Int32(9),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED.Enum(),
},
},
NestedType: []*descriptorpb.DescriptorProto{
{
Name: proto.String("testdata_TestEnum_E"),
EnumType: []*descriptorpb.EnumDescriptorProto{
{
Name: proto.String("TestEnum"),
Value: []*descriptorpb.EnumValueDescriptorProto{
{
Name: proto.String("TestEnum0"),
Number: proto.Int32(0),
},
{
Name: proto.String("TestEnum1"),
Number: proto.Int32(1),
},
},
},
},
},
},
},
},
{
description: "ContainsRecursive",
in: (&testdata.ContainsRecursive{}).ProtoReflect().Descriptor(),
wantErr: true,
},
{
description: "RecursiveTypeTopMessage",
in: (&testdata.RecursiveTypeTopMessage{}).ProtoReflect().Descriptor(),
wantErr: true,
},
{
description: "ComplexType",
in: (&testdata.ComplexType{}).ProtoReflect().Descriptor(),
want: &descriptorpb.DescriptorProto{
Name: proto.String("testdata_ComplexType"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("nested_repeated_type"),
JsonName: proto.String("nestedRepeatedType"),
Number: proto.Int32(1),
TypeName: proto.String("testdata_NestedType"),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum(),
},
{
Name: proto.String("inner_type"),
JsonName: proto.String("innerType"),
Number: proto.Int32(2),
TypeName: proto.String("testdata_InnerType"),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
NestedType: []*descriptorpb.DescriptorProto{
{
Name: proto.String("testdata_InnerType"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("value"),
JsonName: proto.String("value"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum(),
},
},
},
{
Name: proto.String("testdata_NestedType"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("inner_type"),
JsonName: proto.String("innerType"),
Number: proto.Int32(1),
TypeName: proto.String("testdata_InnerType"),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum(),
},
},
},
},
},
},
}

for _, tc := range testCases {
gotDP, err := NormalizeDescriptor(tc.in)

if tc.wantErr && err == nil {
t.Errorf("%s: wanted err but got success", tc.description)
continue
}
if !tc.wantErr && err != nil {
t.Errorf("%s: wanted success, got err: %v", tc.description, err)
continue
}
if diff := cmp.Diff(gotDP, tc.want, protocmp.Transform()); diff != "" {
t.Errorf("%s: -got, +want:\n%s", tc.description, diff)
}
}
}

0 comments on commit 410b2a6

Please sign in to comment.