diff --git a/spanner/spannertest/db_test.go b/spanner/spannertest/db_test.go index cd4d2291d81..8643b410b74 100644 --- a/spanner/spannertest/db_test.go +++ b/spanner/spannertest/db_test.go @@ -728,3 +728,44 @@ func TestAddBackQuoteForHypen(t *testing.T) { t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want) } } + +func TestCreateAndManageChangeStream(t *testing.T) { + // Testing Create Change Stream + ddl, err := spansql.ParseDDL("filename", "CREATE CHANGE STREAM SingerAlbumStream FOR Singers(FirstName, LastName), Albums OPTIONS( retention_period = '36h' )") + if err != nil { + t.Fatalf("%s: Bad DDL", err) + } + + got := ddl.List[0].SQL() + want := "CREATE CHANGE STREAM SingerAlbumStream FOR Singers(FirstName, LastName), Albums OPTIONS( retention_period='36h' )" + + if !reflect.DeepEqual(got, want) { + t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want) + } + + // Testing Alter Change Stream Options + ddl, err = spansql.ParseDDL("filename", "ALTER CHANGE STREAM SingerAlbumStream SET OPTIONS( retention_period = '20h' )") + if err != nil { + t.Fatalf("%s: Bad DDL", err) + } + + got = ddl.List[0].SQL() + want = "ALTER CHANGE STREAM SingerAlbumStream SET OPTIONS( retention_period='20h' )" + + if !reflect.DeepEqual(got, want) { + t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want) + } + + // Testing Drop Change Stream Options + ddl, err = spansql.ParseDDL("filename", "DROP CHANGE STREAM SingerAlbumStream") + if err != nil { + t.Fatalf("%s: Bad DDL", err) + } + + got = ddl.List[0].SQL() + want = "DROP CHANGE STREAM SingerAlbumStream" + + if !reflect.DeepEqual(got, want) { + t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want) + } +} diff --git a/spanner/spansql/parser.go b/spanner/spansql/parser.go index 8b245aeaf82..064740f885d 100644 --- a/spanner/spansql/parser.go +++ b/spanner/spansql/parser.go @@ -982,7 +982,7 @@ func (p *parser) parseDDLStmt() (DDLStmt, *parseError) { /* statement: - { create_database | create_table | create_index | alter_table | drop_table | drop_index } + { create_database | create_table | create_index | alter_table | drop_table | drop_index | create_change_stream | alter_change_stream | drop_change_stream } */ // TODO: support create_database @@ -1005,13 +1005,14 @@ func (p *parser) parseDDLStmt() (DDLStmt, *parseError) { // DROP TABLE table_name // DROP INDEX index_name // DROP VIEW view_name + // DROP CHANGE STREAM change_stream_name tok := p.next() if tok.err != nil { return nil, tok.err } switch { default: - return nil, p.errorf("got %q, want TABLE, VIEW or INDEX", tok.value) + return nil, p.errorf("got %q, want TABLE, VIEW, INDEX or CHANGE", tok.value) case tok.caseEqual("TABLE"): name, err := p.parseTableOrIndexOrColumnName() if err != nil { @@ -1030,10 +1031,25 @@ func (p *parser) parseDDLStmt() (DDLStmt, *parseError) { return nil, err } return &DropView{Name: name, Position: pos}, nil + case tok.caseEqual("CHANGE"): + if err := p.expect("STREAM"); err != nil { + return nil, err + } + name, err := p.parseTableOrIndexOrColumnName() + if err != nil { + return nil, err + } + return &DropChangeStream{Name: name, Position: pos}, nil } } else if p.sniff("ALTER", "DATABASE") { a, err := p.parseAlterDatabase() return a, err + } else if p.sniff("CREATE", "CHANGE", "STREAM") { + cs, err := p.parseCreateChangeStream() + return cs, err + } else if p.sniff("ALTER", "CHANGE", "STREAM") { + acs, err := p.parseAlterChangeStream() + return acs, err } return nil, p.errorf("unknown DDL statement") @@ -2001,6 +2017,157 @@ func (p *parser) parseColumnNameList() ([]ID, *parseError) { return list, err } +func (p *parser) parseCreateChangeStream() (*CreateChangeStream, *parseError) { + debugf("parseCreateChangeStream: %v", p) + + /* + CREATE CHANGE STREAM change_stream_name + [FOR column_or_table_watching_definition[, ... ] ] + [ + OPTIONS ( + retention_period = timespan, + value_capture_type = type + ) + ] + */ + if err := p.expect("CREATE"); err != nil { + return nil, err + } + pos := p.Pos() + if err := p.expect("CHANGE"); err != nil { + return nil, err + } + if err := p.expect("STREAM"); err != nil { + return nil, err + } + csname, err := p.parseTableOrIndexOrColumnName() + if err != nil { + return nil, err + } + + if err := p.expect("FOR"); err != nil { + return nil, err + } + + cs := &CreateChangeStream{Name: csname, Position: pos} + + if p.eat("ALL") { + cs.WatchAllTables = true + } else { + for { + tname, err := p.parseTableOrIndexOrColumnName() + if err != nil { + return nil, err + } + pos := p.Pos() + wd := WatchDef{Table: tname, Position: pos} + + if p.sniff("(") { + columns, err := p.parseColumnNameList() + if err != nil { + return nil, err + } + wd.Columns = columns + } else { + wd.WatchAllCols = true + } + + cs.Watch = append(cs.Watch, wd) + if p.eat(",") { + continue + } + break + } + } + + if p.sniff("OPTIONS") { + cs.Options, err = p.parseChangeStreamOptions() + if err != nil { + return nil, err + } + } + + return cs, nil +} + +func (p *parser) parseAlterChangeStream() (*AlterChangeStream, *parseError) { + debugf("parseAlterChangeStream: %v", p) + + if err := p.expect("ALTER"); err != nil { + return nil, err + } + pos := p.Pos() + if err := p.expect("CHANGE"); err != nil { + return nil, err + } + if err := p.expect("STREAM"); err != nil { + return nil, err + } + csname, err := p.parseTableOrIndexOrColumnName() + if err != nil { + return nil, err + } + + acs := &AlterChangeStream{Name: csname, Position: pos} + if err := p.expect("SET"); err != nil { + return nil, err + } + // TODO: Support for altering watch + if p.sniff("OPTIONS") { + options, err := p.parseChangeStreamOptions() + if err != nil { + return nil, err + } + acs.Alteration = AlterChangeStreamOptions{Options: options} + return acs, nil + } + return nil, p.errorf("got %q, expected OPTIONS", p.next()) +} + +func (p *parser) parseChangeStreamOptions() (ChangeStreamOptions, *parseError) { + debugf("parseChangeStreamOptions: %v", p) + /* + options_def: + OPTIONS ( + retention_period = timespan, + value_capture_type = type + ) */ + + if err := p.expect("OPTIONS"); err != nil { + return ChangeStreamOptions{}, err + } + if err := p.expect("("); err != nil { + return ChangeStreamOptions{}, err + } + + var cso ChangeStreamOptions + if p.eat("retention_period", "=") { + tok := p.next() + if tok.err != nil { + return ChangeStreamOptions{}, tok.err + } + retentionPeriod := new(string) + if tok.value == "null" { + *retentionPeriod = "" + } else { + if tok.typ != stringToken { + return ChangeStreamOptions{}, p.errorf("invalid retention_period: %v", tok.value) + } + *retentionPeriod = tok.string + } + cso.RetentionPeriod = retentionPeriod + } else { + tok := p.next() + return ChangeStreamOptions{}, p.errorf("unknown change stream option: %v", tok.value) + } + + if err := p.expect(")"); err != nil { + return ChangeStreamOptions{}, err + } + + return cso, nil +} + var baseTypes = map[string]TypeBase{ "BOOL": Bool, "INT64": Int64, diff --git a/spanner/spansql/sql.go b/spanner/spansql/sql.go index 0afcc026131..6f4e75d42e4 100644 --- a/spanner/spansql/sql.go +++ b/spanner/spansql/sql.go @@ -96,6 +96,38 @@ func (cv CreateView) SQL() string { return str } +func (cs CreateChangeStream) SQL() string { + str := "CREATE CHANGE STREAM " + str += cs.Name.SQL() + " FOR " + if cs.WatchAllTables { + str += "ALL" + } else { + for i, table := range cs.Watch { + if i > 0 { + str += ", " + } + str += table.Table.SQL() + if !table.WatchAllCols { + str += "(" + for i, c := range table.Columns { + if i > 0 { + str += ", " + } + str += c.SQL() + } + str += ")" + } + } + } + if cs.Options.RetentionPeriod != nil { + str += " OPTIONS( " + str += fmt.Sprintf("retention_period='%s'", *cs.Options.RetentionPeriod) + str += " )" + } + + return str +} + func (dt DropTable) SQL() string { return "DROP TABLE " + dt.Name.SQL() } @@ -108,6 +140,18 @@ func (dv DropView) SQL() string { return "DROP VIEW " + dv.Name.SQL() } +func (dc DropChangeStream) SQL() string { + return "DROP CHANGE STREAM " + dc.Name.SQL() +} + +func (acs AlterChangeStream) SQL() string { + return "ALTER CHANGE STREAM " + acs.Name.SQL() + " SET " + acs.Alteration.SQL() +} + +func (ao AlterChangeStreamOptions) SQL() string { + return "OPTIONS( " + fmt.Sprintf("retention_period='%s'", *ao.Options.RetentionPeriod) + " )" +} + func (at AlterTable) SQL() string { return "ALTER TABLE " + at.Name.SQL() + " " + at.Alteration.SQL() } diff --git a/spanner/spansql/types.go b/spanner/spansql/types.go index 2469bccd750..22437229460 100644 --- a/spanner/spansql/types.go +++ b/spanner/spansql/types.go @@ -1030,3 +1030,80 @@ func getInlineComment(stmts statements, n Node) *Comment { } return c } + +// CreateChangeStream represents a CREATE CHANGE STREAM statement. +// https://cloud.google.com/spanner/docs/change-streams/manage +type CreateChangeStream struct { + Name ID + Watch []WatchDef + WatchAllTables bool + Options ChangeStreamOptions + + Position Position +} + +func (cs *CreateChangeStream) String() string { return fmt.Sprintf("%#v", cs) } +func (*CreateChangeStream) isDDLStmt() {} +func (cs *CreateChangeStream) Pos() Position { return cs.Position } +func (cs *CreateChangeStream) clearOffset() { + for i := range cs.Watch { + // Mutate in place. + cs.Watch[i].clearOffset() + } + cs.Position.Offset = 0 +} + +// AlterChangeStream represents a ALTER CHANGE STREAM statement. +type AlterChangeStream struct { + Name ID + Alteration ChangeStreamAlteration + + Position Position +} + +func (acs *AlterChangeStream) String() string { return fmt.Sprintf("%#v", acs) } +func (*AlterChangeStream) isDDLStmt() {} +func (acs *AlterChangeStream) Pos() Position { return acs.Position } +func (acs *AlterChangeStream) clearOffset() { + acs.Position.Offset = 0 +} + +type ChangeStreamAlteration interface { + isChangeStreamAlteration() + SQL() string +} + +func (AlterWatch) isChangeStreamAlteration() {} +func (AlterChangeStreamOptions) isChangeStreamAlteration() {} + +type ( + AlterWatch struct{ Watch []WatchDef } + AlterChangeStreamOptions struct{ Options ChangeStreamOptions } +) + +// DropChangeStream represents a DROP CHANGE STREAM statement. +type DropChangeStream struct { + Name ID + + Position Position +} + +func (dc *DropChangeStream) String() string { return fmt.Sprintf("%#v", dc) } +func (*DropChangeStream) isDDLStmt() {} +func (dc *DropChangeStream) Pos() Position { return dc.Position } +func (dc *DropChangeStream) clearOffset() { dc.Position.Offset = 0 } + +type WatchDef struct { + Table ID + Columns []ID + WatchAllCols bool + + Position Position +} + +func (wd WatchDef) Pos() Position { return wd.Position } +func (wd *WatchDef) clearOffset() { wd.Position.Offset = 0 } + +type ChangeStreamOptions struct { + RetentionPeriod *string +}