From d02cc381a02ce7a1526a1e6537004d19d8311003 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Wed, 1 Jun 2022 17:32:28 -0700 Subject: [PATCH 01/10] feat(pubsub): support bigquery subscriptions --- pubsub/pstest/fake.go | 12 ++++ pubsub/pstest/fake_test.go | 59 +++++++++++++++++++ pubsub/subscription.go | 113 +++++++++++++++++++++++++++++++++++- pubsub/subscription_test.go | 32 ++++++++++ 4 files changed, 213 insertions(+), 3 deletions(-) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 7584215010c..6e3eeacd8cf 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -480,6 +480,9 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p if ps.PushConfig == nil { ps.PushConfig = &pb.PushConfig{} } + if ps.BigqueryConfig == nil { + ps.BigqueryConfig = &pb.BigQueryConfig{} + } ps.TopicMessageRetentionDuration = top.proto.MessageRetentionDuration var deadLetterTopic *topic if ps.DeadLetterPolicy != nil { @@ -489,6 +492,9 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p } deadLetterTopic = dlTopic } + if ps.BigqueryConfig.Table != "" { + ps.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE + } sub := newSubscription(top, &s.mu, s.timeNowFunc, deadLetterTopic, ps) top.subs[ps.Name] = sub @@ -578,6 +584,12 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti switch path { case "push_config": sub.proto.PushConfig = req.Subscription.PushConfig + + case "bigquery_config": + sub.proto.BigqueryConfig = req.GetSubscription().GetBigqueryConfig() + if sub.proto.BigqueryConfig.Table != "" { + sub.proto.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE + } case "ack_deadline_seconds": a := req.Subscription.AckDeadlineSeconds diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 2dce348c5f0..66adfb7323a 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1455,3 +1455,62 @@ func TestTopicRetentionAdmin(t *testing.T) { t.Errorf("sub.TopicMessageRetentionDuration mismatch: %s", diff) } } + + +func TestSubscriptionPushPull(t *testing.T) { + ctx := context.Background() + pclient, sclient, _, cleanup := newFake(ctx, t) + defer cleanup() + + top := mustCreateTopic(ctx, t, pclient, &pb.Topic{ + Name: "projects/P/topics/T", + }) + + // Create a push based subscription + pc := &pb.PushConfig{ + PushEndpoint: "some-endpoint", + } + got := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{ + AckDeadlineSeconds: minAckDeadlineSecs, + Name: "projects/P/subscriptions/S", + Topic: top.Name, + PushConfig: pc, + }) + + if diff := testutil.Diff(got.PushConfig, pc); diff != "" { + t.Errorf("sub.PushConfig mismatch: %s", diff) + } + + // Update the subscription to write to BigQuery instead. + updateSub := got + updateSub.PushConfig = &pb.PushConfig{} + bqc := &pb.BigQueryConfig{ + Table: "some-table", + } + updateSub.BigqueryConfig = bqc + got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{ + Subscription: updateSub, + UpdateMask: &field_mask.FieldMask{Paths: []string{"push_config", "bigquery_config"}}, + }) + if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" { + t.Errorf("sub.PushConfig should be zero value\n%s", diff) + } + want := bqc + want.State = pb.BigQueryConfig_ACTIVE + if diff := testutil.Diff(got.BigqueryConfig, bqc); diff != "" { + t.Errorf("sub.BigQueryConfig mismatch: %s", diff) + } + + // Switch to a pull based subscription. + updateSub.BigqueryConfig = &pb.BigQueryConfig{} + got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{ + Subscription: updateSub, + UpdateMask: &field_mask.FieldMask{Paths: []string{"bigquery_config"}}, + }) + if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" { + t.Errorf("sub.PushConfig should be zero value\n%s", diff) + } + if diff := testutil.Diff(got.BigqueryConfig, new(pb.BigQueryConfig)); diff != "" { + t.Errorf("sub.BigqueryConfig should be zero value\n%s", diff) + } +} \ No newline at end of file diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 0eb37487238..97e877c9874 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -210,14 +210,90 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ { } } + +type BigQueryConfigState int +// Possible states for a BigQuery subscription. +const ( + // Default value. This value is unused. + BigQueryConfigStateUnspecified = iota + + // The usbscription can actively send messages to BigQuery. + BigQueryConfigActive + + // Cannot write to the BigQuery table because of permission denied errors. + BigQueryConfigPermissionDenied + + // Cannot write to the BigQuery table because it does not exist. + BigQueryConfigNotFound + + // Cannot write to the BigQuery table due to a schema mismatch. + BigQueryConfigSchemaMismatch +) + +// BigQueryConfig configures the subscription to deliver to a BigQuery table. +type BigQueryConfig struct { + // The name of the table to which to write data, of the form + // {projectId}:{datasetId}.{tableId} + Table string + + // When true, use the topic's schema as the columns to write to in BigQuery, + // if it exists. + UseTopicSchema bool + + // When true, write the subscription name, message_id, publish_time, + // attributes, and ordering_key to additional columns in the table. The + // subscription name, message_id, and publish_time fields are put in their own + // columns while all other message properties (other than data) are written to + // a JSON object in the attributes column. + WriteMetadata bool + + + // When true and use_topic_schema is true, any fields that are a part of the + // topic schema that are not part of the BigQuery table schema are dropped + // when writing to BigQuery. Otherwise, the schemas must be kept in sync and + // any messages with extra fields are not written and remain in the + // subscription's backlog. + DropUnknownFields bool + + // Output only. An output-only field that indicates whether or not the subscription can + // receive messages. + State BigQueryConfigState +} + +func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig{ + if bc == nil { + return nil + } + pbCfg := &pb.BigQueryConfig{ + Table: bc.Table, + UseTopicSchema: bc.UseTopicSchema, + WriteMetadata: bc.WriteMetadata, + DropUnknownFields: bc.DropUnknownFields, + State: pb.BigQueryConfig_State(bc.State), + } + return pbCfg +} + // SubscriptionConfig describes the configuration of a subscription. type SubscriptionConfig struct { // The fully qualified identifier for the subscription, in the format "projects//subscriptions/" name string + // The topic from which this subscription is receiving messages. Topic *Topic + + // If push delivery is used with this subscription, this field is + // used to configure it. Either `PushConfig` or `BigQueryConfig` can be set, + // but not both. If both are empty, then the subscriber will pull and ack + // messages using API methods. PushConfig PushConfig + // If delivery to BigQuery is used with this subscription, this field is + // used to configure it. Either `PushConfig` or `BigQueryConfig` can be set, + // but not both. If both are empty, then the subscriber will pull and ack + // messages using API methods. + BigQueryConfig BigQueryConfig + // The default maximum time after a subscriber receives a message before // the subscriber should acknowledge the message. Note: messages which are // obtained via Subscription.Receive need not be acknowledged within this @@ -317,6 +393,10 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil { pbPushConfig = cfg.PushConfig.toProto() } + var pbBigQueryConfig *pb.BigQueryConfig + if cfg.BigQueryConfig.Table!= "" { + pbBigQueryConfig = cfg.BigQueryConfig.toProto() + } var retentionDuration *durpb.Duration if cfg.RetentionDuration != 0 { retentionDuration = durpb.New(cfg.RetentionDuration) @@ -333,6 +413,7 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { Name: name, Topic: cfg.Topic.name, PushConfig: pbPushConfig, + BigqueryConfig: pbBigQueryConfig, AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), RetainAckedMessages: cfg.RetainAckedMessages, MessageRetentionDuration: retentionDuration, @@ -372,10 +453,12 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC Detached: pbSub.Detached, TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(), } - pc := protoToPushConfig(pbSub.PushConfig) - if pc != nil { + if pc := protoToPushConfig(pbSub.PushConfig); pc != nil { subC.PushConfig = *pc } + if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil { + subC.BigQueryConfig = *bq + } return subC, nil } @@ -398,6 +481,21 @@ func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig { return pc } +func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig { + if pbBQ == nil { + return nil + } + bq := &BigQueryConfig{ + Table: pbBQ.GetTable(), + UseTopicSchema: pbBQ.GetUseTopicSchema(), + DropUnknownFields: pbBQ.GetDropUnknownFields(), + WriteMetadata: pbBQ.GetWriteMetadata(), + State: BigQueryConfigState(pbBQ.State), + } + return bq +} + + // DeadLetterPolicy specifies the conditions for dead lettering messages in // a subscription. type DeadLetterPolicy struct { @@ -641,9 +739,14 @@ func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) { // SubscriptionConfigToUpdate describes how to update a subscription. type SubscriptionConfigToUpdate struct { - // If non-nil, the push config is changed. + // If non-nil, the push config is changed. Cannot be set at the same time as BigQueryConfig. + // If currently in push mode, set this value to the zero value to revert to a Pull based subscription. PushConfig *PushConfig + // If non-nil, the bigquery config is changed. Cannot be set at the same time as PushConfig. + // If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription, + BigQueryConfig *BigQueryConfig + // If non-zero, the ack deadline is changed. AckDeadline time.Duration @@ -698,6 +801,10 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update psub.PushConfig = cfg.PushConfig.toProto() paths = append(paths, "push_config") } + if cfg.BigQueryConfig != nil { + psub.BigqueryConfig = cfg.BigQueryConfig.toProto() + paths = append(paths, "bigquery_config") + } if cfg.AckDeadline != 0 { psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds())) paths = append(paths, "ack_deadline_seconds") diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index 7595905152b..b9d2a7457db 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -435,3 +435,35 @@ func TestOrdering_CreateSubscription(t *testing.T) { msg.Ack() }) } + +func TestBigQuerySubscription(t *testing.T) { + ctx := context.Background() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "t") + bqTable := "some-project:some-dataset.some-table" + subConfig := SubscriptionConfig{ + Topic: topic, + BigQueryConfig: BigQueryConfig{ + Table: bqTable, + }, + } + bqSub, err := client.CreateSubscription(ctx, "s", subConfig) + if err != nil { + t.Fatal(err) + } + cfg, err := bqSub.Config(ctx) + if err != nil { + t.Fatal(err) + } + + want := BigQueryConfig{ + Table: bqTable, + State: BigQueryConfigActive, + } + if diff := testutil.Diff(cfg.BigQueryConfig, want); diff != "" { + t.Fatalf("CreateBQSubscription mismatch: \n%s", diff) + } +} From ed59e1236e1817481a6b81f56b6d566553775096 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Wed, 1 Jun 2022 17:38:53 -0700 Subject: [PATCH 02/10] fix formatting and update comments --- pubsub/pstest/fake_test.go | 2 +- pubsub/subscription.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 66adfb7323a..25b49f37bab 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1513,4 +1513,4 @@ func TestSubscriptionPushPull(t *testing.T) { if diff := testutil.Diff(got.BigqueryConfig, new(pb.BigQueryConfig)); diff != "" { t.Errorf("sub.BigqueryConfig should be zero value\n%s", diff) } -} \ No newline at end of file +} diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 97e877c9874..803b1a3740b 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -211,8 +211,9 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ { } +// BigQueryConfigState denotes the possible states for a BigQuery Subscription. type BigQueryConfigState int -// Possible states for a BigQuery subscription. + const ( // Default value. This value is unused. BigQueryConfigStateUnspecified = iota @@ -247,7 +248,6 @@ type BigQueryConfig struct { // a JSON object in the attributes column. WriteMetadata bool - // When true and use_topic_schema is true, any fields that are a part of the // topic schema that are not part of the BigQuery table schema are dropped // when writing to BigQuery. Otherwise, the schemas must be kept in sync and @@ -394,7 +394,7 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { pbPushConfig = cfg.PushConfig.toProto() } var pbBigQueryConfig *pb.BigQueryConfig - if cfg.BigQueryConfig.Table!= "" { + if cfg.BigQueryConfig.Table != "" { pbBigQueryConfig = cfg.BigQueryConfig.toProto() } var retentionDuration *durpb.Duration From ba763678fc605c28f222f9ab20f681df0fa2812b Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Wed, 1 Jun 2022 21:31:06 -0700 Subject: [PATCH 03/10] run gofmt --- pubsub/pstest/fake.go | 2 +- pubsub/pstest/fake_test.go | 13 ++++---- pubsub/subscription.go | 66 ++++++++++++++++++------------------- pubsub/subscription_test.go | 2 +- 4 files changed, 40 insertions(+), 43 deletions(-) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 6e3eeacd8cf..bbd5db6ebdf 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -584,7 +584,7 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti switch path { case "push_config": sub.proto.PushConfig = req.Subscription.PushConfig - + case "bigquery_config": sub.proto.BigqueryConfig = req.GetSubscription().GetBigqueryConfig() if sub.proto.BigqueryConfig.Table != "" { diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 25b49f37bab..be3bb1f0586 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1456,25 +1456,24 @@ func TestTopicRetentionAdmin(t *testing.T) { } } - func TestSubscriptionPushPull(t *testing.T) { ctx := context.Background() pclient, sclient, _, cleanup := newFake(ctx, t) defer cleanup() top := mustCreateTopic(ctx, t, pclient, &pb.Topic{ - Name: "projects/P/topics/T", + Name: "projects/P/topics/T", }) // Create a push based subscription pc := &pb.PushConfig{ PushEndpoint: "some-endpoint", - } + } got := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{ AckDeadlineSeconds: minAckDeadlineSecs, Name: "projects/P/subscriptions/S", Topic: top.Name, - PushConfig: pc, + PushConfig: pc, }) if diff := testutil.Diff(got.PushConfig, pc); diff != "" { @@ -1487,10 +1486,10 @@ func TestSubscriptionPushPull(t *testing.T) { bqc := &pb.BigQueryConfig{ Table: "some-table", } - updateSub.BigqueryConfig = bqc + updateSub.BigqueryConfig = bqc got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{ Subscription: updateSub, - UpdateMask: &field_mask.FieldMask{Paths: []string{"push_config", "bigquery_config"}}, + UpdateMask: &field_mask.FieldMask{Paths: []string{"push_config", "bigquery_config"}}, }) if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" { t.Errorf("sub.PushConfig should be zero value\n%s", diff) @@ -1505,7 +1504,7 @@ func TestSubscriptionPushPull(t *testing.T) { updateSub.BigqueryConfig = &pb.BigQueryConfig{} got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{ Subscription: updateSub, - UpdateMask: &field_mask.FieldMask{Paths: []string{"bigquery_config"}}, + UpdateMask: &field_mask.FieldMask{Paths: []string{"bigquery_config"}}, }) if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" { t.Errorf("sub.PushConfig should be zero value\n%s", diff) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 803b1a3740b..d14ee3253e9 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -210,66 +210,65 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ { } } - // BigQueryConfigState denotes the possible states for a BigQuery Subscription. type BigQueryConfigState int const ( // Default value. This value is unused. BigQueryConfigStateUnspecified = iota - + // The usbscription can actively send messages to BigQuery. BigQueryConfigActive - + // Cannot write to the BigQuery table because of permission denied errors. BigQueryConfigPermissionDenied - + // Cannot write to the BigQuery table because it does not exist. BigQueryConfigNotFound - + // Cannot write to the BigQuery table due to a schema mismatch. BigQueryConfigSchemaMismatch ) -// BigQueryConfig configures the subscription to deliver to a BigQuery table. +// BigQueryConfig configures the subscription to deliver to a BigQuery table. type BigQueryConfig struct { // The name of the table to which to write data, of the form - // {projectId}:{datasetId}.{tableId} + // {projectId}:{datasetId}.{tableId} Table string // When true, use the topic's schema as the columns to write to in BigQuery, - // if it exists. + // if it exists. UseTopicSchema bool // When true, write the subscription name, message_id, publish_time, - // attributes, and ordering_key to additional columns in the table. The - // subscription name, message_id, and publish_time fields are put in their own - // columns while all other message properties (other than data) are written to - // a JSON object in the attributes column. + // attributes, and ordering_key to additional columns in the table. The + // subscription name, message_id, and publish_time fields are put in their own + // columns while all other message properties (other than data) are written to + // a JSON object in the attributes column. WriteMetadata bool // When true and use_topic_schema is true, any fields that are a part of the - // topic schema that are not part of the BigQuery table schema are dropped - // when writing to BigQuery. Otherwise, the schemas must be kept in sync and - // any messages with extra fields are not written and remain in the - // subscription's backlog. + // topic schema that are not part of the BigQuery table schema are dropped + // when writing to BigQuery. Otherwise, the schemas must be kept in sync and + // any messages with extra fields are not written and remain in the + // subscription's backlog. DropUnknownFields bool // Output only. An output-only field that indicates whether or not the subscription can - // receive messages. + // receive messages. State BigQueryConfigState } -func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig{ +func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig { if bc == nil { return nil } pbCfg := &pb.BigQueryConfig{ - Table: bc.Table, - UseTopicSchema: bc.UseTopicSchema, - WriteMetadata: bc.WriteMetadata, + Table: bc.Table, + UseTopicSchema: bc.UseTopicSchema, + WriteMetadata: bc.WriteMetadata, DropUnknownFields: bc.DropUnknownFields, - State: pb.BigQueryConfig_State(bc.State), + State: pb.BigQueryConfig_State(bc.State), } return pbCfg } @@ -280,12 +279,12 @@ type SubscriptionConfig struct { name string // The topic from which this subscription is receiving messages. - Topic *Topic + Topic *Topic - // If push delivery is used with this subscription, this field is - // used to configure it. Either `PushConfig` or `BigQueryConfig` can be set, - // but not both. If both are empty, then the subscriber will pull and ack - // messages using API methods. + // If push delivery is used with this subscription, this field is + // used to configure it. Either `PushConfig` or `BigQueryConfig` can be set, + // but not both. If both are empty, then the subscriber will pull and ack + // messages using API methods. PushConfig PushConfig // If delivery to BigQuery is used with this subscription, this field is @@ -413,7 +412,7 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { Name: name, Topic: cfg.Topic.name, PushConfig: pbPushConfig, - BigqueryConfig: pbBigQueryConfig, + BigqueryConfig: pbBigQueryConfig, AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), RetainAckedMessages: cfg.RetainAckedMessages, MessageRetentionDuration: retentionDuration, @@ -486,16 +485,15 @@ func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig { return nil } bq := &BigQueryConfig{ - Table: pbBQ.GetTable(), - UseTopicSchema: pbBQ.GetUseTopicSchema(), + Table: pbBQ.GetTable(), + UseTopicSchema: pbBQ.GetUseTopicSchema(), DropUnknownFields: pbBQ.GetDropUnknownFields(), - WriteMetadata: pbBQ.GetWriteMetadata(), - State: BigQueryConfigState(pbBQ.State), + WriteMetadata: pbBQ.GetWriteMetadata(), + State: BigQueryConfigState(pbBQ.State), } return bq } - // DeadLetterPolicy specifies the conditions for dead lettering messages in // a subscription. type DeadLetterPolicy struct { @@ -744,7 +742,7 @@ type SubscriptionConfigToUpdate struct { PushConfig *PushConfig // If non-nil, the bigquery config is changed. Cannot be set at the same time as PushConfig. - // If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription, + // If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription, BigQueryConfig *BigQueryConfig // If non-zero, the ack deadline is changed. diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index b9d2a7457db..296ed6e0c3f 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -445,7 +445,7 @@ func TestBigQuerySubscription(t *testing.T) { topic := mustCreateTopic(t, client, "t") bqTable := "some-project:some-dataset.some-table" subConfig := SubscriptionConfig{ - Topic: topic, + Topic: topic, BigQueryConfig: BigQueryConfig{ Table: bqTable, }, From 8c8b73e73fe43476c4c318adf72329ba655c427c Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 2 Jun 2022 12:18:41 -0700 Subject: [PATCH 04/10] simplify fake behavior for bqconfig --- pubsub/pstest/fake.go | 6 ------ pubsub/pstest/fake_test.go | 2 -- pubsub/subscription_test.go | 16 +++++++--------- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index bbd5db6ebdf..426758e7db6 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -492,9 +492,6 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p } deadLetterTopic = dlTopic } - if ps.BigqueryConfig.Table != "" { - ps.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE - } sub := newSubscription(top, &s.mu, s.timeNowFunc, deadLetterTopic, ps) top.subs[ps.Name] = sub @@ -587,9 +584,6 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti case "bigquery_config": sub.proto.BigqueryConfig = req.GetSubscription().GetBigqueryConfig() - if sub.proto.BigqueryConfig.Table != "" { - sub.proto.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE - } case "ack_deadline_seconds": a := req.Subscription.AckDeadlineSeconds diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index be3bb1f0586..2c064beec57 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1494,8 +1494,6 @@ func TestSubscriptionPushPull(t *testing.T) { if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" { t.Errorf("sub.PushConfig should be zero value\n%s", diff) } - want := bqc - want.State = pb.BigQueryConfig_ACTIVE if diff := testutil.Diff(got.BigqueryConfig, bqc); diff != "" { t.Errorf("sub.BigQueryConfig mismatch: %s", diff) } diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index 296ed6e0c3f..dec9bbb8c26 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -444,11 +444,13 @@ func TestBigQuerySubscription(t *testing.T) { topic := mustCreateTopic(t, client, "t") bqTable := "some-project:some-dataset.some-table" + bqConfig := BigQueryConfig{ + Table: bqTable, + } + subConfig := SubscriptionConfig{ - Topic: topic, - BigQueryConfig: BigQueryConfig{ - Table: bqTable, - }, + Topic: topic, + BigQueryConfig: bqConfig, } bqSub, err := client.CreateSubscription(ctx, "s", subConfig) if err != nil { @@ -459,11 +461,7 @@ func TestBigQuerySubscription(t *testing.T) { t.Fatal(err) } - want := BigQueryConfig{ - Table: bqTable, - State: BigQueryConfigActive, - } - if diff := testutil.Diff(cfg.BigQueryConfig, want); diff != "" { + if diff := testutil.Diff(cfg.BigQueryConfig, bqConfig); diff != "" { t.Fatalf("CreateBQSubscription mismatch: \n%s", diff) } } From 6cf0954ecabd618415b272d7248807f9ecd80fe2 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 6 Jun 2022 09:47:35 -0700 Subject: [PATCH 05/10] fix comments --- pubsub/pstest/fake_test.go | 4 ++-- pubsub/subscription.go | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 2c064beec57..a5bae971f20 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1465,7 +1465,7 @@ func TestSubscriptionPushPull(t *testing.T) { Name: "projects/P/topics/T", }) - // Create a push based subscription + // Create a push subscription. pc := &pb.PushConfig{ PushEndpoint: "some-endpoint", } @@ -1498,7 +1498,7 @@ func TestSubscriptionPushPull(t *testing.T) { t.Errorf("sub.BigQueryConfig mismatch: %s", diff) } - // Switch to a pull based subscription. + // Switch back to a pull subscription. updateSub.BigqueryConfig = &pb.BigQueryConfig{} got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{ Subscription: updateSub, diff --git a/pubsub/subscription.go b/pubsub/subscription.go index d14ee3253e9..aafa2c466bf 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -254,8 +254,9 @@ type BigQueryConfig struct { // subscription's backlog. DropUnknownFields bool - // Output only. An output-only field that indicates whether or not the subscription can - // receive messages. + // This is an output-only field that indicates whether or not the subscription can + // receive messages. This field is set only in responses from the server; + // it is ignored if it is set in any requests. State BigQueryConfigState } From 70d876dab5e787a36fc1b8fdb61931c2a3567871 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 6 Jun 2022 10:05:05 -0700 Subject: [PATCH 06/10] add comments to exported consts --- pubsub/subscription.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index aafa2c466bf..520ee3ef240 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -214,19 +214,19 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ { type BigQueryConfigState int const ( - // Default value. This value is unused. + // BigQueryConfigStateUnspecified is the default value. This value is unused. BigQueryConfigStateUnspecified = iota - // The usbscription can actively send messages to BigQuery. + // BigQueryConfigActive means the subscription can actively send messages to BigQuery. BigQueryConfigActive - // Cannot write to the BigQuery table because of permission denied errors. + // BigQueryConfigPermissionDenied means the subscription cannot write to the BigQuery table because of permission denied errors. BigQueryConfigPermissionDenied - // Cannot write to the BigQuery table because it does not exist. + // BigQueryConfigNotFound means the subscription cannot write to the BigQuery table because it does not exist. BigQueryConfigNotFound - // Cannot write to the BigQuery table due to a schema mismatch. + // BigQueryConfigSchemaMismatch means the subscription cannot write to the BigQuery table due to a schema mismatch. BigQueryConfigSchemaMismatch ) From 021208a950ae45f4285f4b98642dbe9422d6c179 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 6 Jun 2022 23:51:28 -0700 Subject: [PATCH 07/10] add SubscriptionState field --- pubsub/integration_test.go | 2 ++ pubsub/pstest/fake.go | 6 ++++++ pubsub/pstest/fake_test.go | 4 +++- pubsub/subscription.go | 21 +++++++++++++++++++++ pubsub/subscription_test.go | 6 +++++- 5 files changed, 37 insertions(+), 2 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 73375286b5c..e0417d17b83 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -625,6 +625,7 @@ func TestIntegration_UpdateSubscription(t *testing.T) { ServiceAccountEmail: serviceAccountEmail, }, }, + State: SubscriptionStateActive, } opt := cmpopts.IgnoreUnexported(SubscriptionConfig{}) if diff := testutil.Diff(got, want, opt); diff != "" { @@ -658,6 +659,7 @@ func TestIntegration_UpdateSubscription(t *testing.T) { RetentionDuration: 2 * time.Hour, Labels: map[string]string{"label": "value"}, ExpirationPolicy: 25 * time.Hour, + State: SubscriptionStateActive, } if !testutil.Equal(got, want, opt) { diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 426758e7db6..964b692642a 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -482,6 +482,8 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p } if ps.BigqueryConfig == nil { ps.BigqueryConfig = &pb.BigQueryConfig{} + } else if ps.BigqueryConfig.Table != "" { + ps.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE } ps.TopicMessageRetentionDuration = top.proto.MessageRetentionDuration var deadLetterTopic *topic @@ -584,6 +586,9 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti case "bigquery_config": sub.proto.BigqueryConfig = req.GetSubscription().GetBigqueryConfig() + if sub.proto.GetBigqueryConfig().GetTable() != "" { + sub.proto.GetBigqueryConfig().State = pb.BigQueryConfig_ACTIVE + } case "ack_deadline_seconds": a := req.Subscription.AckDeadlineSeconds @@ -794,6 +799,7 @@ func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, dea if at == 0 { at = 10 * time.Second } + ps.State = pb.Subscription_ACTIVE return &subscription{ topic: t, deadLetterTopic: deadLetterTopic, diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index a5bae971f20..ee5f604b574 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1494,7 +1494,9 @@ func TestSubscriptionPushPull(t *testing.T) { if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" { t.Errorf("sub.PushConfig should be zero value\n%s", diff) } - if diff := testutil.Diff(got.BigqueryConfig, bqc); diff != "" { + want := bqc + want.State = pb.BigQueryConfig_ACTIVE + if diff := testutil.Diff(got.BigqueryConfig, want); diff != "" { t.Errorf("sub.BigQueryConfig mismatch: %s", diff) } diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 520ee3ef240..525d81e648e 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -274,6 +274,21 @@ func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig { return pbCfg } +type SubscriptionState int + +const ( + // SubscriptionStateUnspecified is the default value. This value is unused. + SubscriptionStateUnspecified = iota + + // SubscriptionStateActive means the subscription can actively send messages to BigQuery. + SubscriptionStateActive + + // SubscriptionStateResourceError means the subscription receive messages because of an + // error with the resource to which it pushes messages. + // See the more detailed error state in the corresponding configuration. + SubscriptionStateResourceError +) + // SubscriptionConfig describes the configuration of a subscription. type SubscriptionConfig struct { // The fully qualified identifier for the subscription, in the format "projects//subscriptions/" @@ -366,6 +381,11 @@ type SubscriptionConfig struct { // This is an output only field, meaning it will only appear in responses from the backend // and will be ignored if sent in a request. TopicMessageRetentionDuration time.Duration + + // This is an output-only field that indicates whether or not the subscription can + // receive messages. This field is set only in responses from the server; + // it is ignored if it is set in any requests. + State SubscriptionState } // String returns the globally unique printable name of the subscription config. @@ -452,6 +472,7 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC RetryPolicy: rp, Detached: pbSub.Detached, TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(), + State: SubscriptionState(pbSub.State), } if pc := protoToPushConfig(pbSub.PushConfig); pc != nil { subC.PushConfig = *pc diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index dec9bbb8c26..7d4ac9ee28c 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -190,6 +190,7 @@ func TestUpdateSubscription(t *testing.T) { Audience: "client-12345", }, }, + State: SubscriptionStateActive, } opt := cmpopts.IgnoreUnexported(SubscriptionConfig{}) if !testutil.Equal(cfg, want, opt) { @@ -226,6 +227,7 @@ func TestUpdateSubscription(t *testing.T) { Audience: "client-12345", }, }, + State: SubscriptionStateActive, } if !testutil.Equal(got, want, opt) { t.Fatalf("\ngot %+v\nwant %+v", got, want) @@ -461,7 +463,9 @@ func TestBigQuerySubscription(t *testing.T) { t.Fatal(err) } - if diff := testutil.Diff(cfg.BigQueryConfig, bqConfig); diff != "" { + want := bqConfig + want.State = BigQueryConfigActive + if diff := testutil.Diff(cfg.BigQueryConfig, want); diff != "" { t.Fatalf("CreateBQSubscription mismatch: \n%s", diff) } } From 40f4fce847a503f64db11725a17d19b9d2638f30 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 7 Jun 2022 09:10:59 -0700 Subject: [PATCH 08/10] add comment to exported field --- pubsub/subscription.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 525d81e648e..702d03db88d 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -382,6 +382,7 @@ type SubscriptionConfig struct { // and will be ignored if sent in a request. TopicMessageRetentionDuration time.Duration + // State indicates whether or not the subscription can receive messages. // This is an output-only field that indicates whether or not the subscription can // receive messages. This field is set only in responses from the server; // it is ignored if it is set in any requests. From 7df8cc50b864ce77136584e31fe556c37bac91cb Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 7 Jun 2022 09:28:20 -0700 Subject: [PATCH 09/10] add comment to exported type --- pubsub/subscription.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 702d03db88d..0c2b2f4bc70 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -274,6 +274,7 @@ func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig { return pbCfg } +// BigQueryConfigState denotes the possible states for a Subscription. type SubscriptionState int const ( From 8f79979b4ab151d122806e5b45233557ad364957 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 7 Jun 2022 10:31:34 -0700 Subject: [PATCH 10/10] fix comment --- pubsub/subscription.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 0c2b2f4bc70..daaf6748df9 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -274,7 +274,7 @@ func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig { return pbCfg } -// BigQueryConfigState denotes the possible states for a Subscription. +// SubscriptionState denotes the possible states for a Subscription. type SubscriptionState int const (