Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): support bigquery subscriptions #6119

Merged
merged 16 commits into from Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions pubsub/pstest/fake.go
Expand Up @@ -480,6 +480,9 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
if ps.PushConfig == nil {
ps.PushConfig = &pb.PushConfig{}
}
if ps.BigqueryConfig == nil {
ps.BigqueryConfig = &pb.BigQueryConfig{}
}
ps.TopicMessageRetentionDuration = top.proto.MessageRetentionDuration
var deadLetterTopic *topic
if ps.DeadLetterPolicy != nil {
Expand Down Expand Up @@ -579,6 +582,9 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti
case "push_config":
sub.proto.PushConfig = req.Subscription.PushConfig

case "bigquery_config":
sub.proto.BigqueryConfig = req.GetSubscription().GetBigqueryConfig()

case "ack_deadline_seconds":
a := req.Subscription.AckDeadlineSeconds
if err := checkAckDeadline(a); err != nil {
Expand Down
56 changes: 56 additions & 0 deletions pubsub/pstest/fake_test.go
Expand Up @@ -1455,3 +1455,59 @@ func TestTopicRetentionAdmin(t *testing.T) {
t.Errorf("sub.TopicMessageRetentionDuration mismatch: %s", diff)
}
}

func TestSubscriptionPushPull(t *testing.T) {
ctx := context.Background()
pclient, sclient, _, cleanup := newFake(ctx, t)
defer cleanup()

top := mustCreateTopic(ctx, t, pclient, &pb.Topic{
Name: "projects/P/topics/T",
})

// Create a push subscription.
pc := &pb.PushConfig{
PushEndpoint: "some-endpoint",
}
got := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
AckDeadlineSeconds: minAckDeadlineSecs,
Name: "projects/P/subscriptions/S",
Topic: top.Name,
PushConfig: pc,
})

if diff := testutil.Diff(got.PushConfig, pc); diff != "" {
t.Errorf("sub.PushConfig mismatch: %s", diff)
}

// Update the subscription to write to BigQuery instead.
updateSub := got
updateSub.PushConfig = &pb.PushConfig{}
bqc := &pb.BigQueryConfig{
Table: "some-table",
}
updateSub.BigqueryConfig = bqc
got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
Subscription: updateSub,
UpdateMask: &field_mask.FieldMask{Paths: []string{"push_config", "bigquery_config"}},
})
if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" {
t.Errorf("sub.PushConfig should be zero value\n%s", diff)
}
if diff := testutil.Diff(got.BigqueryConfig, bqc); diff != "" {
t.Errorf("sub.BigQueryConfig mismatch: %s", diff)
}

// Switch back to a pull subscription.
updateSub.BigqueryConfig = &pb.BigQueryConfig{}
got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
Subscription: updateSub,
UpdateMask: &field_mask.FieldMask{Paths: []string{"bigquery_config"}},
})
if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" {
t.Errorf("sub.PushConfig should be zero value\n%s", diff)
}
if diff := testutil.Diff(got.BigqueryConfig, new(pb.BigQueryConfig)); diff != "" {
t.Errorf("sub.BigqueryConfig should be zero value\n%s", diff)
}
}
114 changes: 110 additions & 4 deletions pubsub/subscription.go
Expand Up @@ -210,14 +210,90 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
}
}

// BigQueryConfigState denotes the possible states for a BigQuery Subscription.
type BigQueryConfigState int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We added a SubscriptionState field as well as part of this feature. Should we add that here as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. I also made the fake return Active for both Subscription/BigQueryConfigState by default.


const (
// BigQueryConfigStateUnspecified is the default value. This value is unused.
BigQueryConfigStateUnspecified = iota

// BigQueryConfigActive means the subscription can actively send messages to BigQuery.
BigQueryConfigActive

// BigQueryConfigPermissionDenied means the subscription cannot write to the BigQuery table because of permission denied errors.
BigQueryConfigPermissionDenied

// BigQueryConfigNotFound means the subscription cannot write to the BigQuery table because it does not exist.
BigQueryConfigNotFound

// BigQueryConfigSchemaMismatch means the subscription cannot write to the BigQuery table due to a schema mismatch.
BigQueryConfigSchemaMismatch
)

// BigQueryConfig configures the subscription to deliver to a BigQuery table.
type BigQueryConfig struct {
// The name of the table to which to write data, of the form
// {projectId}:{datasetId}.{tableId}
Table string

// When true, use the topic's schema as the columns to write to in BigQuery,
// if it exists.
UseTopicSchema bool

// When true, write the subscription name, message_id, publish_time,
// attributes, and ordering_key to additional columns in the table. The
// subscription name, message_id, and publish_time fields are put in their own
// columns while all other message properties (other than data) are written to
// a JSON object in the attributes column.
WriteMetadata bool

// When true and use_topic_schema is true, any fields that are a part of the
// topic schema that are not part of the BigQuery table schema are dropped
// when writing to BigQuery. Otherwise, the schemas must be kept in sync and
// any messages with extra fields are not written and remain in the
// subscription's backlog.
DropUnknownFields bool

// This is an output-only field that indicates whether or not the subscription can
// receive messages. This field is set only in responses from the server;
// it is ignored if it is set in any requests.
State BigQueryConfigState
}

func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig {
if bc == nil {
return nil
}
pbCfg := &pb.BigQueryConfig{
Table: bc.Table,
UseTopicSchema: bc.UseTopicSchema,
WriteMetadata: bc.WriteMetadata,
DropUnknownFields: bc.DropUnknownFields,
State: pb.BigQueryConfig_State(bc.State),
}
return pbCfg
}

// SubscriptionConfig describes the configuration of a subscription.
type SubscriptionConfig struct {
// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
name string

Topic *Topic
// The topic from which this subscription is receiving messages.
Topic *Topic

// If push delivery is used with this subscription, this field is
// used to configure it. Either `PushConfig` or `BigQueryConfig` can be set,
// but not both. If both are empty, then the subscriber will pull and ack
// messages using API methods.
PushConfig PushConfig

// If delivery to BigQuery is used with this subscription, this field is
// used to configure it. Either `PushConfig` or `BigQueryConfig` can be set,
// but not both. If both are empty, then the subscriber will pull and ack
// messages using API methods.
BigQueryConfig BigQueryConfig

// The default maximum time after a subscriber receives a message before
// the subscriber should acknowledge the message. Note: messages which are
// obtained via Subscription.Receive need not be acknowledged within this
Expand Down Expand Up @@ -317,6 +393,10 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil {
pbPushConfig = cfg.PushConfig.toProto()
}
var pbBigQueryConfig *pb.BigQueryConfig
if cfg.BigQueryConfig.Table != "" {
pbBigQueryConfig = cfg.BigQueryConfig.toProto()
}
var retentionDuration *durpb.Duration
if cfg.RetentionDuration != 0 {
retentionDuration = durpb.New(cfg.RetentionDuration)
Expand All @@ -333,6 +413,7 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
Name: name,
Topic: cfg.Topic.name,
PushConfig: pbPushConfig,
BigqueryConfig: pbBigQueryConfig,
AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())),
RetainAckedMessages: cfg.RetainAckedMessages,
MessageRetentionDuration: retentionDuration,
Expand Down Expand Up @@ -372,10 +453,12 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC
Detached: pbSub.Detached,
TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(),
}
pc := protoToPushConfig(pbSub.PushConfig)
if pc != nil {
if pc := protoToPushConfig(pbSub.PushConfig); pc != nil {
subC.PushConfig = *pc
}
if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil {
subC.BigQueryConfig = *bq
}
return subC, nil
}

Expand All @@ -398,6 +481,20 @@ func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
return pc
}

func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig {
if pbBQ == nil {
return nil
}
bq := &BigQueryConfig{
Table: pbBQ.GetTable(),
UseTopicSchema: pbBQ.GetUseTopicSchema(),
DropUnknownFields: pbBQ.GetDropUnknownFields(),
WriteMetadata: pbBQ.GetWriteMetadata(),
State: BigQueryConfigState(pbBQ.State),
}
return bq
}

// DeadLetterPolicy specifies the conditions for dead lettering messages in
// a subscription.
type DeadLetterPolicy struct {
Expand Down Expand Up @@ -641,9 +738,14 @@ func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {

// SubscriptionConfigToUpdate describes how to update a subscription.
type SubscriptionConfigToUpdate struct {
// If non-nil, the push config is changed.
// If non-nil, the push config is changed. Cannot be set at the same time as BigQueryConfig.
// If currently in push mode, set this value to the zero value to revert to a Pull based subscription.
PushConfig *PushConfig

// If non-nil, the bigquery config is changed. Cannot be set at the same time as PushConfig.
// If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription,
BigQueryConfig *BigQueryConfig

// If non-zero, the ack deadline is changed.
AckDeadline time.Duration

Expand Down Expand Up @@ -698,6 +800,10 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update
psub.PushConfig = cfg.PushConfig.toProto()
paths = append(paths, "push_config")
}
if cfg.BigQueryConfig != nil {
psub.BigqueryConfig = cfg.BigQueryConfig.toProto()
paths = append(paths, "bigquery_config")
}
if cfg.AckDeadline != 0 {
psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
paths = append(paths, "ack_deadline_seconds")
Expand Down
30 changes: 30 additions & 0 deletions pubsub/subscription_test.go
Expand Up @@ -435,3 +435,33 @@ func TestOrdering_CreateSubscription(t *testing.T) {
msg.Ack()
})
}

func TestBigQuerySubscription(t *testing.T) {
ctx := context.Background()
client, srv := newFake(t)
defer client.Close()
defer srv.Close()

topic := mustCreateTopic(t, client, "t")
bqTable := "some-project:some-dataset.some-table"
bqConfig := BigQueryConfig{
Table: bqTable,
}

subConfig := SubscriptionConfig{
Topic: topic,
BigQueryConfig: bqConfig,
}
bqSub, err := client.CreateSubscription(ctx, "s", subConfig)
if err != nil {
t.Fatal(err)
}
cfg, err := bqSub.Config(ctx)
if err != nil {
t.Fatal(err)
}

if diff := testutil.Diff(cfg.BigQueryConfig, bqConfig); diff != "" {
t.Fatalf("CreateBQSubscription mismatch: \n%s", diff)
}
}