Skip to content

Commit

Permalink
#465 implement expired entries cleaner for MySQL
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladimir Buhtoyarov committed Apr 27, 2024
1 parent bde5267 commit 8badfa6
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 28 deletions.
6 changes: 5 additions & 1 deletion asciidoc/src/main/docs/asciidoc/index.adoc
Expand Up @@ -9,6 +9,10 @@ Release date: `21 Apr 2024`
* http://bucket4j.com/{revnumber}/toc.html[{revnumber} Reference]
* http://bucket4j.com/commercial/java8.html[Instructions to get the builds for Java 8]

== How to support
https://app.lava.top/ru/2716741203?donate=open[Make donate] to increase motivation of project maintainer to develop new features,
write documentation and answer user questions.

== Documentation archive
If you are looking for documentation related to a previous version then http://bucket4j.com/previos-releases.html[ see there]

Expand All @@ -17,7 +21,7 @@ Vladimir Bukhtoyarov::
Lead Java developer at vk.com +
Saint-Petersburg, Russia +
jsecoder@mail.ru +
Role: maintainer, future vision, architect +
Role: maintainer, future vision, code owner +
image:images/photo.jpg[80,80] +

Maxim Bartkov::
Expand Down
Expand Up @@ -19,8 +19,6 @@
*/
package io.github.bucket4j.distributed.proxy;

import java.util.Collection;

/**
* Interface used by particular {@link ProxyManager} implementations to indicate
* that they support the manual(triggered by user) removing of expired bucket entries.
Expand All @@ -41,21 +39,21 @@ public interface ExpiredEntriesCleaner {
* <p>Example of usage:
* <pre>
* {@code
* private static final int CLEANUP_INTERVAL_MILLIS = 10_000;
* private static final int MAX_KEYS_TO_REMOVE_IN_ONE_TRANSACTION = 100;
* private static final int THRESHOLD_TO_CONTINUE_REMOVING = 20;
* private static final int MAX_TO_REMOVE_IN_ONE_TRANSACTION = 1_000;
* private static final int THRESHOLD_TO_CONTINUE_REMOVING = 50;
*
* @Scheduled(fixedDelay = CLEANUP_INTERVAL_MILLIS)
* // once per day at 4:30 morning
* @Scheduled(cron = "0 30 4 * * *")
* public void scheduleFixedDelayTask() {
* int removedKeysCount;
* do {
* removedKeysCount = proxyManager.removedKeys(MAX_KEYS_TO_REMOVE_IN_ONE_TRANSACTION);
* removedCount = proxyManager.removeExpired(MAX_TO_REMOVE_IN_ONE_TRANSACTION);
* if (removedKeysCount > 0) {
* logger.info("Removed {} expired buckets", removedKeysCount);
* logger.info("Removed {} expired buckets", removedCount);
* } else {
* logger.info("There are no expired buckets to remove");
* }
* } while (removedKeysCount > THRESHOLD_TO_CONTINUE_REMOVING)
* } while (removedCount > THRESHOLD_TO_CONTINUE_REMOVING)
* }
* }
* </pre>
Expand Down
8 changes: 4 additions & 4 deletions bucket4j-mysql/pom.xml
Expand Up @@ -41,19 +41,19 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>1.16.3</version>
<version>1.19.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
<version>8.0.33</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java6</artifactId>
<version>2.3.8</version>
<artifactId>HikariCP</artifactId>
<version>5.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Expand Up @@ -64,6 +64,11 @@ public <K2> MySQLSelectForUpdateBasedProxyManagerBuilder<K2> primaryKeyMapper(Pr
super.primaryKeyMapper = (PrimaryKeyMapper) Objects.requireNonNull(primaryKeyMapper);
return (MySQLSelectForUpdateBasedProxyManagerBuilder<K2>) this;
}

@Override
public boolean isExpireAfterWriteSupported() {
return true;
}
}

}
Expand Up @@ -21,8 +21,10 @@

import com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException;
import io.github.bucket4j.BucketExceptions;
import io.github.bucket4j.distributed.jdbc.CustomColumnProvider;
import io.github.bucket4j.distributed.jdbc.PrimaryKeyMapper;
import io.github.bucket4j.distributed.jdbc.SQLProxyConfiguration;
import io.github.bucket4j.distributed.proxy.ExpiredEntriesCleaner;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.AbstractSelectForUpdateBasedProxyManager;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.LockAndGetResult;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.SelectForUpdateBasedTransaction;
Expand All @@ -35,6 +37,8 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

Expand All @@ -45,24 +49,43 @@
*
* @param <K> type of primary key
*/
public class MySQLSelectForUpdateBasedProxyManager<K> extends AbstractSelectForUpdateBasedProxyManager<K> {
public class MySQLSelectForUpdateBasedProxyManager<K> extends AbstractSelectForUpdateBasedProxyManager<K> implements ExpiredEntriesCleaner {

private final DataSource dataSource;
private final PrimaryKeyMapper<K> primaryKeyMapper;
private final String removeSqlQuery;
private final String updateSqlQuery;
private final String insertSqlQuery;
private final String selectSqlQuery;
private final String clearExpiredSqlQuery;
private final List<CustomColumnProvider<K>> customColumns = new ArrayList<>();

MySQLSelectForUpdateBasedProxyManager(MySQLSelectForUpdateBasedProxyManagerBuilder<K> builder) {
super(builder.getClientSideConfig());
this.dataSource = builder.getDataSource();
this.primaryKeyMapper = builder.getPrimaryKeyMapper();
this.removeSqlQuery = MessageFormat.format("DELETE FROM {0} WHERE {1} = ?", builder.getTableName(), builder.getIdColumnName());
updateSqlQuery = MessageFormat.format("UPDATE {0} SET {1}=? WHERE {2}=?", builder.getTableName(), builder.getStateColumnName(), builder.getIdColumnName());
insertSqlQuery = MessageFormat.format("INSERT IGNORE INTO {0}({1}, {2}) VALUES(?, null)",
builder.getTableName(), builder.getIdColumnName(), builder.getStateColumnName());
selectSqlQuery = MessageFormat.format("SELECT {0} as state FROM {1} WHERE {2} = ? FOR UPDATE", builder.getStateColumnName(), builder.getTableName(), builder.getIdColumnName());
this.customColumns.addAll(builder.getCustomColumns());
getClientSideConfig().getExpirationAfterWriteStrategy().ifPresent(expiration -> {
this.customColumns.add(CustomColumnProvider.createExpiresInColumnProvider(builder.getExpiresAtColumnName(), expiration));
});
if (customColumns.isEmpty()) {
this.updateSqlQuery = MessageFormat.format("UPDATE {0} SET {1}=? WHERE {2}=?", builder.getTableName(), builder.getStateColumnName(), builder.getIdColumnName());
} else {
String customPartInUpdate = String.join(",", customColumns.stream().map(column -> column.getCustomFieldName() + "=?").toList());
this.updateSqlQuery = MessageFormat.format("UPDATE {0} SET {1}=?,{2} WHERE {3}=?", builder.getTableName(), builder.getStateColumnName(), customPartInUpdate, builder.getIdColumnName());
}
// https://stackoverflow.com/questions/12810346/alternative-to-using-limit-keyword-in-a-subquery-in-mysql
this.clearExpiredSqlQuery = MessageFormat.format(
"""
DELETE FROM {0} WHERE
{2} < ? AND
{1} IN(SELECT * FROM (SELECT {1} FROM {0} WHERE {2} < ? LIMIT ? FOR UPDATE SKIP LOCKED) as subquery)
""", builder.getTableName(), builder.getIdColumnName(), builder.getExpiresAtColumnName()
);
}

/**
Expand All @@ -71,13 +94,17 @@ public class MySQLSelectForUpdateBasedProxyManager<K> extends AbstractSelectForU
@Deprecated
public MySQLSelectForUpdateBasedProxyManager(SQLProxyConfiguration<K> configuration) {
super(configuration.getClientSideConfig());
this.clearExpiredSqlQuery = null;
this.dataSource = Objects.requireNonNull(configuration.getDataSource());
this.primaryKeyMapper = configuration.getPrimaryKeyMapper();
this.removeSqlQuery = MessageFormat.format("DELETE FROM {0} WHERE {1} = ?", configuration.getTableName(), configuration.getIdName());
updateSqlQuery = MessageFormat.format("UPDATE {0} SET {1}=? WHERE {2}=?", configuration.getTableName(), configuration.getStateName(), configuration.getIdName());
insertSqlQuery = MessageFormat.format("INSERT IGNORE INTO {0}({1}, {2}) VALUES(?, null)",
configuration.getTableName(), configuration.getIdName(), configuration.getStateName());
selectSqlQuery = MessageFormat.format("SELECT {0} as state FROM {1} WHERE {2} = ? FOR UPDATE", configuration.getStateName(), configuration.getTableName(), configuration.getIdName());
if (getClientSideConfig().getExpirationAfterWriteStrategy().isPresent()) {
throw new IllegalArgumentException();
}
}

@Override
Expand All @@ -104,8 +131,12 @@ public void update(byte[] data, RemoteBucketState newState, Optional<Long> reque
try {
try (PreparedStatement updateStatement = connection.prepareStatement(updateSqlQuery)) {
applyTimeout(updateStatement, requestTimeoutNanos);
updateStatement.setBytes(1, data);
primaryKeyMapper.set(updateStatement, 2, key);
int i = 0;
updateStatement.setBytes(++i, data);
for (CustomColumnProvider<K> column : customColumns) {
column.setCustomField(key, ++i, updateStatement, newState, currentTimeNanos());
}
primaryKeyMapper.set(updateStatement, ++i, key);
updateStatement.executeUpdate();
}
} catch (SQLException e) {
Expand Down Expand Up @@ -187,4 +218,25 @@ public void removeProxy(K key) {
}
}

@Override
public boolean isExpireAfterWriteSupported() {
return true;
}


@Override
public int removeExpired(int batchSize) {
try (Connection connection = dataSource.getConnection()) {
long currentTimeMillis = System.currentTimeMillis();
try(PreparedStatement clearStatement = connection.prepareStatement(clearExpiredSqlQuery)) {
clearStatement.setLong(1, currentTimeMillis);
clearStatement.setLong(2, currentTimeMillis);
clearStatement.setInt(3, batchSize);
return clearStatement.executeUpdate();
}
} catch (SQLException e) {
throw new BucketExceptions.BucketExecutionException(e);
}
}

}
Expand Up @@ -9,6 +9,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.utility.DockerImageName;

import javax.sql.DataSource;
import java.sql.Connection;
Expand All @@ -28,7 +29,7 @@ public static void initializeInstance() throws SQLException {
container = startMySQLContainer();
dataSource = createJdbcDataSource(container);
BucketTableSettings tableSettings = BucketTableSettings.customSettings("test.bucket", "id", "state");
final String INIT_TABLE_SCRIPT = "CREATE TABLE IF NOT EXISTS {0}({1} BIGINT PRIMARY KEY, {2} BLOB)";
final String INIT_TABLE_SCRIPT = "CREATE TABLE IF NOT EXISTS {0}({1} BIGINT PRIMARY KEY, {2} BLOB, expires_at BIGINT)";
try (Connection connection = dataSource.getConnection()) {
try (Statement statement = connection.createStatement()) {
String query = MessageFormat.format(INIT_TABLE_SCRIPT, tableSettings.getTableName(), tableSettings.getIdName(), tableSettings.getStateName());
Expand All @@ -44,7 +45,7 @@ public static void initializeInstance() throws SQLException {
.table("test.bucket")
.idColumn("id")
.stateColumn("state")
)
).checkExpiration()
);
}

Expand All @@ -56,17 +57,16 @@ public static void shutdown() {
}

private static DataSource createJdbcDataSource(MySQLContainer container) {
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(container.getJdbcUrl());
hikariConfig.setUsername(container.getUsername());
hikariConfig.setPassword(container.getPassword());
hikariConfig.setDriverClassName(container.getDriverClassName());
hikariConfig.setMaximumPoolSize(100);
return new HikariDataSource(hikariConfig);
HikariConfig config = new HikariConfig();
config.setJdbcUrl(container.getJdbcUrl());
config.setUsername(container.getUsername());
config.setPassword(container.getPassword());
config.setMaximumPoolSize(10);
return new HikariDataSource(config);
}

private static MySQLContainer startMySQLContainer() {
MySQLContainer container = new MySQLContainer();
MySQLContainer container = new MySQLContainer(DockerImageName.parse("mysql:8.0.36"));
container.start();
return container;
}
Expand Down

0 comments on commit 8badfa6

Please sign in to comment.