Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bigquery/storage/managedwriter/adapt): schema->protodescriptor #6267

Merged
merged 2 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 29 additions & 15 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ var wellKnownTypesWrapperName = "google/protobuf/wrappers.proto"
// dependencyCache is used to reduce the number of unique messages we generate by caching based on the tableschema.
//
// keys are based on the base64-encoded serialized tableschema value.
type dependencyCache map[string]protoreflect.Descriptor
type dependencyCache map[string]protoreflect.MessageDescriptor

func (dm dependencyCache) get(schema *storagepb.TableSchema) protoreflect.Descriptor {
func (dm dependencyCache) get(schema *storagepb.TableSchema) protoreflect.MessageDescriptor {
if dm == nil {
return nil
}
Expand All @@ -103,7 +103,18 @@ func (dm dependencyCache) get(schema *storagepb.TableSchema) protoreflect.Descri
return nil
}

func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoreflect.Descriptor) error {
// getFileDescriptorProtos converts the parent file of every descriptor held in
// the cache into its FileDescriptorProto form and returns them as a list.
//
// Entries whose descriptor reports no parent file are skipped. The result is
// nil when nothing qualifies, and because it is built by ranging over a map,
// the order of the returned protos is unspecified.
func (dm dependencyCache) getFileDescriptorProtos() []*descriptorpb.FileDescriptorProto {
	var protos []*descriptorpb.FileDescriptorProto
	for _, cached := range dm {
		parent := cached.ParentFile()
		if parent == nil {
			// Nothing to convert for a descriptor without a containing file.
			continue
		}
		protos = append(protos, protodesc.ToFileDescriptorProto(parent))
	}
	return protos
}

func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoreflect.MessageDescriptor) error {
if dm == nil {
return fmt.Errorf("cache is nil")
}
Expand Down Expand Up @@ -133,7 +144,7 @@ func StorageSchemaToProto3Descriptor(inSchema *storagepb.TableSchema, scope stri
}

// internal implementation of the conversion code.
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, useProto3 bool) (protoreflect.Descriptor, error) {
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, useProto3 bool) (protoreflect.MessageDescriptor, error) {
if inSchema == nil {
return nil, newConversionError(scope, fmt.Errorf("no input schema was provided"))
}
Expand Down Expand Up @@ -206,9 +217,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
}

// Use the local dependencies to generate a list of filenames.
depNames := []string{
wellKnownTypesWrapperName,
}
depNames := []string{wellKnownTypesWrapperName}
for _, d := range deps {
depNames = append(depNames, d.ParentFile().Path())
}
Expand All @@ -226,22 +235,27 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st

// We'll need a FileDescriptorSet as we have a FileDescriptorProto for the current
// descriptor we're building, but we need to include all the referenced dependencies.
fds := &descriptorpb.FileDescriptorSet{
File: []*descriptorpb.FileDescriptorProto{
fdp,
protodesc.ToFileDescriptorProto(wrapperspb.File_google_protobuf_wrappers_proto),
},

fdpList := []*descriptorpb.FileDescriptorProto{
fdp,
protodesc.ToFileDescriptorProto(wrapperspb.File_google_protobuf_wrappers_proto),
}
for _, d := range deps {
fds.File = append(fds.File, protodesc.ToFileDescriptorProto(d))
fdpList = append(fdpList, cache.getFileDescriptorProtos()...)

fds := &descriptorpb.FileDescriptorSet{
File: fdpList,
}

// Load the set into a registry, then interrogate it for the descriptor corresponding to the top level message.
files, err := protodesc.NewFiles(fds)
if err != nil {
return nil, err
}
return files.FindDescriptorByName(protoreflect.FullName(scope))
found, err := files.FindDescriptorByName(protoreflect.FullName(scope))
if err != nil {
return nil, err
}
return found.(protoreflect.MessageDescriptor), nil
}

// tableFieldSchemaToFieldDescriptorProto builds individual field descriptors for a proto message.
Expand Down
229 changes: 217 additions & 12 deletions bigquery/storage/managedwriter/adapt/protoconversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,28 @@ import (
"google.golang.org/protobuf/types/dynamicpb"
)

// TestSchemaToProtoConversion validates the conversion of table schemas to
// descriptors. The challenge is that the conversion uses dynamic proto registries,
// which means we cannot compare MessageDescriptors directly with something like
// proto.Equal.
//
// Instead, we compare against two DescriptorProto forms of the message. In the first,
// we ensure the structure of the outer message is as expected. In the second, we compare
// against the normalized form of the DescriptorProto, as that encapsulates all the
// dependencies within its NestedType definitions.
func TestSchemaToProtoConversion(t *testing.T) {
testCases := []struct {
description string
bq *storagepb.TableSchema
wantProto2 *descriptorpb.DescriptorProto
wantProto3 *descriptorpb.DescriptorProto
// The un-normalized descriptor (sans dependencies)
wantProto2 *descriptorpb.DescriptorProto
// Normalized descriptor (all dependencies nested)
wantProto2Normalized *descriptorpb.DescriptorProto

// The un-normalized descriptor (sans dependencies)
wantProto3 *descriptorpb.DescriptorProto
// Normalized descriptor
wantProto3Normalized *descriptorpb.DescriptorProto
}{
{
description: "basic",
Expand All @@ -58,6 +74,18 @@ func TestSchemaToProtoConversion(t *testing.T) {
{Name: proto.String("baz"), Number: proto.Int32(3), Type: descriptorpb.FieldDescriptorProto_TYPE_BYTES.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()},
},
},
wantProto2Normalized: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("foo"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()},
{Name: proto.String("bar"), Number: proto.Int32(2), Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED.Enum()},
{Name: proto.String("baz"), Number: proto.Int32(3), Type: descriptorpb.FieldDescriptorProto_TYPE_BYTES.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()},
},
},
wantProto3: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
Expand Down Expand Up @@ -107,6 +135,43 @@ func TestSchemaToProtoConversion(t *testing.T) {
},
},
},
wantProto2Normalized: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("curdate"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("rec"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String("root__rec"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
NestedType: []*descriptorpb.DescriptorProto{
{
Name: proto.String("root__rec"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("userid"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED.Enum(),
},
{
Name: proto.String("location"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
},
},
wantProto3: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
Expand Down Expand Up @@ -205,37 +270,177 @@ func TestSchemaToProtoConversion(t *testing.T) {
},
},
},
{
description: "multiple nesting levels",
bq: &storagepb.TableSchema{
Fields: []*storagepb.TableFieldSchema{
{
Name: "outer_struct",
Type: storagepb.TableFieldSchema_STRUCT,
Mode: storagepb.TableFieldSchema_NULLABLE,
Fields: []*storagepb.TableFieldSchema{
{
Name: "inner_struct",
Type: storagepb.TableFieldSchema_STRUCT,
Mode: storagepb.TableFieldSchema_NULLABLE,
Fields: []*storagepb.TableFieldSchema{
{Name: "leaf_one", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_NULLABLE},
{Name: "leaf_two", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_NULLABLE},
},
},
},
},
{
Name: "other_field",
Type: storagepb.TableFieldSchema_INT64,
Mode: storagepb.TableFieldSchema_NULLABLE,
},
},
},
wantProto2: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("outer_struct"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".root__outer_struct"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("other_field"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
wantProto2Normalized: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("outer_struct"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String("root__outer_struct"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("other_field"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
NestedType: []*descriptorpb.DescriptorProto{
{
Name: proto.String("root__outer_struct__inner_struct"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("leaf_one"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("leaf_two"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
{
Name: proto.String("root__outer_struct"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("inner_struct"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String("root__outer_struct__inner_struct"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
},
},
wantProto3: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("outer_struct"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".root__outer_struct"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("other_field"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".google.protobuf.Int64Value"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
},
}
for _, tc := range testCases {
// Proto2
p2d, err := StorageSchemaToProto2Descriptor(tc.bq, "root")
if err != nil {
t.Errorf("case (%s) failed proto2 conversion: %v", tc.description, err)
t.Fatalf("case (%s) failed proto2 conversion: %v", tc.description, err)
}

// convert it to DP form
// Convert to MessageDescriptor.
mDesc, ok := p2d.(protoreflect.MessageDescriptor)
if !ok {
t.Errorf("%s: couldn't convert proto2 to messagedescriptor", tc.description)
}
gotDP := protodesc.ToDescriptorProto(mDesc)
if diff := cmp.Diff(gotDP, tc.wantProto2, protocmp.Transform()); diff != "" {
t.Errorf("%s proto2: -got, +want:\n%s", tc.description, diff)
// Check the non-normalized case.
if tc.wantProto2 != nil {
gotDP := protodesc.ToDescriptorProto(mDesc)
if diff := cmp.Diff(gotDP, tc.wantProto2, protocmp.Transform()); diff != "" {
t.Errorf("%s proto2: -got, +want:\n%s", tc.description, diff)
}
}
// Check the normalized case.
if tc.wantProto2Normalized != nil {
gotDP, err := NormalizeDescriptor(mDesc)
if err != nil {
t.Errorf("failed to normalize: %v", err)
}
if diff := cmp.Diff(gotDP, tc.wantProto2Normalized, protocmp.Transform()); diff != "" {
t.Errorf("%s proto2normalized: -got, +want:\n%s", tc.description, diff)
}
}

p3d, err := StorageSchemaToProto3Descriptor(tc.bq, "root")
if err != nil {
t.Fatalf("case (%s) failed proto3 conversion: %v", tc.description, err)
}

// Convert to MessageDescriptor.
mDesc, ok = p3d.(protoreflect.MessageDescriptor)
if !ok {
t.Errorf("%s: couldn't convert proto3 to messagedescriptor", tc.description)
}
gotDP = protodesc.ToDescriptorProto(mDesc)
if diff := cmp.Diff(gotDP, tc.wantProto3, protocmp.Transform()); diff != "" {
t.Errorf("%s proto3: -got, +want:\n%s", tc.description, diff)
// Check the non-normalized case.
if tc.wantProto3 != nil {
gotDP := protodesc.ToDescriptorProto(mDesc)
if diff := cmp.Diff(gotDP, tc.wantProto3, protocmp.Transform()); diff != "" {
t.Errorf("%s proto2: -got, +want:\n%s", tc.description, diff)
}
}
// Check the normalized case.
if tc.wantProto3Normalized != nil {
gotDP, err := NormalizeDescriptor(mDesc)
if err != nil {
t.Errorf("failed to normalize: %v", err)
}
if diff := cmp.Diff(gotDP, tc.wantProto3Normalized, protocmp.Transform()); diff != "" {
t.Errorf("%s proto3normalized: -got, +want:\n%s", tc.description, diff)
}
}

}
}

Expand Down