Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3444: Add Custom TTL support for RedisLock, and JdbcLock #9053

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.locks;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
* A {@link Lock} implementing this interface supports the spring distributed locks with custom time-to-live value per lock
*
* @author Eddie Cho
*
* @since 6.3
*/
public interface CustomTtlLock extends Lock {

/**
* Attempt to acquire a lock with a specific time-to-live
* @param time the maximum time to wait for the lock unit
* @param unit the time unit of the time argument
* @param customTtl the specific time-to-live for the lock status data
* @param customTtlUnit the time unit of the customTtl argument
* @return true if the lock was acquired and false if the waiting time elapsed before the lock was acquired
* @throws InterruptedException -
* if the current thread is interrupted while acquiring the lock (and interruption of lock acquisition is supported)
*/
boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlUnit) throws InterruptedException;

/**
* Attempt to acquire a lock with a specific time-to-live
* @param customTtl the specific time-to-live for the lock status data
* @param customTtlUnit the time unit of the customTtl argument
*/
void lock(long customTtl, TimeUnit customTtlUnit);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* A {@link LockRegistry} implementing this interface supports the CustomTtlLock
*
* @author Eddie Cho
*
* @since 6.3
*/
package org.springframework.integration.support.locks;

public interface CustomTtlLockRegistry extends LockRegistry {

CustomTtlLock obtainCustomTtlLock(Object lockKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* @author Gary Russell
* @author Alexandre Strubel
* @author Ruslan Stelmachenko
* @author Eddie Cho
*
* @since 4.3
*/
Expand All @@ -76,19 +77,12 @@ public class DefaultLockRepository
*/
public static final String DEFAULT_TABLE_PREFIX = "INT_";

/**
* Default value for the time-to-live property.
*/
public static final Duration DEFAULT_TTL = Duration.ofSeconds(10);

private final String id;

private final JdbcTemplate template;

private final AtomicBoolean started = new AtomicBoolean();

private Duration ttl = DEFAULT_TTL;

private String prefix = DEFAULT_TABLE_PREFIX;

private String region = "DEFAULT";
Expand All @@ -100,7 +94,7 @@ public class DefaultLockRepository

private String deleteExpiredQuery = """
DELETE FROM %sLOCK
WHERE REGION=? AND CREATED_DATE<?
WHERE REGION=? AND EXPIRED_AFTER<?
""";

private String deleteAllQuery = """
Expand All @@ -110,24 +104,24 @@ public class DefaultLockRepository

private String updateQuery = """
UPDATE %sLOCK
SET CLIENT_ID=?, CREATED_DATE=?
WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR CREATED_DATE<?)
SET CLIENT_ID=?, EXPIRED_AFTER=?
WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR EXPIRED_AFTER<?)
""";

private String insertQuery = """
INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE)
INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, EXPIRED_AFTER)
VALUES (?, ?, ?, ?)
""";

private String countQuery = """
SELECT COUNT(REGION)
FROM %sLOCK
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? AND CREATED_DATE>=?
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? AND EXPIRED_AFTER>=?
""";

private String renewQuery = """
UPDATE %sLOCK
SET CREATED_DATE=?
SET EXPIRED_AFTER=?
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?
""";

Expand Down Expand Up @@ -188,14 +182,6 @@ public void setPrefix(String prefix) {
this.prefix = prefix;
}

/**
* Specify the time (in milliseconds) to expire deadlocks.
* @param timeToLive the time to expire deadlocks.
*/
public void setTimeToLive(int timeToLive) {
this.ttl = Duration.ofMillis(timeToLive);
}

/**
* Set a {@link PlatformTransactionManager} for operations.
* Otherwise, a primary {@link PlatformTransactionManager} bean is obtained
Expand All @@ -219,8 +205,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
* <pre class="code">
* {@code
* UPDATE %sLOCK
* SET CLIENT_ID=?, CREATED_DATE=?
* WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR CREATED_DATE<?)
* SET CLIENT_ID=?, EXPIRED_AFTER=?
* WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR EXPIRED_AFTER<?)
* }
* </pre>
* @param updateQuery the query to update a lock record.
Expand All @@ -247,7 +233,7 @@ public String getUpdateQuery() {
* Set a custom {@code INSERT} query for a lock record.
* The {@link #getInsertQuery()} can be used as a template for customization.
* The default query is
* {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE) VALUES (?, ?, ?, ?)}.
* {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, EXPIRED_AFTER) VALUES (?, ?, ?, ?)}.
* For example a PostgreSQL {@code ON CONFLICT DO NOTHING} hint can be provided like this:
* <pre class="code">
* {@code
Expand Down Expand Up @@ -281,7 +267,7 @@ public String getInsertQuery() {
* <pre class="code">
* {@code
* UPDATE %sLOCK
* SET CREATED_DATE=?
* SET EXPIRED_AFTER=?
* WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?
* }
* </pre>
Expand Down Expand Up @@ -389,23 +375,23 @@ public void close() {
}

@Override
public void delete(String lock) {
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id));
public boolean delete(String lock) {
return this.defaultTransactionTemplate.execute(
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id)) > 0;
}

@Override
public boolean acquire(String lock) {
public boolean acquire(String lock, Duration ttlDuration) {
Boolean result =
this.readCommittedTransactionTemplate.execute(
transactionStatus -> {
if (this.template.update(this.updateQuery, this.id, epochMillis(),
this.region, lock, this.id, ttlEpochMillis()) > 0) {
if (this.template.update(this.updateQuery, this.id, ttlEpochMillis(ttlDuration),
this.region, lock, this.id, epochMillis()) > 0) {
return true;
}
try {
return this.template.update(this.insertQuery, this.region, lock, this.id,
epochMillis()) > 0;
ttlEpochMillis(ttlDuration)) > 0;
}
catch (DataIntegrityViolationException ex) {
return false;
Expand All @@ -420,27 +406,27 @@ public boolean isAcquired(String lock) {
transactionStatus ->
Integer.valueOf(1).equals(
this.template.queryForObject(this.countQuery,
Integer.class, this.region, lock, this.id, ttlEpochMillis())));
Integer.class, this.region, lock, this.id, epochMillis())));
return Boolean.TRUE.equals(result);
}

@Override
public void deleteExpired() {
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus ->
this.template.update(this.deleteExpiredQuery, this.region, ttlEpochMillis()));
this.template.update(this.deleteExpiredQuery, this.region, epochMillis()));
}

@Override
public boolean renew(String lock) {
public boolean renew(String lock, Duration ttlDuration) {
final Boolean result = this.defaultTransactionTemplate.execute(
transactionStatus ->
this.template.update(this.renewQuery, epochMillis(), this.region, lock, this.id) > 0);
this.template.update(this.renewQuery, ttlEpochMillis(ttlDuration), this.region, lock, this.id) > 0);
return Boolean.TRUE.equals(result);
}

private Timestamp ttlEpochMillis() {
return Timestamp.valueOf(currentTime().minus(this.ttl));
private Timestamp ttlEpochMillis(Duration ttl) {
return Timestamp.valueOf(currentTime().plus(ttl));
}

private static Timestamp epochMillis() {
Expand Down