Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX] Update Cassandra driver to drop materialized views before tables #984

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 40 additions & 0 deletions database/cassandra/cassandra.go
Expand Up @@ -292,6 +292,46 @@ func (c *Cassandra) Version() (version int, dirty bool, err error) {
}

func (c *Cassandra) Drop() error {
// Find an Materialized View and drop them first
queryViews := fmt.Sprintf(`
SELECT view_name, base_table_name
FROM system_schema.views
WHERE keyspace_name='%s'`,
c.config.KeyspaceName,
)
iterViews := c.session.Query(queryViews).Iter()
var viewName string
var baseTableName string
for iterViews.Scan(&viewName, &baseTableName) {
// ScyllaDB stores stores GSIs in the system_schema.views table as well
// so those have to be filtered out as scylladb is not smart enough and
// will throw an error if you try to drop using
// MATERIALIZED VIEW IF EXISTS syntax
if strings.HasSuffix(viewName, "_index") {
idxName, _ := strings.CutSuffix(viewName, "_index")
queryViewNotIndex := fmt.Sprintf(`
SELECT count(index_name)
FROM system_schema.indexes
WHERE keyspace_name='%s'
AND table_name = '%s'
AND index_name = '%s'`,
c.config.KeyspaceName,
baseTableName,
idxName,
)
var count int
if err := c.session.Query(queryViewNotIndex).Scan(&count); err != nil {
return err
}
if count > 0 {
continue
}
}
err := c.session.Query(fmt.Sprintf(`DROP MATERIALIZED VIEW IF EXISTS %s`, viewName)).Exec()
if err != nil {
return err
}
}
// select all tables in current schema
query := fmt.Sprintf(`SELECT table_name from system_schema.tables WHERE keyspace_name='%s'`, c.config.KeyspaceName)
iter := c.session.Query(query).Iter()
Expand Down