Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner/spansql): add support for create, alter and drop change … #6669

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 41 additions & 0 deletions spanner/spannertest/db_test.go
Expand Up @@ -728,3 +728,44 @@ func TestAddBackQuoteForHypen(t *testing.T) {
t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want)
}
}

func TestCreateAndManageChangeStream(t *testing.T) {
// Testing Create Change Stream
ddl, err := spansql.ParseDDL("filename", "CREATE CHANGE STREAM SingerAlbumStream FOR Singers(FirstName, LastName), Albums OPTIONS( retention_period = '36h' )")
if err != nil {
t.Fatalf("%s: Bad DDL", err)
}

got := ddl.List[0].SQL()
want := "CREATE CHANGE STREAM SingerAlbumStream FOR Singers(FirstName, LastName), Albums OPTIONS( retention_period='36h' )"

if !reflect.DeepEqual(got, want) {
t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want)
}

// Testing Alter Change Stream Options
ddl, err = spansql.ParseDDL("filename", "ALTER CHANGE STREAM SingerAlbumStream SET OPTIONS( retention_period = '20h' )")
if err != nil {
t.Fatalf("%s: Bad DDL", err)
}

got = ddl.List[0].SQL()
want = "ALTER CHANGE STREAM SingerAlbumStream SET OPTIONS( retention_period='20h' )"

if !reflect.DeepEqual(got, want) {
t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want)
}

// Testing Drop Change Stream Options
ddl, err = spansql.ParseDDL("filename", "DROP CHANGE STREAM SingerAlbumStream")
if err != nil {
t.Fatalf("%s: Bad DDL", err)
}

got = ddl.List[0].SQL()
want = "DROP CHANGE STREAM SingerAlbumStream"

if !reflect.DeepEqual(got, want) {
t.Errorf("Generated SQL statement incorrect.\n got %v\nwant %v", got, want)
}
}
171 changes: 169 additions & 2 deletions spanner/spansql/parser.go
Expand Up @@ -982,7 +982,7 @@ func (p *parser) parseDDLStmt() (DDLStmt, *parseError) {

/*
statement:
{ create_database | create_table | create_index | alter_table | drop_table | drop_index }
{ create_database | create_table | create_index | alter_table | drop_table | drop_index | create_change_stream | alter_change_stream | drop_change_stream }
*/

// TODO: support create_database
Expand All @@ -1005,13 +1005,14 @@ func (p *parser) parseDDLStmt() (DDLStmt, *parseError) {
// DROP TABLE table_name
// DROP INDEX index_name
// DROP VIEW view_name
// DROP CHANGE STREAM change_stream_name
tok := p.next()
if tok.err != nil {
return nil, tok.err
}
switch {
default:
return nil, p.errorf("got %q, want TABLE, VIEW or INDEX", tok.value)
return nil, p.errorf("got %q, want TABLE, VIEW, INDEX or CHANGE", tok.value)
case tok.caseEqual("TABLE"):
name, err := p.parseTableOrIndexOrColumnName()
if err != nil {
Expand All @@ -1030,10 +1031,25 @@ func (p *parser) parseDDLStmt() (DDLStmt, *parseError) {
return nil, err
}
return &DropView{Name: name, Position: pos}, nil
case tok.caseEqual("CHANGE"):
if err := p.expect("STREAM"); err != nil {
return nil, err
}
name, err := p.parseTableOrIndexOrColumnName()
if err != nil {
return nil, err
}
return &DropChangeStream{Name: name, Position: pos}, nil
}
} else if p.sniff("ALTER", "DATABASE") {
a, err := p.parseAlterDatabase()
return a, err
} else if p.sniff("CREATE", "CHANGE", "STREAM") {
cs, err := p.parseCreateChangeStream()
return cs, err
} else if p.sniff("ALTER", "CHANGE", "STREAM") {
acs, err := p.parseAlterChangeStream()
return acs, err
}

return nil, p.errorf("unknown DDL statement")
Expand Down Expand Up @@ -2001,6 +2017,157 @@ func (p *parser) parseColumnNameList() ([]ID, *parseError) {
return list, err
}

func (p *parser) parseCreateChangeStream() (*CreateChangeStream, *parseError) {
debugf("parseCreateChangeStream: %v", p)

/*
CREATE CHANGE STREAM change_stream_name
[FOR column_or_table_watching_definition[, ... ] ]
[
OPTIONS (
retention_period = timespan,
value_capture_type = type
)
]
*/
if err := p.expect("CREATE"); err != nil {
return nil, err
}
pos := p.Pos()
if err := p.expect("CHANGE"); err != nil {
return nil, err
}
if err := p.expect("STREAM"); err != nil {
return nil, err
}
csname, err := p.parseTableOrIndexOrColumnName()
if err != nil {
return nil, err
}

if err := p.expect("FOR"); err != nil {
return nil, err
}

cs := &CreateChangeStream{Name: csname, Position: pos}

if p.eat("ALL") {
cs.WatchAllTables = true
} else {
for {
tname, err := p.parseTableOrIndexOrColumnName()
if err != nil {
return nil, err
}
pos := p.Pos()
wd := WatchDef{Table: tname, Position: pos}

if p.sniff("(") {
columns, err := p.parseColumnNameList()
if err != nil {
return nil, err
}
wd.Columns = columns
} else {
wd.WatchAllCols = true
}

cs.Watch = append(cs.Watch, wd)
if p.eat(",") {
continue
}
break
}
}

if p.sniff("OPTIONS") {
cs.Options, err = p.parseChangeStreamOptions()
if err != nil {
return nil, err
}
}

return cs, nil
}

func (p *parser) parseAlterChangeStream() (*AlterChangeStream, *parseError) {
debugf("parseAlterChangeStream: %v", p)

if err := p.expect("ALTER"); err != nil {
return nil, err
}
pos := p.Pos()
if err := p.expect("CHANGE"); err != nil {
return nil, err
}
if err := p.expect("STREAM"); err != nil {
return nil, err
}
csname, err := p.parseTableOrIndexOrColumnName()
if err != nil {
return nil, err
}

acs := &AlterChangeStream{Name: csname, Position: pos}
if err := p.expect("SET"); err != nil {
return nil, err
}
// TODO: Support for altering watch
if p.sniff("OPTIONS") {
options, err := p.parseChangeStreamOptions()
if err != nil {
return nil, err
}
acs.Alteration = AlterChangeStreamOptions{Options: options}
return acs, nil
}
return nil, p.errorf("got %q, expected OPTIONS", p.next())
}

func (p *parser) parseChangeStreamOptions() (ChangeStreamOptions, *parseError) {
debugf("parseChangeStreamOptions: %v", p)
/*
options_def:
OPTIONS (
retention_period = timespan,
value_capture_type = type
) */

if err := p.expect("OPTIONS"); err != nil {
return ChangeStreamOptions{}, err
}
if err := p.expect("("); err != nil {
return ChangeStreamOptions{}, err
}

var cso ChangeStreamOptions
if p.eat("retention_period", "=") {
tok := p.next()
if tok.err != nil {
return ChangeStreamOptions{}, tok.err
}
retentionPeriod := new(string)
if tok.value == "null" {
*retentionPeriod = ""
} else {
if tok.typ != stringToken {
return ChangeStreamOptions{}, p.errorf("invalid retention_period: %v", tok.value)
}
*retentionPeriod = tok.string
}
cso.RetentionPeriod = retentionPeriod
} else {
tok := p.next()
return ChangeStreamOptions{}, p.errorf("unknown change stream option: %v", tok.value)
}

if err := p.expect(")"); err != nil {
return ChangeStreamOptions{}, err
}

return cso, nil
}

var baseTypes = map[string]TypeBase{
"BOOL": Bool,
"INT64": Int64,
Expand Down
44 changes: 44 additions & 0 deletions spanner/spansql/sql.go
Expand Up @@ -96,6 +96,38 @@ func (cv CreateView) SQL() string {
return str
}

func (cs CreateChangeStream) SQL() string {
str := "CREATE CHANGE STREAM "
str += cs.Name.SQL() + " FOR "
if cs.WatchAllTables {
str += "ALL"
} else {
for i, table := range cs.Watch {
if i > 0 {
str += ", "
}
str += table.Table.SQL()
if !table.WatchAllCols {
str += "("
for i, c := range table.Columns {
if i > 0 {
str += ", "
}
str += c.SQL()
}
str += ")"
}
}
}
if cs.Options.RetentionPeriod != nil {
str += " OPTIONS( "
str += fmt.Sprintf("retention_period='%s'", *cs.Options.RetentionPeriod)
str += " )"
}

return str
}

func (dt DropTable) SQL() string {
return "DROP TABLE " + dt.Name.SQL()
}
Expand All @@ -108,6 +140,18 @@ func (dv DropView) SQL() string {
return "DROP VIEW " + dv.Name.SQL()
}

func (dc DropChangeStream) SQL() string {
return "DROP CHANGE STREAM " + dc.Name.SQL()
}

func (acs AlterChangeStream) SQL() string {
return "ALTER CHANGE STREAM " + acs.Name.SQL() + " SET " + acs.Alteration.SQL()
}

func (ao AlterChangeStreamOptions) SQL() string {
return "OPTIONS( " + fmt.Sprintf("retention_period='%s'", *ao.Options.RetentionPeriod) + " )"
}

func (at AlterTable) SQL() string {
return "ALTER TABLE " + at.Name.SQL() + " " + at.Alteration.SQL()
}
Expand Down
77 changes: 77 additions & 0 deletions spanner/spansql/types.go
Expand Up @@ -1030,3 +1030,80 @@ func getInlineComment(stmts statements, n Node) *Comment {
}
return c
}

// CreateChangeStream represents a CREATE CHANGE STREAM statement.
// https://cloud.google.com/spanner/docs/change-streams/manage
type CreateChangeStream struct {
Name ID
Watch []WatchDef
WatchAllTables bool
Options ChangeStreamOptions

Position Position
}

func (cs *CreateChangeStream) String() string { return fmt.Sprintf("%#v", cs) }
func (*CreateChangeStream) isDDLStmt() {}
func (cs *CreateChangeStream) Pos() Position { return cs.Position }
func (cs *CreateChangeStream) clearOffset() {
for i := range cs.Watch {
// Mutate in place.
cs.Watch[i].clearOffset()
}
cs.Position.Offset = 0
}

// AlterChangeStream represents a ALTER CHANGE STREAM statement.
type AlterChangeStream struct {
Name ID
Alteration ChangeStreamAlteration

Position Position
}

func (acs *AlterChangeStream) String() string { return fmt.Sprintf("%#v", acs) }
func (*AlterChangeStream) isDDLStmt() {}
func (acs *AlterChangeStream) Pos() Position { return acs.Position }
func (acs *AlterChangeStream) clearOffset() {
acs.Position.Offset = 0
}

type ChangeStreamAlteration interface {
isChangeStreamAlteration()
SQL() string
}

func (AlterWatch) isChangeStreamAlteration() {}
func (AlterChangeStreamOptions) isChangeStreamAlteration() {}

type (
AlterWatch struct{ Watch []WatchDef }
AlterChangeStreamOptions struct{ Options ChangeStreamOptions }
)

// DropChangeStream represents a DROP CHANGE STREAM statement.
type DropChangeStream struct {
Name ID

Position Position
}

func (dc *DropChangeStream) String() string { return fmt.Sprintf("%#v", dc) }
func (*DropChangeStream) isDDLStmt() {}
func (dc *DropChangeStream) Pos() Position { return dc.Position }
func (dc *DropChangeStream) clearOffset() { dc.Position.Offset = 0 }

type WatchDef struct {
Table ID
Columns []ID
WatchAllCols bool

Position Position
}

func (wd WatchDef) Pos() Position { return wd.Position }
func (wd *WatchDef) clearOffset() { wd.Position.Offset = 0 }

type ChangeStreamOptions struct {
RetentionPeriod *string
}