Skip to content

Commit

Permalink
feat(spanner/spansql): add support for create, alter and drop change … (
Browse files Browse the repository at this point in the history
#6669)

* feat(spanner/spansql): add support for create, alter and drop change streams

* lint issue fix

Co-authored-by: rahul2393 <rahulyadavsep92@gmail.com>
  • Loading branch information
harshachinta and rahul2393 committed Sep 19, 2022
1 parent 478b8dd commit cc4620a
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 2 deletions.
41 changes: 41 additions & 0 deletions spanner/spannertest/db_test.go
Expand Up @@ -728,3 +728,44 @@ func TestAddBackQuoteForHypen(t *testing.T) {
t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want)
}
}

func TestCreateAndManageChangeStream(t *testing.T) {
// Testing Create Change Stream
ddl, err := spansql.ParseDDL("filename", "CREATE CHANGE STREAM SingerAlbumStream FOR Singers(FirstName, LastName), Albums OPTIONS( retention_period = '36h' )")
if err != nil {
t.Fatalf("%s: Bad DDL", err)
}

got := ddl.List[0].SQL()
want := "CREATE CHANGE STREAM SingerAlbumStream FOR Singers(FirstName, LastName), Albums OPTIONS( retention_period='36h' )"

if !reflect.DeepEqual(got, want) {
t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want)
}

// Testing Alter Change Stream Options
ddl, err = spansql.ParseDDL("filename", "ALTER CHANGE STREAM SingerAlbumStream SET OPTIONS( retention_period = '20h' )")
if err != nil {
t.Fatalf("%s: Bad DDL", err)
}

got = ddl.List[0].SQL()
want = "ALTER CHANGE STREAM SingerAlbumStream SET OPTIONS( retention_period='20h' )"

if !reflect.DeepEqual(got, want) {
t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want)
}

// Testing Drop Change Stream Options
ddl, err = spansql.ParseDDL("filename", "DROP CHANGE STREAM SingerAlbumStream")
if err != nil {
t.Fatalf("%s: Bad DDL", err)
}

got = ddl.List[0].SQL()
want = "DROP CHANGE STREAM SingerAlbumStream"

if !reflect.DeepEqual(got, want) {
t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want)
}
}
171 changes: 169 additions & 2 deletions spanner/spansql/parser.go
Expand Up @@ -982,7 +982,7 @@ func (p *parser) parseDDLStmt() (DDLStmt, *parseError) {

/*
statement:
{ create_database | create_table | create_index | alter_table | drop_table | drop_index }
{ create_database | create_table | create_index | alter_table | drop_table | drop_index | create_change_stream | alter_change_stream | drop_change_stream }
*/

// TODO: support create_database
Expand All @@ -1005,13 +1005,14 @@ func (p *parser) parseDDLStmt() (DDLStmt, *parseError) {
// DROP TABLE table_name
// DROP INDEX index_name
// DROP VIEW view_name
// DROP CHANGE STREAM change_stream_name
tok := p.next()
if tok.err != nil {
return nil, tok.err
}
switch {
default:
return nil, p.errorf("got %q, want TABLE, VIEW or INDEX", tok.value)
return nil, p.errorf("got %q, want TABLE, VIEW, INDEX or CHANGE", tok.value)
case tok.caseEqual("TABLE"):
name, err := p.parseTableOrIndexOrColumnName()
if err != nil {
Expand All @@ -1030,10 +1031,25 @@ func (p *parser) parseDDLStmt() (DDLStmt, *parseError) {
return nil, err
}
return &DropView{Name: name, Position: pos}, nil
case tok.caseEqual("CHANGE"):
if err := p.expect("STREAM"); err != nil {
return nil, err
}
name, err := p.parseTableOrIndexOrColumnName()
if err != nil {
return nil, err
}
return &DropChangeStream{Name: name, Position: pos}, nil
}
} else if p.sniff("ALTER", "DATABASE") {
a, err := p.parseAlterDatabase()
return a, err
} else if p.sniff("CREATE", "CHANGE", "STREAM") {
cs, err := p.parseCreateChangeStream()
return cs, err
} else if p.sniff("ALTER", "CHANGE", "STREAM") {
acs, err := p.parseAlterChangeStream()
return acs, err
}

return nil, p.errorf("unknown DDL statement")
Expand Down Expand Up @@ -2001,6 +2017,157 @@ func (p *parser) parseColumnNameList() ([]ID, *parseError) {
return list, err
}

func (p *parser) parseCreateChangeStream() (*CreateChangeStream, *parseError) {
debugf("parseCreateChangeStream: %v", p)

/*
CREATE CHANGE STREAM change_stream_name
[FOR column_or_table_watching_definition[, ... ] ]
[
OPTIONS (
retention_period = timespan,
value_capture_type = type
)
]
*/
if err := p.expect("CREATE"); err != nil {
return nil, err
}
pos := p.Pos()
if err := p.expect("CHANGE"); err != nil {
return nil, err
}
if err := p.expect("STREAM"); err != nil {
return nil, err
}
csname, err := p.parseTableOrIndexOrColumnName()
if err != nil {
return nil, err
}

if err := p.expect("FOR"); err != nil {
return nil, err
}

cs := &CreateChangeStream{Name: csname, Position: pos}

if p.eat("ALL") {
cs.WatchAllTables = true
} else {
for {
tname, err := p.parseTableOrIndexOrColumnName()
if err != nil {
return nil, err
}
pos := p.Pos()
wd := WatchDef{Table: tname, Position: pos}

if p.sniff("(") {
columns, err := p.parseColumnNameList()
if err != nil {
return nil, err
}
wd.Columns = columns
} else {
wd.WatchAllCols = true
}

cs.Watch = append(cs.Watch, wd)
if p.eat(",") {
continue
}
break
}
}

if p.sniff("OPTIONS") {
cs.Options, err = p.parseChangeStreamOptions()
if err != nil {
return nil, err
}
}

return cs, nil
}

func (p *parser) parseAlterChangeStream() (*AlterChangeStream, *parseError) {
debugf("parseAlterChangeStream: %v", p)

if err := p.expect("ALTER"); err != nil {
return nil, err
}
pos := p.Pos()
if err := p.expect("CHANGE"); err != nil {
return nil, err
}
if err := p.expect("STREAM"); err != nil {
return nil, err
}
csname, err := p.parseTableOrIndexOrColumnName()
if err != nil {
return nil, err
}

acs := &AlterChangeStream{Name: csname, Position: pos}
if err := p.expect("SET"); err != nil {
return nil, err
}
// TODO: Support for altering watch
if p.sniff("OPTIONS") {
options, err := p.parseChangeStreamOptions()
if err != nil {
return nil, err
}
acs.Alteration = AlterChangeStreamOptions{Options: options}
return acs, nil
}
return nil, p.errorf("got %q, expected OPTIONS", p.next())
}

func (p *parser) parseChangeStreamOptions() (ChangeStreamOptions, *parseError) {
debugf("parseChangeStreamOptions: %v", p)
/*
options_def:
OPTIONS (
retention_period = timespan,
value_capture_type = type
) */

if err := p.expect("OPTIONS"); err != nil {
return ChangeStreamOptions{}, err
}
if err := p.expect("("); err != nil {
return ChangeStreamOptions{}, err
}

var cso ChangeStreamOptions
if p.eat("retention_period", "=") {
tok := p.next()
if tok.err != nil {
return ChangeStreamOptions{}, tok.err
}
retentionPeriod := new(string)
if tok.value == "null" {
*retentionPeriod = ""
} else {
if tok.typ != stringToken {
return ChangeStreamOptions{}, p.errorf("invalid retention_period: %v", tok.value)
}
*retentionPeriod = tok.string
}
cso.RetentionPeriod = retentionPeriod
} else {
tok := p.next()
return ChangeStreamOptions{}, p.errorf("unknown change stream option: %v", tok.value)
}

if err := p.expect(")"); err != nil {
return ChangeStreamOptions{}, err
}

return cso, nil
}

var baseTypes = map[string]TypeBase{
"BOOL": Bool,
"INT64": Int64,
Expand Down
44 changes: 44 additions & 0 deletions spanner/spansql/sql.go
Expand Up @@ -96,6 +96,38 @@ func (cv CreateView) SQL() string {
return str
}

func (cs CreateChangeStream) SQL() string {
str := "CREATE CHANGE STREAM "
str += cs.Name.SQL() + " FOR "
if cs.WatchAllTables {
str += "ALL"
} else {
for i, table := range cs.Watch {
if i > 0 {
str += ", "
}
str += table.Table.SQL()
if !table.WatchAllCols {
str += "("
for i, c := range table.Columns {
if i > 0 {
str += ", "
}
str += c.SQL()
}
str += ")"
}
}
}
if cs.Options.RetentionPeriod != nil {
str += " OPTIONS( "
str += fmt.Sprintf("retention_period='%s'", *cs.Options.RetentionPeriod)
str += " )"
}

return str
}

func (dt DropTable) SQL() string {
return "DROP TABLE " + dt.Name.SQL()
}
Expand All @@ -108,6 +140,18 @@ func (dv DropView) SQL() string {
return "DROP VIEW " + dv.Name.SQL()
}

func (dc DropChangeStream) SQL() string {
return "DROP CHANGE STREAM " + dc.Name.SQL()
}

func (acs AlterChangeStream) SQL() string {
return "ALTER CHANGE STREAM " + acs.Name.SQL() + " SET " + acs.Alteration.SQL()
}

func (ao AlterChangeStreamOptions) SQL() string {
return "OPTIONS( " + fmt.Sprintf("retention_period='%s'", *ao.Options.RetentionPeriod) + " )"
}

func (at AlterTable) SQL() string {
return "ALTER TABLE " + at.Name.SQL() + " " + at.Alteration.SQL()
}
Expand Down
77 changes: 77 additions & 0 deletions spanner/spansql/types.go
Expand Up @@ -1030,3 +1030,80 @@ func getInlineComment(stmts statements, n Node) *Comment {
}
return c
}

// CreateChangeStream represents a CREATE CHANGE STREAM statement.
// https://cloud.google.com/spanner/docs/change-streams/manage
type CreateChangeStream struct {
Name ID
Watch []WatchDef
WatchAllTables bool
Options ChangeStreamOptions

Position Position
}

func (cs *CreateChangeStream) String() string { return fmt.Sprintf("%#v", cs) }
func (*CreateChangeStream) isDDLStmt() {}
func (cs *CreateChangeStream) Pos() Position { return cs.Position }
func (cs *CreateChangeStream) clearOffset() {
for i := range cs.Watch {
// Mutate in place.
cs.Watch[i].clearOffset()
}
cs.Position.Offset = 0
}

// AlterChangeStream represents a ALTER CHANGE STREAM statement.
type AlterChangeStream struct {
Name ID
Alteration ChangeStreamAlteration

Position Position
}

func (acs *AlterChangeStream) String() string { return fmt.Sprintf("%#v", acs) }
func (*AlterChangeStream) isDDLStmt() {}
func (acs *AlterChangeStream) Pos() Position { return acs.Position }
func (acs *AlterChangeStream) clearOffset() {
acs.Position.Offset = 0
}

type ChangeStreamAlteration interface {
isChangeStreamAlteration()
SQL() string
}

func (AlterWatch) isChangeStreamAlteration() {}
func (AlterChangeStreamOptions) isChangeStreamAlteration() {}

type (
AlterWatch struct{ Watch []WatchDef }
AlterChangeStreamOptions struct{ Options ChangeStreamOptions }
)

// DropChangeStream represents a DROP CHANGE STREAM statement.
type DropChangeStream struct {
Name ID

Position Position
}

func (dc *DropChangeStream) String() string { return fmt.Sprintf("%#v", dc) }
func (*DropChangeStream) isDDLStmt() {}
func (dc *DropChangeStream) Pos() Position { return dc.Position }
func (dc *DropChangeStream) clearOffset() { dc.Position.Offset = 0 }

type WatchDef struct {
Table ID
Columns []ID
WatchAllCols bool

Position Position
}

func (wd WatchDef) Pos() Position { return wd.Position }
func (wd *WatchDef) clearOffset() { wd.Position.Offset = 0 }

type ChangeStreamOptions struct {
RetentionPeriod *string
}

0 comments on commit cc4620a

Please sign in to comment.