Skip to content

Commit

Permalink
feat: implement support for more well-known-types
Browse files Browse the repository at this point in the history
- google.protobuf.Struct (mapped to a JSON STRING)
- google.type.TimeOfDay (mapped to TIME)
- google.type.Date (mapped to DATE)
- google.type.LatLng (mapped to GEOGRAPHY)

The reason for using GEOGRAPHY as default for LatLng is because of the
clustering opportunities for tables that store locations as GEOGRAPHY.

Future SchemaOptions may override to store LatLng as a record instead.

Integration tested against public BigQuery datasets. The intention is to
identify datasets that exercise the features of this library, then
perform a load/marshal/unmarshal sequence and validate that the original
result is preserved.
  • Loading branch information
odsod committed Jan 2, 2021
1 parent 154694a commit 26018fb
Show file tree
Hide file tree
Showing 23 changed files with 2,029 additions and 168 deletions.
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ internal/examples/proto/api-common-protos:
.PHONY: go-test
go-test:
$(info [$@] running Go tests...)
@go test -short -race -cover ./...

.PHONY: go-integration-test
go-integration-test:
$(info [$@] running Go tests (including integration tests)...)
@go test -race -cover ./...

.PHONY: go-mod-tidy
Expand All @@ -35,7 +40,7 @@ go-mod-tidy:
@go mod tidy -v

.PHONY: buf-check-lint
buf-check-lint: $(buf)
buf-check-lint: $(buf) internal/examples/proto/api-common-protos
$(info [$@] linting protobuf schemas...)
@$(buf) check lint

Expand Down
20 changes: 12 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Protobuf + BigQuery + Go

Add-ons to [cloud.google.com/bigquery][google-cloud-go-bigquery] for
first-class protobuf support.
first-class protobuf support using [protobuf reflection][protobuf-apiv2].

**Work in progress:** This library is under active development.

[google-cloud-go-bigquery]: https://pkg.go.dev/cloud.google.com/go/bigquery
[protobuf-apiv2]: https://blog.golang.org/protobuf-apiv2

## Installing

Expand Down Expand Up @@ -38,12 +39,12 @@ _[Reference ≫][well-known-types]_

### Support for API Common Protos (`google.type`)

| Protobuf | BigQuery |
| -------------------- | --------- |
| google.type.Date | DATE |
| google.type.DateTime | TIMESTAMP |
| google.type.LatLng | GEOGRAPHY |
| google.type.Time | TIME |
| Protobuf | BigQuery |
| --------------------- | --------- |
| google.type.Date | DATE |
| google.type.DateTime | TIMESTAMP |
| google.type.LatLng | GEOGRAPHY |
| google.type.TimeOfDay | TIME |

_[Reference ≫][api-common-protos]_

Expand All @@ -53,7 +54,10 @@ _[Reference ≫][api-common-protos]_

### `protobq.MessageLoader`

Loads BigQuery rows into protobuf messages.
An implementation of [bigquery.ValueLoader][valueloader] that loads
BigQuery rows into protobuf messages.

[valueloader]: https://pkg.go.dev/cloud.google.com/go/bigquery#ValueLoader

```go
package main
Expand Down
144 changes: 123 additions & 21 deletions encoding/protobq/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ import (
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
"go.einride.tech/protobuf-bigquery/internal/wkt"
"google.golang.org/genproto/googleapis/type/date"
"google.golang.org/genproto/googleapis/type/latlng"
"google.golang.org/genproto/googleapis/type/timeofday"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -138,28 +143,28 @@ func (o UnmarshalOptions) loadSingularField(
return nil
}
if field.Kind() == protoreflect.MessageKind || field.Kind() == protoreflect.GroupKind {
switch field.Message().FullName() {
case wkt.Timestamp:
return o.unmarshalTimestampField(bigqueryValue, field, message)
case wkt.Duration:
return o.unmarshalDurationField(bigqueryValue, field, message)
if wkt.IsWellKnownType(string(field.Message().FullName())) {
if err := o.unmarshalWellKnownTypeField(bigqueryValue, field, message); err != nil {
return fmt.Errorf("%s: %w", field.Name(), err)
}
return nil
}
if fieldSchema.Type != bigquery.RecordFieldType {
return fmt.Errorf("unsupported BigQuery type for message: %v", fieldSchema.Type)
return fmt.Errorf("%s: unsupported BigQuery type for message: %v", field.Name(), fieldSchema.Type)
}
bigqueryMessageValue, ok := bigqueryValue.([]bigquery.Value)
if !ok {
return fmt.Errorf("unsupported BigQuery value for message: %v", bigqueryMessageValue)
}
fieldValue := message.NewField(field)
if err := o.loadMessage(bigqueryMessageValue, fieldSchema.Schema, fieldValue.Message()); err != nil {
return err
return fmt.Errorf("%s: %w", field.Name(), err)
}
message.Set(field, fieldValue)
} else {
fieldValue, err := o.unmarshalScalar(bigqueryValue, field)
if err != nil {
return err
return fmt.Errorf("%s: %w", field.Name(), err)
}
message.Set(field, fieldValue)
}
Expand All @@ -175,39 +180,62 @@ func (o UnmarshalOptions) unmarshalSingularField(
return nil
}
if field.Kind() == protoreflect.MessageKind || field.Kind() == protoreflect.GroupKind {
switch field.Message().FullName() {
case wkt.Timestamp:
return o.unmarshalTimestampField(bigqueryValue, field, message)
case wkt.Duration:
return o.unmarshalDurationField(bigqueryValue, field, message)
if wkt.IsWellKnownType(string(field.Message().FullName())) {
if err := o.unmarshalWellKnownTypeField(bigqueryValue, field, message); err != nil {
return fmt.Errorf("%s: %w", field.Name(), err)
}
return nil
}
bigqueryMessageValue, ok := bigqueryValue.(map[string]bigquery.Value)
if !ok {
return fmt.Errorf("unsupported BigQuery value for message: %v", bigqueryMessageValue)
return fmt.Errorf("%s: unsupported BigQuery value for message: %v", field.Name(), bigqueryMessageValue)
}
fieldValue := message.NewField(field)
if err := o.unmarshalMessage(bigqueryMessageValue, fieldValue.Message()); err != nil {
return err
return fmt.Errorf("%s: %w", field.Name(), err)
}
message.Set(field, fieldValue)
} else {
fieldValue, err := o.unmarshalScalar(bigqueryValue, field)
if err != nil {
return err
return fmt.Errorf("%s: %w", field.Name(), err)
}
message.Set(field, fieldValue)
}
return nil
}

func (o UnmarshalOptions) unmarshalWellKnownTypeField(
bigqueryValue bigquery.Value,
field protoreflect.FieldDescriptor,
message protoreflect.Message,
) error {
switch field.Message().FullName() {
case wkt.Timestamp:
return o.unmarshalTimestampField(bigqueryValue, field, message)
case wkt.Duration:
return o.unmarshalDurationField(bigqueryValue, field, message)
case wkt.TimeOfDay:
return o.unmarshalTimeOfDayField(bigqueryValue, field, message)
case wkt.Date:
return o.unmarshalDateField(bigqueryValue, field, message)
case wkt.LatLng:
return o.unmarshalLatLngField(bigqueryValue, field, message)
case wkt.Struct:
return o.unmarshalStructField(bigqueryValue, field, message)
default:
return fmt.Errorf("unsupported well-known-type: %s", field.Message().FullName())
}
}

func (o UnmarshalOptions) unmarshalTimestampField(
bigqueryValue bigquery.Value,
field protoreflect.FieldDescriptor,
message protoreflect.Message,
) error {
t, ok := bigqueryValue.(time.Time)
if !ok {
return fmt.Errorf("unsupported BigQuery value for google.protobuf.Timestamp: %v", bigqueryValue)
return fmt.Errorf("unsupported BigQuery value for %s: %v", wkt.Timestamp, bigqueryValue)
}
message.Set(field, protoreflect.ValueOfMessage(timestamppb.New(t).ProtoReflect()))
return nil
Expand All @@ -218,11 +246,85 @@ func (o UnmarshalOptions) unmarshalDurationField(
field protoreflect.FieldDescriptor,
message protoreflect.Message,
) error {
t, ok := bigqueryValue.(time.Duration)
var duration time.Duration
switch bigqueryValue := bigqueryValue.(type) {
case int64:
duration = time.Duration(bigqueryValue) * time.Second
case float64:
duration = time.Duration(bigqueryValue * float64(time.Second))
default:
return fmt.Errorf("unsupported BigQuery value for %s: %#v", wkt.Duration, bigqueryValue)
}
message.Set(field, protoreflect.ValueOfMessage(durationpb.New(duration).ProtoReflect()))
return nil
}

func (o UnmarshalOptions) unmarshalTimeOfDayField(
bigqueryValue bigquery.Value,
field protoreflect.FieldDescriptor,
message protoreflect.Message,
) error {
t, ok := bigqueryValue.(civil.Time)
if !ok {
return fmt.Errorf("unsupported BigQuery value for google.protobuf.Duration: %v", bigqueryValue)
return fmt.Errorf("unsupported BigQuery value for %s: %#v", wkt.TimeOfDay, bigqueryValue)
}
message.Set(field, protoreflect.ValueOfMessage((&timeofday.TimeOfDay{
Hours: int32(t.Hour),
Minutes: int32(t.Minute),
Seconds: int32(t.Second),
Nanos: int32(t.Nanosecond),
}).ProtoReflect()))
return nil
}

func (o UnmarshalOptions) unmarshalDateField(
bigqueryValue bigquery.Value,
field protoreflect.FieldDescriptor,
message protoreflect.Message,
) error {
d, ok := bigqueryValue.(civil.Date)
if !ok {
return fmt.Errorf("unsupported BigQuery value for %s: %#v", wkt.Date, bigqueryValue)
}
message.Set(field, protoreflect.ValueOfMessage((&date.Date{
Year: int32(d.Year),
Month: int32(d.Month),
Day: int32(d.Day),
}).ProtoReflect()))
return nil
}

func (o UnmarshalOptions) unmarshalLatLngField(
bigqueryValue bigquery.Value,
field protoreflect.FieldDescriptor,
message protoreflect.Message,
) error {
s, ok := bigqueryValue.(string)
if !ok {
return fmt.Errorf("unsupported BigQuery value for %s: %#v", wkt.LatLng, bigqueryValue)
}
latLng := &latlng.LatLng{}
if _, err := fmt.Sscanf(s, "POINT(%f %f)", &latLng.Longitude, &latLng.Latitude); err != nil {
return fmt.Errorf("invalid GEOGRAPHY value for %s: %#v: %w", wkt.LatLng, bigqueryValue, err)
}
message.Set(field, protoreflect.ValueOfMessage(latLng.ProtoReflect()))
return nil
}

func (o UnmarshalOptions) unmarshalStructField(
bigqueryValue bigquery.Value,
field protoreflect.FieldDescriptor,
message protoreflect.Message,
) error {
s, ok := bigqueryValue.(string)
if !ok {
return fmt.Errorf("unsupported BigQuery value for %s: %#v", wkt.Struct, bigqueryValue)
}
var structValue structpb.Struct
if err := structValue.UnmarshalJSON([]byte(s)); err != nil {
return fmt.Errorf("invalid BigQuery value for %s: %#v: %w", wkt.Struct, bigqueryValue, err)
}
message.Set(field, protoreflect.ValueOfMessage(durationpb.New(t).ProtoReflect()))
message.Set(field, protoreflect.ValueOfMessage(structValue.ProtoReflect()))
return nil
}

Expand Down Expand Up @@ -288,5 +390,5 @@ func (o UnmarshalOptions) unmarshalScalar(
case protoreflect.MessageKind, protoreflect.GroupKind:
// Fall through to return error, these should have been handled by the caller.
}
return protoreflect.Value{}, fmt.Errorf("invalid BigQuery value %v for kind %v", bigQueryValue, field.Kind())
return protoreflect.Value{}, fmt.Errorf("invalid BigQuery value %#v for kind %v", bigQueryValue, field.Kind())
}

0 comments on commit 26018fb

Please sign in to comment.