Skip to content

Commit

Permalink
Fix #289: Add alternative execution for counting queries in lock serv…
Browse files Browse the repository at this point in the history
…ice (#292)

* Add alternative execution for counting queries in lock service

As explained in the issue #289, the aggregate functions are not supported by AWS Keyspaces.
This fix tries to execute counting queries and if it fails, the same query without the COUNT
function is executed and the returned rows are counted programmatically.

Also removes code specific to `MSSQLDatabase` in `LockServiceCassandra` obviously not related
with Liquibase implementation for Cassandra.

* Remove build folder and update .gitignore
  • Loading branch information
maximevw committed May 9, 2024
1 parent 29e8ef1 commit 07e6265
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -5,3 +5,4 @@
.classpath
.project
.settings/
/build
Expand Up @@ -21,6 +21,8 @@

import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;

public class LockServiceCassandra extends StandardLockService {

Expand Down Expand Up @@ -62,19 +64,6 @@ public boolean acquireLock() throws LockException {
// another node was faster
return false;
}
if ((rowsUpdated == -1) && (database instanceof MSSQLDatabase)) {

Scope.getCurrentScope().getLog(this.getClass()).info("Database did not return a proper row count (Might have NOCOUNT enabled)");
database.rollback();
Sql[] sql = SqlGeneratorFactory.getInstance().generateSql(
new LockDatabaseChangeLogStatement(), database
);
if (sql.length != 1) {
throw new UnexpectedLiquibaseException("Did not expect " + sql.length + " statements");
}
rowsUpdated = executor.update(new RawSqlStatement("EXEC sp_executesql N'SET NOCOUNT OFF " +
sql[0].toSql().replace("'", "''") + "'"));
}
if (rowsUpdated > 1) {
throw new LockException("Did not update change log lock correctly");
}
Expand Down Expand Up @@ -142,11 +131,11 @@ public boolean isDatabaseChangeLogLockTableCreated() {
boolean hasChangeLogLockTable;
try {
Statement statement = ((CassandraDatabase) database).getStatement();
statement.executeQuery("SELECT ID from " + getChangeLogLockTableName());
statement.executeQuery("SELECT ID FROM " + getChangeLogLockTableName());
statement.close();
hasChangeLogLockTable = true;
} catch (SQLException e) {
Scope.getCurrentScope().getLog(getClass()).info("No " + getChangeLogLockTableName() + " available in cassandra.");
Scope.getCurrentScope().getLog(getClass()).info("No " + getChangeLogLockTableName() + " available in Cassandra.");
hasChangeLogLockTable = false;
} catch (DatabaseException e) {
e.printStackTrace();
Expand All @@ -163,9 +152,8 @@ public boolean isDatabaseChangeLogLockTableInitialized(final boolean tableJustCr
Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor("jdbc", database);

try {
isDatabaseChangeLogLockTableInitialized = executor.queryForInt(
new RawSqlStatement("SELECT COUNT(*) FROM " + getChangeLogLockTableName())
) > 0;
isDatabaseChangeLogLockTableInitialized = executeCountQueryWithAlternative(executor,
"SELECT COUNT(*) FROM " + getChangeLogLockTableName()) > 0;
} catch (LiquibaseException e) {
if (executor.updatesDatabase()) {
throw new UnexpectedLiquibaseException(e);
Expand All @@ -185,10 +173,9 @@ private boolean isLocked(Executor executor) throws DatabaseException {

private boolean isLockedByCurrentInstance(Executor executor) throws DatabaseException {
final String lockedBy = NetUtil.getLocalHostName() + " (" + NetUtil.getLocalHostAddress() + ")";
return executor.queryForInt(
new RawSqlStatement("SELECT COUNT(*) FROM " + getChangeLogLockTableName() + " where " +
"LOCKED = TRUE AND LOCKEDBY = '" + lockedBy + "' ALLOW FILTERING")
) > 0;
return executeCountQueryWithAlternative(executor,
"SELECT COUNT(*) FROM " + getChangeLogLockTableName() + " WHERE " +
"LOCKED = TRUE AND LOCKEDBY = '" + lockedBy + "' ALLOW FILTERING") > 0;
}

private String getChangeLogLockTableName() {
Expand All @@ -198,4 +185,21 @@ private String getChangeLogLockTableName() {
return database.getDatabaseChangeLogLockTableName();
}
}

private int executeCountQueryWithAlternative(final Executor executor, final String query) throws DatabaseException {
if (!query.contains("SELECT COUNT(*)")) {
throw new UnexpectedLiquibaseException("Invalid count query: " + query);
}
try {
return executor.queryForInt(new RawSqlStatement(query));
} catch (DatabaseException e) {
// If the count query failed (for example, because counting rows is not implemented - see issue #289 with
// AWS Keyspaces where aggregate functions like COUNT are not supported:
// https://docs.aws.amazon.com/keyspaces/latest/devguide/cassandra-apis.html#cassandra-functions), try to
// execute the same query without the COUNT function then programmatically count returned rows.
final String altQuery = query.replace("SELECT COUNT(*)", "SELECT *");
final List<Map<String, ?>> rows = executor.queryForList(new RawSqlStatement(altQuery));
return rows.size();
}
}
}

0 comments on commit 07e6265

Please sign in to comment.