Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): support bigquery subscriptions #6119

Merged
merged 16 commits into from Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pubsub/integration_test.go
Expand Up @@ -625,6 +625,7 @@ func TestIntegration_UpdateSubscription(t *testing.T) {
ServiceAccountEmail: serviceAccountEmail,
},
},
State: SubscriptionStateActive,
}
opt := cmpopts.IgnoreUnexported(SubscriptionConfig{})
if diff := testutil.Diff(got, want, opt); diff != "" {
Expand Down Expand Up @@ -658,6 +659,7 @@ func TestIntegration_UpdateSubscription(t *testing.T) {
RetentionDuration: 2 * time.Hour,
Labels: map[string]string{"label": "value"},
ExpirationPolicy: 25 * time.Hour,
State: SubscriptionStateActive,
}

if !testutil.Equal(got, want, opt) {
Expand Down
12 changes: 12 additions & 0 deletions pubsub/pstest/fake.go
Expand Up @@ -480,6 +480,11 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
if ps.PushConfig == nil {
ps.PushConfig = &pb.PushConfig{}
}
if ps.BigqueryConfig == nil {
ps.BigqueryConfig = &pb.BigQueryConfig{}
} else if ps.BigqueryConfig.Table != "" {
ps.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE
}
ps.TopicMessageRetentionDuration = top.proto.MessageRetentionDuration
var deadLetterTopic *topic
if ps.DeadLetterPolicy != nil {
Expand Down Expand Up @@ -579,6 +584,12 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti
case "push_config":
sub.proto.PushConfig = req.Subscription.PushConfig

case "bigquery_config":
sub.proto.BigqueryConfig = req.GetSubscription().GetBigqueryConfig()
if sub.proto.GetBigqueryConfig().GetTable() != "" {
sub.proto.GetBigqueryConfig().State = pb.BigQueryConfig_ACTIVE
}

case "ack_deadline_seconds":
a := req.Subscription.AckDeadlineSeconds
if err := checkAckDeadline(a); err != nil {
Expand Down Expand Up @@ -788,6 +799,7 @@ func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, dea
if at == 0 {
at = 10 * time.Second
}
ps.State = pb.Subscription_ACTIVE
return &subscription{
topic: t,
deadLetterTopic: deadLetterTopic,
Expand Down
58 changes: 58 additions & 0 deletions pubsub/pstest/fake_test.go
Expand Up @@ -1455,3 +1455,61 @@ func TestTopicRetentionAdmin(t *testing.T) {
t.Errorf("sub.TopicMessageRetentionDuration mismatch: %s", diff)
}
}

func TestSubscriptionPushPull(t *testing.T) {
ctx := context.Background()
pclient, sclient, _, cleanup := newFake(ctx, t)
defer cleanup()

top := mustCreateTopic(ctx, t, pclient, &pb.Topic{
Name: "projects/P/topics/T",
})

// Create a push subscription.
pc := &pb.PushConfig{
PushEndpoint: "some-endpoint",
}
got := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
AckDeadlineSeconds: minAckDeadlineSecs,
Name: "projects/P/subscriptions/S",
Topic: top.Name,
PushConfig: pc,
})

if diff := testutil.Diff(got.PushConfig, pc); diff != "" {
t.Errorf("sub.PushConfig mismatch: %s", diff)
}

// Update the subscription to write to BigQuery instead.
updateSub := got
updateSub.PushConfig = &pb.PushConfig{}
bqc := &pb.BigQueryConfig{
Table: "some-table",
}
updateSub.BigqueryConfig = bqc
got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
Subscription: updateSub,
UpdateMask: &field_mask.FieldMask{Paths: []string{"push_config", "bigquery_config"}},
})
if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" {
t.Errorf("sub.PushConfig should be zero value\n%s", diff)
}
want := bqc
want.State = pb.BigQueryConfig_ACTIVE
if diff := testutil.Diff(got.BigqueryConfig, want); diff != "" {
t.Errorf("sub.BigQueryConfig mismatch: %s", diff)
}

// Switch back to a pull subscription.
updateSub.BigqueryConfig = &pb.BigQueryConfig{}
got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
Subscription: updateSub,
UpdateMask: &field_mask.FieldMask{Paths: []string{"bigquery_config"}},
})
if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" {
t.Errorf("sub.PushConfig should be zero value\n%s", diff)
}
if diff := testutil.Diff(got.BigqueryConfig, new(pb.BigQueryConfig)); diff != "" {
t.Errorf("sub.BigqueryConfig should be zero value\n%s", diff)
}
}
137 changes: 133 additions & 4 deletions pubsub/subscription.go
Expand Up @@ -210,14 +210,106 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
}
}

// BigQueryConfigState denotes the possible states for a BigQuery Subscription.
type BigQueryConfigState int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We added a SubscriptionState field as well as part of this feature. Should we add that here as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. I also made the fake return Active for both Subscription/BigQueryConfigState by default.


const (
// BigQueryConfigStateUnspecified is the default value. This value is unused.
BigQueryConfigStateUnspecified = iota

// BigQueryConfigActive means the subscription can actively send messages to BigQuery.
BigQueryConfigActive

// BigQueryConfigPermissionDenied means the subscription cannot write to the BigQuery table because of permission denied errors.
BigQueryConfigPermissionDenied

// BigQueryConfigNotFound means the subscription cannot write to the BigQuery table because it does not exist.
BigQueryConfigNotFound

// BigQueryConfigSchemaMismatch means the subscription cannot write to the BigQuery table due to a schema mismatch.
BigQueryConfigSchemaMismatch
)

// BigQueryConfig configures the subscription to deliver to a BigQuery table.
type BigQueryConfig struct {
// The name of the table to which to write data, of the form
// {projectId}:{datasetId}.{tableId}
Table string

// When true, use the topic's schema as the columns to write to in BigQuery,
// if it exists.
UseTopicSchema bool

// When true, write the subscription name, message_id, publish_time,
// attributes, and ordering_key to additional columns in the table. The
// subscription name, message_id, and publish_time fields are put in their own
// columns while all other message properties (other than data) are written to
// a JSON object in the attributes column.
WriteMetadata bool

// When true and use_topic_schema is true, any fields that are a part of the
// topic schema that are not part of the BigQuery table schema are dropped
// when writing to BigQuery. Otherwise, the schemas must be kept in sync and
// any messages with extra fields are not written and remain in the
// subscription's backlog.
DropUnknownFields bool

// This is an output-only field that indicates whether or not the subscription can
// receive messages. This field is set only in responses from the server;
// it is ignored if it is set in any requests.
State BigQueryConfigState
}

func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig {
if bc == nil {
return nil
}
pbCfg := &pb.BigQueryConfig{
Table: bc.Table,
UseTopicSchema: bc.UseTopicSchema,
WriteMetadata: bc.WriteMetadata,
DropUnknownFields: bc.DropUnknownFields,
State: pb.BigQueryConfig_State(bc.State),
}
return pbCfg
}

// SubscriptionState denotes the possible states for a Subscription.
type SubscriptionState int

const (
// SubscriptionStateUnspecified is the default value. This value is unused.
SubscriptionStateUnspecified = iota

// SubscriptionStateActive means the subscription can actively send messages to BigQuery.
SubscriptionStateActive

// SubscriptionStateResourceError means the subscription receive messages because of an
// error with the resource to which it pushes messages.
// See the more detailed error state in the corresponding configuration.
SubscriptionStateResourceError
)

// SubscriptionConfig describes the configuration of a subscription.
type SubscriptionConfig struct {
// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
name string

Topic *Topic
// The topic from which this subscription is receiving messages.
Topic *Topic

// If push delivery is used with this subscription, this field is
// used to configure it. Either `PushConfig` or `BigQueryConfig` can be set,
// but not both. If both are empty, then the subscriber will pull and ack
// messages using API methods.
PushConfig PushConfig

// If delivery to BigQuery is used with this subscription, this field is
// used to configure it. Either `PushConfig` or `BigQueryConfig` can be set,
// but not both. If both are empty, then the subscriber will pull and ack
// messages using API methods.
BigQueryConfig BigQueryConfig

// The default maximum time after a subscriber receives a message before
// the subscriber should acknowledge the message. Note: messages which are
// obtained via Subscription.Receive need not be acknowledged within this
Expand Down Expand Up @@ -290,6 +382,12 @@ type SubscriptionConfig struct {
// This is an output only field, meaning it will only appear in responses from the backend
// and will be ignored if sent in a request.
TopicMessageRetentionDuration time.Duration

// State indicates whether or not the subscription can receive messages.
// This is an output-only field that indicates whether or not the subscription can
// receive messages. This field is set only in responses from the server;
// it is ignored if it is set in any requests.
State SubscriptionState
}

// String returns the globally unique printable name of the subscription config.
Expand Down Expand Up @@ -317,6 +415,10 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil {
pbPushConfig = cfg.PushConfig.toProto()
}
var pbBigQueryConfig *pb.BigQueryConfig
if cfg.BigQueryConfig.Table != "" {
pbBigQueryConfig = cfg.BigQueryConfig.toProto()
}
var retentionDuration *durpb.Duration
if cfg.RetentionDuration != 0 {
retentionDuration = durpb.New(cfg.RetentionDuration)
Expand All @@ -333,6 +435,7 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
Name: name,
Topic: cfg.Topic.name,
PushConfig: pbPushConfig,
BigqueryConfig: pbBigQueryConfig,
AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())),
RetainAckedMessages: cfg.RetainAckedMessages,
MessageRetentionDuration: retentionDuration,
Expand Down Expand Up @@ -371,11 +474,14 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC
RetryPolicy: rp,
Detached: pbSub.Detached,
TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(),
State: SubscriptionState(pbSub.State),
}
pc := protoToPushConfig(pbSub.PushConfig)
if pc != nil {
if pc := protoToPushConfig(pbSub.PushConfig); pc != nil {
subC.PushConfig = *pc
}
if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil {
subC.BigQueryConfig = *bq
}
return subC, nil
}

Expand All @@ -398,6 +504,20 @@ func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
return pc
}

func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig {
if pbBQ == nil {
return nil
}
bq := &BigQueryConfig{
Table: pbBQ.GetTable(),
UseTopicSchema: pbBQ.GetUseTopicSchema(),
DropUnknownFields: pbBQ.GetDropUnknownFields(),
WriteMetadata: pbBQ.GetWriteMetadata(),
State: BigQueryConfigState(pbBQ.State),
}
return bq
}

// DeadLetterPolicy specifies the conditions for dead lettering messages in
// a subscription.
type DeadLetterPolicy struct {
Expand Down Expand Up @@ -641,9 +761,14 @@ func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {

// SubscriptionConfigToUpdate describes how to update a subscription.
type SubscriptionConfigToUpdate struct {
// If non-nil, the push config is changed.
// If non-nil, the push config is changed. Cannot be set at the same time as BigQueryConfig.
// If currently in push mode, set this value to the zero value to revert to a Pull based subscription.
PushConfig *PushConfig

// If non-nil, the bigquery config is changed. Cannot be set at the same time as PushConfig.
// If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription,
BigQueryConfig *BigQueryConfig

// If non-zero, the ack deadline is changed.
AckDeadline time.Duration

Expand Down Expand Up @@ -698,6 +823,10 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update
psub.PushConfig = cfg.PushConfig.toProto()
paths = append(paths, "push_config")
}
if cfg.BigQueryConfig != nil {
psub.BigqueryConfig = cfg.BigQueryConfig.toProto()
paths = append(paths, "bigquery_config")
}
if cfg.AckDeadline != 0 {
psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
paths = append(paths, "ack_deadline_seconds")
Expand Down
34 changes: 34 additions & 0 deletions pubsub/subscription_test.go
Expand Up @@ -190,6 +190,7 @@ func TestUpdateSubscription(t *testing.T) {
Audience: "client-12345",
},
},
State: SubscriptionStateActive,
}
opt := cmpopts.IgnoreUnexported(SubscriptionConfig{})
if !testutil.Equal(cfg, want, opt) {
Expand Down Expand Up @@ -226,6 +227,7 @@ func TestUpdateSubscription(t *testing.T) {
Audience: "client-12345",
},
},
State: SubscriptionStateActive,
}
if !testutil.Equal(got, want, opt) {
t.Fatalf("\ngot %+v\nwant %+v", got, want)
Expand Down Expand Up @@ -435,3 +437,35 @@ func TestOrdering_CreateSubscription(t *testing.T) {
msg.Ack()
})
}

func TestBigQuerySubscription(t *testing.T) {
ctx := context.Background()
client, srv := newFake(t)
defer client.Close()
defer srv.Close()

topic := mustCreateTopic(t, client, "t")
bqTable := "some-project:some-dataset.some-table"
bqConfig := BigQueryConfig{
Table: bqTable,
}

subConfig := SubscriptionConfig{
Topic: topic,
BigQueryConfig: bqConfig,
}
bqSub, err := client.CreateSubscription(ctx, "s", subConfig)
if err != nil {
t.Fatal(err)
}
cfg, err := bqSub.Config(ctx)
if err != nil {
t.Fatal(err)
}

want := bqConfig
want.State = BigQueryConfigActive
if diff := testutil.Diff(cfg.BigQueryConfig, want); diff != "" {
t.Fatalf("CreateBQSubscription mismatch: \n%s", diff)
}
}