Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Adding automated backups #9702

Merged
merged 39 commits into from May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
2aa800f
Adding automated backups
ma-g-22 Mar 29, 2024
279b1df
AB updates: addressing feedback
ma-g-22 Apr 4, 2024
66030cc
AB: addressing feedback
ma-g-22 Apr 19, 2024
b5182f3
AB: Addressing feedback
ma-g-22 Apr 25, 2024
794f136
AB: addressing feedback
ma-g-22 Apr 26, 2024
ea49b80
AB: addressing feedbackup
ma-g-22 Apr 30, 2024
0355c32
AB: fixing workflow errors
ma-g-22 Apr 30, 2024
6ef8ae9
AB: fixing conflict
ma-g-22 May 1, 2024
e56f1f5
Merge branch 'googleapis:main' into ab
ma-g-22 May 1, 2024
9819f53
Merge branch 'googleapis:main' into ab
ma-g-22 May 2, 2024
be42e37
AB: addressing feedback
ma-g-22 May 2, 2024
47a7335
AB: refactoring
ma-g-22 May 2, 2024
ba9cced
Merge branch 'googleapis:main' into ab
ma-g-22 May 3, 2024
3c240b0
AB: addressing feedback
ma-g-22 May 3, 2024
3c1961b
Merge branch 'googleapis:main' into ab
ma-g-22 May 6, 2024
c7276e7
Merge branch 'googleapis:main' into ab
ma-g-22 May 6, 2024
aaa344b
AB: addressing feedback
ma-g-22 May 6, 2024
72142aa
AB: addressing feedback
ma-g-22 May 7, 2024
98e2d49
Merge branch 'googleapis:main' into ab
ma-g-22 May 7, 2024
2a99018
AB: fixing vet.sh workflow errors
ma-g-22 May 7, 2024
982f7f5
AB: fixing apidiff workflow error
ma-g-22 May 7, 2024
ace8c8b
Merge branch 'googleapis:main' into ab
ma-g-22 May 7, 2024
3da5001
Merge branch 'main' into ab
bhshkh May 7, 2024
fbba2dd
AB: Fixing vet workflow errors
ma-g-22 May 7, 2024
5a9478a
Merge branch 'main' into ab
igorbernstein2 May 7, 2024
77c875a
Merge branch 'main' into ab
bhshkh May 7, 2024
9bce248
Merge branch 'main' into ab
bhshkh May 7, 2024
f9c2a70
Merge branch 'googleapis:main' into ab
ma-g-22 May 8, 2024
b532f5b
AB: fixing dependencies
ma-g-22 May 8, 2024
f189b96
Merge branch 'main' into ab
bhshkh May 8, 2024
a28b21e
AB: fixing vet.sh error
ma-g-22 May 8, 2024
de046f9
Merge branch 'googleapis:main' into ab
ma-g-22 May 8, 2024
4c1502d
AB: updating go.mod
ma-g-22 May 8, 2024
f4e47ba
AB: fixing go.sum
ma-g-22 May 8, 2024
6567bb6
AB: adding new lines
ma-g-22 May 8, 2024
c525a17
AB: fixing go.mod & go.sum
ma-g-22 May 8, 2024
5001d6d
AB: fixing deps
ma-g-22 May 8, 2024
99b4fa2
AB: fixing deps
ma-g-22 May 8, 2024
0697167
AB: udpating go.sum
ma-g-22 May 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
210 changes: 161 additions & 49 deletions bigtable/admin.go
Expand Up @@ -238,12 +238,67 @@ const (
Unprotected
)

// TableAutomatedBackupConfig generalizes automated backup configurations.
// Currently, the only supported type of automated backup configuration
// is TableAutomatedBackupPolicy.
type TableAutomatedBackupConfig interface {
isTableAutomatedBackupConfig()
}

// TableAutomatedBackupPolicy defines an automated backup policy for a table.
// Use nil TableAutomatedBackupPolicy to disable Automated Backups on a table.
// Use nil for a specific field to ignore that field when updating the policy on a table.
type TableAutomatedBackupPolicy struct {
// How long the automated backups should be retained. The only
// supported value at this time is 3 days.
RetentionPeriod optional.Duration
// How frequently automated backups should occur. The only
// supported value at this time is 24 hours.
Frequency optional.Duration
}

func (*TableAutomatedBackupPolicy) isTableAutomatedBackupConfig() {}

func toAutomatedBackupConfigProto(automatedBackupConfig TableAutomatedBackupConfig) (*btapb.Table_AutomatedBackupPolicy_, error) {
if automatedBackupConfig == nil {
return nil, nil
}
switch backupConfig := automatedBackupConfig.(type) {
case *TableAutomatedBackupPolicy:
return backupConfig.toProto()
default:
return nil, fmt.Errorf("error: Unknown type of automated backup configuration")
}
}

func (abp *TableAutomatedBackupPolicy) toProto() (*btapb.Table_AutomatedBackupPolicy_, error) {
pbAutomatedBackupPolicy := &btapb.Table_AutomatedBackupPolicy{
RetentionPeriod: durationpb.New(0),
Frequency: durationpb.New(0),
}
if abp.RetentionPeriod == nil && abp.Frequency == nil {
return nil, errors.New("at least one of RetentionPeriod and Frequency must be set")
}
if abp.RetentionPeriod != nil {
pbAutomatedBackupPolicy.RetentionPeriod = durationpb.New(optional.ToDuration(abp.RetentionPeriod))
}
if abp.Frequency != nil {
pbAutomatedBackupPolicy.Frequency = durationpb.New(optional.ToDuration(abp.Frequency))
}
return &btapb.Table_AutomatedBackupPolicy_{
AutomatedBackupPolicy: pbAutomatedBackupPolicy,
}, nil
}

// Family represents a column family with its optional GC policy and value type.
type Family struct {
GCPolicy GCPolicy
ValueType Type
}

// UpdateTableConf is unused
type UpdateTableConf struct{}

// TableConf contains all the information necessary to create a table with column families.
type TableConf struct {
TableID string
Expand All @@ -259,6 +314,8 @@ type TableConf struct {
// set to protected to make the table protected against data loss
DeletionProtection DeletionProtection
ChangeStreamRetention ChangeStreamRetention
// Configure an automated backup policy for the table
AutomatedBackupConfig TableAutomatedBackupConfig
}

// CreateTable creates a new table in the instance.
Expand Down Expand Up @@ -298,6 +355,15 @@ func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf)
tbl.ChangeStreamConfig = &btapb.ChangeStreamConfig{}
tbl.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.ChangeStreamRetention.(time.Duration))
}

if conf.AutomatedBackupConfig != nil {
proto, err := toAutomatedBackupConfigProto(conf.AutomatedBackupConfig)
if err != nil {
return err
}
tbl.AutomatedBackupConfig = proto
}

if conf.Families != nil && conf.ColumnFamilies != nil {
return errors.New("only one of Families or ColumnFamilies may be set, not both")
}
Expand Down Expand Up @@ -376,79 +442,113 @@ func (ac *AdminClient) CreateColumnFamilyWithConfig(ctx context.Context, table,
return err
}

// UpdateTableConf contains all of the information necessary to update a table with column families.
type UpdateTableConf struct {
tableID string
// deletionProtection can be unset, true or false
// set to true to make the table protected against data loss
deletionProtection DeletionProtection
changeStreamRetention ChangeStreamRetention
}

// UpdateTableDisableChangeStream updates a table to disable change stream for table ID.
func (ac *AdminClient) UpdateTableDisableChangeStream(ctx context.Context, tableID string) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, time.Duration(0)})
}

// UpdateTableWithChangeStream updates a table to with the given table ID and change stream config.
func (ac *AdminClient) UpdateTableWithChangeStream(ctx context.Context, tableID string, changeStreamRetention ChangeStreamRetention) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, changeStreamRetention})
}

// UpdateTableWithDeletionProtection updates a table with the given table ID and deletion protection parameter.
func (ac *AdminClient) UpdateTableWithDeletionProtection(ctx context.Context, tableID string, deletionProtection DeletionProtection) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, deletionProtection, nil})
}
const (
deletionProtectionFieldMask = "deletion_protection"
changeStreamConfigFieldMask = "change_stream_config"
automatedBackupPolicyFieldMask = "automated_backup_policy"
retentionPeriodFieldMaskPath = ".retention_period"
frequencyFieldMaskPath = ".frequency"
)

// updateTableWithConf updates a table in the instance from the given configuration.
// only deletion protection can be updated at this period.
// table ID is required.
func (ac *AdminClient) updateTableWithConf(ctx context.Context, conf *UpdateTableConf) error {
if conf.tableID == "" {
return errors.New("TableID is required")
func (ac *AdminClient) newUpdateTableRequestProto(tableID string) (*btapb.UpdateTableRequest, error) {
if tableID == "" {
return nil, errors.New("TableID is required")
}

ctx = mergeOutgoingMetadata(ctx, ac.md)

updateMask := &field_mask.FieldMask{
Paths: []string{},
}
prefix := ac.instancePrefix()
req := &btapb.UpdateTableRequest{
Table: &btapb.Table{
Name: prefix + "/tables/" + conf.tableID,
Name: ac.instancePrefix() + "/tables/" + tableID,
},
UpdateMask: updateMask,
}
return req, nil
}

if conf.deletionProtection != None {
updateMask.Paths = append(updateMask.Paths, "deletion_protection")
req.Table.DeletionProtection = conf.deletionProtection != Unprotected
}

if conf.changeStreamRetention != nil {
if conf.changeStreamRetention.(time.Duration) == time.Duration(0) {
updateMask.Paths = append(updateMask.Paths, "change_stream_config")
} else {
updateMask.Paths = append(updateMask.Paths, "change_stream_config.retention_period")
req.Table.ChangeStreamConfig = &btapb.ChangeStreamConfig{}
req.Table.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.changeStreamRetention.(time.Duration))
}
}
func (ac *AdminClient) updateTableAndWait(ctx context.Context, updateTableRequest *btapb.UpdateTableRequest) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)

lro, err := ac.tClient.UpdateTable(ctx, req)
lro, err := ac.tClient.UpdateTable(ctx, updateTableRequest)
if err != nil {
return fmt.Errorf("error from update: %w", err)
}

var tbl btapb.Table
op := longrunning.InternalNewOperation(ac.lroClient, lro)
err = op.Wait(ctx, &tbl)
if err != nil {
return fmt.Errorf("error from operation: %v", err)
}

return nil
}

// UpdateTableDisableChangeStream updates a table to disable change stream for table ID.
func (ac *AdminClient) UpdateTableDisableChangeStream(ctx context.Context, tableID string) error {
req, err := ac.newUpdateTableRequestProto(tableID)
if err != nil {
return err
}
req.UpdateMask.Paths = []string{changeStreamConfigFieldMask}
return ac.updateTableAndWait(ctx, req)
}

// UpdateTableWithChangeStream updates a table to with the given table ID and change stream config.
func (ac *AdminClient) UpdateTableWithChangeStream(ctx context.Context, tableID string, changeStreamRetention ChangeStreamRetention) error {
req, err := ac.newUpdateTableRequestProto(tableID)
if err != nil {
return err
}
req.UpdateMask.Paths = []string{changeStreamConfigFieldMask + retentionPeriodFieldMaskPath}
req.Table.ChangeStreamConfig = &btapb.ChangeStreamConfig{}
req.Table.ChangeStreamConfig.RetentionPeriod = durationpb.New(changeStreamRetention.(time.Duration))
return ac.updateTableAndWait(ctx, req)
}

// UpdateTableWithDeletionProtection updates a table with the given table ID and deletion protection parameter.
func (ac *AdminClient) UpdateTableWithDeletionProtection(ctx context.Context, tableID string, deletionProtection DeletionProtection) error {
req, err := ac.newUpdateTableRequestProto(tableID)
if err != nil {
return err
}
req.UpdateMask.Paths = []string{deletionProtectionFieldMask}
req.Table.DeletionProtection = deletionProtection != Unprotected
return ac.updateTableAndWait(ctx, req)
}

// UpdateTableDisableAutomatedBackupPolicy updates a table to disable automated backups for table ID.
func (ac *AdminClient) UpdateTableDisableAutomatedBackupPolicy(ctx context.Context, tableID string) error {
req, err := ac.newUpdateTableRequestProto(tableID)
if err != nil {
return err
}
req.UpdateMask.Paths = []string{automatedBackupPolicyFieldMask}
return ac.updateTableAndWait(ctx, req)
}

// UpdateTableWithAutomatedBackupPolicy updates a table to with the given table ID and automated backup policy config.
func (ac *AdminClient) UpdateTableWithAutomatedBackupPolicy(ctx context.Context, tableID string, automatedBackupPolicy TableAutomatedBackupPolicy) error {
req, err := ac.newUpdateTableRequestProto(tableID)
if err != nil {
return err
}
abc, err := toAutomatedBackupConfigProto(&automatedBackupPolicy)
if err != nil {
return err
}
if abc.AutomatedBackupPolicy.RetentionPeriod.Seconds != 0 {
// Update Retention Period
req.UpdateMask.Paths = append(req.UpdateMask.Paths, automatedBackupPolicyFieldMask+retentionPeriodFieldMaskPath)
}
if abc.AutomatedBackupPolicy.Frequency.Seconds != 0 {
// Update Frequency
req.UpdateMask.Paths = append(req.UpdateMask.Paths, automatedBackupPolicyFieldMask+frequencyFieldMaskPath)
}
req.Table.AutomatedBackupConfig = abc
return ac.updateTableAndWait(ctx, req)
}

// DeleteTable deletes a table and all of its data.
func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
Expand Down Expand Up @@ -485,6 +585,7 @@ type TableInfo struct {
// for example when using NAME_ONLY, the response does not contain DeletionProtection and the value should be None
DeletionProtection DeletionProtection
ChangeStreamRetention ChangeStreamRetention
AutomatedBackupConfig TableAutomatedBackupConfig
}

// FamilyInfo represents information about a column family.
Expand Down Expand Up @@ -543,6 +644,17 @@ func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo,
if res.ChangeStreamConfig != nil && res.ChangeStreamConfig.RetentionPeriod != nil {
ti.ChangeStreamRetention = res.ChangeStreamConfig.RetentionPeriod.AsDuration()
}
if res.AutomatedBackupConfig != nil {
switch res.AutomatedBackupConfig.(type) {
case *btapb.Table_AutomatedBackupPolicy_:
ti.AutomatedBackupConfig = &TableAutomatedBackupPolicy{
RetentionPeriod: res.GetAutomatedBackupPolicy().GetRetentionPeriod().AsDuration(),
Frequency: res.GetAutomatedBackupPolicy().GetFrequency().AsDuration(),
}
default:
return nil, fmt.Errorf("error: Unknown type of automated backup configuration")
}
}

return ti, nil
}
Expand Down