Skip to content

Commit

Permalink
feat(pubsub): support bigquery subscriptions (#6119)
Browse files Browse the repository at this point in the history
* feat(pubsub): support bigquery subscriptions

* fix formatting and update comments

* run gofmt

* simplify fake behavior for bqconfig

* fix comments

* add comments to exported consts

* add SubscriptionState field

* add comment to exported field

* add comment to exported type

* fix comment
  • Loading branch information
hongalex committed Jun 22, 2022
1 parent 7822869 commit 81f704a
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 4 deletions.
2 changes: 2 additions & 0 deletions pubsub/integration_test.go
Expand Up @@ -625,6 +625,7 @@ func TestIntegration_UpdateSubscription(t *testing.T) {
ServiceAccountEmail: serviceAccountEmail,
},
},
State: SubscriptionStateActive,
}
opt := cmpopts.IgnoreUnexported(SubscriptionConfig{})
if diff := testutil.Diff(got, want, opt); diff != "" {
Expand Down Expand Up @@ -658,6 +659,7 @@ func TestIntegration_UpdateSubscription(t *testing.T) {
RetentionDuration: 2 * time.Hour,
Labels: map[string]string{"label": "value"},
ExpirationPolicy: 25 * time.Hour,
State: SubscriptionStateActive,
}

if !testutil.Equal(got, want, opt) {
Expand Down
12 changes: 12 additions & 0 deletions pubsub/pstest/fake.go
Expand Up @@ -480,6 +480,11 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
if ps.PushConfig == nil {
ps.PushConfig = &pb.PushConfig{}
}
if ps.BigqueryConfig == nil {
ps.BigqueryConfig = &pb.BigQueryConfig{}
} else if ps.BigqueryConfig.Table != "" {
ps.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE
}
ps.TopicMessageRetentionDuration = top.proto.MessageRetentionDuration
var deadLetterTopic *topic
if ps.DeadLetterPolicy != nil {
Expand Down Expand Up @@ -579,6 +584,12 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti
case "push_config":
sub.proto.PushConfig = req.Subscription.PushConfig

case "bigquery_config":
sub.proto.BigqueryConfig = req.GetSubscription().GetBigqueryConfig()
if sub.proto.GetBigqueryConfig().GetTable() != "" {
sub.proto.GetBigqueryConfig().State = pb.BigQueryConfig_ACTIVE
}

case "ack_deadline_seconds":
a := req.Subscription.AckDeadlineSeconds
if err := checkAckDeadline(a); err != nil {
Expand Down Expand Up @@ -788,6 +799,7 @@ func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, dea
if at == 0 {
at = 10 * time.Second
}
ps.State = pb.Subscription_ACTIVE
return &subscription{
topic: t,
deadLetterTopic: deadLetterTopic,
Expand Down
58 changes: 58 additions & 0 deletions pubsub/pstest/fake_test.go
Expand Up @@ -1455,3 +1455,61 @@ func TestTopicRetentionAdmin(t *testing.T) {
t.Errorf("sub.TopicMessageRetentionDuration mismatch: %s", diff)
}
}

func TestSubscriptionPushPull(t *testing.T) {
ctx := context.Background()
pclient, sclient, _, cleanup := newFake(ctx, t)
defer cleanup()

top := mustCreateTopic(ctx, t, pclient, &pb.Topic{
Name: "projects/P/topics/T",
})

// Create a push subscription.
pc := &pb.PushConfig{
PushEndpoint: "some-endpoint",
}
got := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
AckDeadlineSeconds: minAckDeadlineSecs,
Name: "projects/P/subscriptions/S",
Topic: top.Name,
PushConfig: pc,
})

if diff := testutil.Diff(got.PushConfig, pc); diff != "" {
t.Errorf("sub.PushConfig mismatch: %s", diff)
}

// Update the subscription to write to BigQuery instead.
updateSub := got
updateSub.PushConfig = &pb.PushConfig{}
bqc := &pb.BigQueryConfig{
Table: "some-table",
}
updateSub.BigqueryConfig = bqc
got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
Subscription: updateSub,
UpdateMask: &field_mask.FieldMask{Paths: []string{"push_config", "bigquery_config"}},
})
if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" {
t.Errorf("sub.PushConfig should be zero value\n%s", diff)
}
want := bqc
want.State = pb.BigQueryConfig_ACTIVE
if diff := testutil.Diff(got.BigqueryConfig, want); diff != "" {
t.Errorf("sub.BigQueryConfig mismatch: %s", diff)
}

// Switch back to a pull subscription.
updateSub.BigqueryConfig = &pb.BigQueryConfig{}
got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
Subscription: updateSub,
UpdateMask: &field_mask.FieldMask{Paths: []string{"bigquery_config"}},
})
if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" {
t.Errorf("sub.PushConfig should be zero value\n%s", diff)
}
if diff := testutil.Diff(got.BigqueryConfig, new(pb.BigQueryConfig)); diff != "" {
t.Errorf("sub.BigqueryConfig should be zero value\n%s", diff)
}
}
137 changes: 133 additions & 4 deletions pubsub/subscription.go
Expand Up @@ -210,14 +210,106 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
}
}

// BigQueryConfigState denotes the possible states for a BigQuery Subscription.
type BigQueryConfigState int

const (
// BigQueryConfigStateUnspecified is the default value. This value is unused.
BigQueryConfigStateUnspecified = iota

// BigQueryConfigActive means the subscription can actively send messages to BigQuery.
BigQueryConfigActive

// BigQueryConfigPermissionDenied means the subscription cannot write to the BigQuery table because of permission denied errors.
BigQueryConfigPermissionDenied

// BigQueryConfigNotFound means the subscription cannot write to the BigQuery table because it does not exist.
BigQueryConfigNotFound

// BigQueryConfigSchemaMismatch means the subscription cannot write to the BigQuery table due to a schema mismatch.
BigQueryConfigSchemaMismatch
)

// BigQueryConfig configures the subscription to deliver to a BigQuery table.
type BigQueryConfig struct {
// The name of the table to which to write data, of the form
// {projectId}:{datasetId}.{tableId}
Table string

// When true, use the topic's schema as the columns to write to in BigQuery,
// if it exists.
UseTopicSchema bool

// When true, write the subscription name, message_id, publish_time,
// attributes, and ordering_key to additional columns in the table. The
// subscription name, message_id, and publish_time fields are put in their own
// columns while all other message properties (other than data) are written to
// a JSON object in the attributes column.
WriteMetadata bool

// When true and use_topic_schema is true, any fields that are a part of the
// topic schema that are not part of the BigQuery table schema are dropped
// when writing to BigQuery. Otherwise, the schemas must be kept in sync and
// any messages with extra fields are not written and remain in the
// subscription's backlog.
DropUnknownFields bool

// This is an output-only field that indicates whether or not the subscription can
// receive messages. This field is set only in responses from the server;
// it is ignored if it is set in any requests.
State BigQueryConfigState
}

func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig {
if bc == nil {
return nil
}
pbCfg := &pb.BigQueryConfig{
Table: bc.Table,
UseTopicSchema: bc.UseTopicSchema,
WriteMetadata: bc.WriteMetadata,
DropUnknownFields: bc.DropUnknownFields,
State: pb.BigQueryConfig_State(bc.State),
}
return pbCfg
}

// SubscriptionState denotes the possible states for a Subscription.
type SubscriptionState int

const (
// SubscriptionStateUnspecified is the default value. This value is unused.
SubscriptionStateUnspecified = iota

// SubscriptionStateActive means the subscription can actively send messages to BigQuery.
SubscriptionStateActive

// SubscriptionStateResourceError means the subscription receive messages because of an
// error with the resource to which it pushes messages.
// See the more detailed error state in the corresponding configuration.
SubscriptionStateResourceError
)

// SubscriptionConfig describes the configuration of a subscription.
type SubscriptionConfig struct {
// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
name string

Topic *Topic
// The topic from which this subscription is receiving messages.
Topic *Topic

// If push delivery is used with this subscription, this field is
// used to configure it. Either `PushConfig` or `BigQueryConfig` can be set,
// but not both. If both are empty, then the subscriber will pull and ack
// messages using API methods.
PushConfig PushConfig

// If delivery to BigQuery is used with this subscription, this field is
// used to configure it. Either `PushConfig` or `BigQueryConfig` can be set,
// but not both. If both are empty, then the subscriber will pull and ack
// messages using API methods.
BigQueryConfig BigQueryConfig

// The default maximum time after a subscriber receives a message before
// the subscriber should acknowledge the message. Note: messages which are
// obtained via Subscription.Receive need not be acknowledged within this
Expand Down Expand Up @@ -290,6 +382,12 @@ type SubscriptionConfig struct {
// This is an output only field, meaning it will only appear in responses from the backend
// and will be ignored if sent in a request.
TopicMessageRetentionDuration time.Duration

// State indicates whether or not the subscription can receive messages.
// This is an output-only field that indicates whether or not the subscription can
// receive messages. This field is set only in responses from the server;
// it is ignored if it is set in any requests.
State SubscriptionState
}

// String returns the globally unique printable name of the subscription config.
Expand Down Expand Up @@ -317,6 +415,10 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil {
pbPushConfig = cfg.PushConfig.toProto()
}
var pbBigQueryConfig *pb.BigQueryConfig
if cfg.BigQueryConfig.Table != "" {
pbBigQueryConfig = cfg.BigQueryConfig.toProto()
}
var retentionDuration *durpb.Duration
if cfg.RetentionDuration != 0 {
retentionDuration = durpb.New(cfg.RetentionDuration)
Expand All @@ -333,6 +435,7 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
Name: name,
Topic: cfg.Topic.name,
PushConfig: pbPushConfig,
BigqueryConfig: pbBigQueryConfig,
AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())),
RetainAckedMessages: cfg.RetainAckedMessages,
MessageRetentionDuration: retentionDuration,
Expand Down Expand Up @@ -371,11 +474,14 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC
RetryPolicy: rp,
Detached: pbSub.Detached,
TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(),
State: SubscriptionState(pbSub.State),
}
pc := protoToPushConfig(pbSub.PushConfig)
if pc != nil {
if pc := protoToPushConfig(pbSub.PushConfig); pc != nil {
subC.PushConfig = *pc
}
if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil {
subC.BigQueryConfig = *bq
}
return subC, nil
}

Expand All @@ -398,6 +504,20 @@ func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
return pc
}

func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig {
if pbBQ == nil {
return nil
}
bq := &BigQueryConfig{
Table: pbBQ.GetTable(),
UseTopicSchema: pbBQ.GetUseTopicSchema(),
DropUnknownFields: pbBQ.GetDropUnknownFields(),
WriteMetadata: pbBQ.GetWriteMetadata(),
State: BigQueryConfigState(pbBQ.State),
}
return bq
}

// DeadLetterPolicy specifies the conditions for dead lettering messages in
// a subscription.
type DeadLetterPolicy struct {
Expand Down Expand Up @@ -641,9 +761,14 @@ func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {

// SubscriptionConfigToUpdate describes how to update a subscription.
type SubscriptionConfigToUpdate struct {
// If non-nil, the push config is changed.
// If non-nil, the push config is changed. Cannot be set at the same time as BigQueryConfig.
// If currently in push mode, set this value to the zero value to revert to a Pull based subscription.
PushConfig *PushConfig

// If non-nil, the bigquery config is changed. Cannot be set at the same time as PushConfig.
// If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription,
BigQueryConfig *BigQueryConfig

// If non-zero, the ack deadline is changed.
AckDeadline time.Duration

Expand Down Expand Up @@ -698,6 +823,10 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update
psub.PushConfig = cfg.PushConfig.toProto()
paths = append(paths, "push_config")
}
if cfg.BigQueryConfig != nil {
psub.BigqueryConfig = cfg.BigQueryConfig.toProto()
paths = append(paths, "bigquery_config")
}
if cfg.AckDeadline != 0 {
psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
paths = append(paths, "ack_deadline_seconds")
Expand Down
34 changes: 34 additions & 0 deletions pubsub/subscription_test.go
Expand Up @@ -190,6 +190,7 @@ func TestUpdateSubscription(t *testing.T) {
Audience: "client-12345",
},
},
State: SubscriptionStateActive,
}
opt := cmpopts.IgnoreUnexported(SubscriptionConfig{})
if !testutil.Equal(cfg, want, opt) {
Expand Down Expand Up @@ -226,6 +227,7 @@ func TestUpdateSubscription(t *testing.T) {
Audience: "client-12345",
},
},
State: SubscriptionStateActive,
}
if !testutil.Equal(got, want, opt) {
t.Fatalf("\ngot %+v\nwant %+v", got, want)
Expand Down Expand Up @@ -435,3 +437,35 @@ func TestOrdering_CreateSubscription(t *testing.T) {
msg.Ack()
})
}

func TestBigQuerySubscription(t *testing.T) {
ctx := context.Background()
client, srv := newFake(t)
defer client.Close()
defer srv.Close()

topic := mustCreateTopic(t, client, "t")
bqTable := "some-project:some-dataset.some-table"
bqConfig := BigQueryConfig{
Table: bqTable,
}

subConfig := SubscriptionConfig{
Topic: topic,
BigQueryConfig: bqConfig,
}
bqSub, err := client.CreateSubscription(ctx, "s", subConfig)
if err != nil {
t.Fatal(err)
}
cfg, err := bqSub.Config(ctx)
if err != nil {
t.Fatal(err)
}

want := bqConfig
want.State = BigQueryConfigActive
if diff := testutil.Diff(cfg.BigQueryConfig, want); diff != "" {
t.Fatalf("CreateBQSubscription mismatch: \n%s", diff)
}
}

0 comments on commit 81f704a

Please sign in to comment.