Skip to content

Commit

Permalink
Add support throttling admin API requests #849
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesChenX committed Oct 7, 2021
1 parent 21bf3b9 commit d6c7221
Show file tree
Hide file tree
Showing 20 changed files with 284 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import im.turms.common.constant.UserStatus;
import im.turms.common.model.dto.notification.TurmsNotification;
import im.turms.gateway.pojo.bo.session.UserSession;
import im.turms.gateway.throttle.TokenBucketContext;
import im.turms.server.common.throttle.TokenBucketContext;
import im.turms.server.common.dto.CloseReason;
import im.turms.server.common.lang.ConcurrentEnumMap;
import im.turms.server.common.util.ProtoUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import im.turms.common.model.dto.notification.TurmsNotification;
import im.turms.common.util.RandomUtil;
import im.turms.gateway.pojo.bo.session.connection.NetConnection;
import im.turms.gateway.throttle.TokenBucket;
import im.turms.gateway.throttle.TokenBucketContext;
import im.turms.server.common.throttle.TokenBucket;
import im.turms.server.common.throttle.TokenBucketContext;
import im.turms.server.common.dto.CloseReason;
import im.turms.server.common.tracing.TracingContext;
import io.netty.buffer.ByteBuf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
import im.turms.common.constant.statuscode.SessionCloseStatus;
import im.turms.gateway.manager.HeartbeatManager;
import im.turms.gateway.manager.UserSessionsManager;
import im.turms.gateway.plugin.extension.UserOnlineStatusChangeHandler;
import im.turms.gateway.plugin.TurmsPluginManager;
import im.turms.gateway.plugin.extension.UserOnlineStatusChangeHandler;
import im.turms.gateway.pojo.bo.session.UserSession;
import im.turms.gateway.service.impl.observability.MetricsService;
import im.turms.gateway.throttle.TokenBucketContext;
import im.turms.server.common.bo.session.UserSessionsStatus;
import im.turms.server.common.cluster.node.Node;
import im.turms.server.common.constant.TurmsStatusCode;
Expand All @@ -40,11 +39,11 @@
import im.turms.server.common.property.TurmsPropertiesManager;
import im.turms.server.common.property.env.gateway.GatewayProperties;
import im.turms.server.common.property.env.gateway.SessionProperties;
import im.turms.server.common.property.env.gateway.clientapi.RateLimitingProperties;
import im.turms.server.common.rpc.request.SetUserOfflineRequest;
import im.turms.server.common.rpc.service.ISessionService;
import im.turms.server.common.service.session.SessionLocationService;
import im.turms.server.common.service.session.UserStatusService;
import im.turms.server.common.throttle.TokenBucketContext;
import im.turms.server.common.util.AssertUtil;
import im.turms.server.common.util.DeviceTypeUtil;
import im.turms.server.common.util.MapUtil;
Expand Down Expand Up @@ -124,8 +123,7 @@ public SessionService(
SessionProperties sessionProperties = gatewayProperties.getSession();
closeIdleSessionAfterSeconds = sessionProperties.getCloseIdleSessionAfterSeconds();

requestTokenBucketContext = new TokenBucketContext();
updateRequestTokenBucket(requestTokenBucketContext, gatewayProperties.getClientApi().getRateLimiting());
requestTokenBucketContext = new TokenBucketContext(gatewayProperties.getClientApi().getRateLimiting());

heartbeatManager = new HeartbeatManager(this,
userStatusService,
Expand All @@ -144,21 +142,14 @@ public SessionService(
heartbeatManager.setMinHeartbeatIntervalMillis(newSessionProperties.getMinHeartbeatIntervalSeconds() * 1000);
heartbeatManager.setSwitchProtocolAfterMillis(newSessionProperties.getSwitchProtocolAfterSeconds() * 1000);

updateRequestTokenBucket(requestTokenBucketContext, newGatewayProperties.getClientApi().getRateLimiting());
requestTokenBucketContext.updateRequestTokenBucket(newGatewayProperties.getClientApi().getRateLimiting());
});

MeterRegistry registry = metricsService.getRegistry();
loggedInUsersCounter = registry.counter(LOGGED_IN_USERS_COUNTER_NAME);
registry.gaugeMapSize(ONLINE_USERS_GAUGE_NAME, Tags.empty(), sessionsManagerByUserId);
}

private void updateRequestTokenBucket(TokenBucketContext context, RateLimitingProperties properties) {
context.setCapacity(properties.getCapacity());
context.setInitialTokens(properties.getInitialTokens());
context.setTokensPerPeriod(properties.getTokensPerPeriod());
context.setRefillIntervalMillis(properties.getRefillIntervalMillis());
}

@PreDestroy
public void destroy() {
heartbeatManager.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import im.turms.common.constant.statuscode.SessionCloseStatus;
import im.turms.gateway.manager.UserSessionsManager;
import im.turms.gateway.pojo.bo.session.connection.NetConnection;
import im.turms.gateway.throttle.TokenBucketContext;
import im.turms.server.common.throttle.TokenBucketContext;
import im.turms.server.common.dto.CloseReason;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import im.turms.common.constant.DeviceType;
import im.turms.gateway.pojo.bo.session.UserSession;
import im.turms.gateway.throttle.TokenBucketContext;
import im.turms.server.common.throttle.TokenBucketContext;
import org.junit.jupiter.api.Test;
import org.springframework.data.geo.Point;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import im.turms.gateway.pojo.bo.session.connection.TcpConnection;
import im.turms.gateway.service.impl.message.OutboundMessageService;
import im.turms.gateway.service.impl.session.SessionService;
import im.turms.gateway.throttle.TokenBucketContext;
import im.turms.server.common.throttle.TokenBucketContext;
import im.turms.server.common.cluster.node.Node;
import im.turms.server.common.property.TurmsProperties;
import im.turms.server.common.property.TurmsPropertiesManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import im.turms.gateway.service.impl.session.UserService;
import im.turms.gateway.service.impl.session.UserSimultaneousLoginService;
import im.turms.gateway.service.mediator.ServiceMediator;
import im.turms.gateway.throttle.TokenBucketContext;
import im.turms.server.common.throttle.TokenBucketContext;
import im.turms.server.common.cluster.node.Node;
import im.turms.server.common.constant.TurmsStatusCode;
import im.turms.server.common.dto.ServiceRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,6 @@ private CronConstant() {

public static final String EXPIRED_BLOCKED_CLIENT_CLEANUP_CRON = "0 0 1/6 * * *";

public static final String EXPIRED_ADMIN_API_ACCESS_INFO_CLEANUP_CRON = "0 0/30 * * * *";

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.bson.BsonDocument;
import org.bson.BsonNull;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;

Expand All @@ -43,8 +44,8 @@
public class Filter implements Bson {

/**
* Use org.bson.BsonDocument instead of org.bson.Document
* because Document will be converted to BsonDocument by mongo-java-driver finally,
* Use {@link BsonDocument} instead of {@link Document}
* because {@link Document} will be converted to {@link BsonDocument} by mongo-java-driver finally,
* which is a huge waste of system resources because both documents are heavy
*/
private final BsonDocument document;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
* limitations under the License.
*/

package im.turms.server.common.property.env.gateway.clientapi;
package im.turms.server.common.property.env.common;

import com.fasterxml.jackson.annotation.JsonView;
import im.turms.server.common.property.metadata.annotation.Description;
import im.turms.server.common.property.metadata.annotation.GlobalProperty;
import im.turms.server.common.property.metadata.view.MutablePropertiesView;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

Expand All @@ -32,10 +31,9 @@
* @author James Chen
*/
@AllArgsConstructor
@Builder(toBuilder = true)
@Data
@NoArgsConstructor
public class RateLimitingProperties {
public abstract class CommonRateLimitingProperties {

@Description("The maximum number of tokens that the bucket can hold")
@GlobalProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ClientApiProperties {
private ClientApiLoggingProperties logging = new ClientApiLoggingProperties();

@NestedConfigurationProperty
private RateLimitingProperties rateLimiting = new RateLimitingProperties();
private ClientApiRateLimitingProperties rateLimiting = new ClientApiRateLimitingProperties();

public void setReturnReasonForServerError(boolean returnReasonForServerError) {
this.returnReasonForServerError = returnReasonForServerError;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,14 @@
* limitations under the License.
*/

package im.turms.gateway.throttle;
package im.turms.server.common.property.env.gateway.clientapi;

import lombok.Setter;
import im.turms.server.common.property.env.common.CommonRateLimitingProperties;
import lombok.NoArgsConstructor;

/**
* @author James Chen
*/
@Setter
public class TokenBucketContext {
int capacity;
int tokensPerPeriod;
int refillIntervalMillis;
int initialTokens;
@NoArgsConstructor
public class ClientApiRateLimitingProperties extends CommonRateLimitingProperties {
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public class AdminApiProperties {
@NestedConfigurationProperty
private AddressProperties address = new AddressProperties();

@JsonView(MutablePropertiesView.class)
@NestedConfigurationProperty
private AdminApiRateLimitingProperties rateLimiting = new AdminApiRateLimitingProperties();

@JsonView(MutablePropertiesView.class)
@NestedConfigurationProperty
private LogProperties log = new LogProperties();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (C) 2019 The Turms Project
* https://github.com/turms-im/turms
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package im.turms.server.common.property.env.service.env.adminapi;

import im.turms.server.common.property.env.common.CommonRateLimitingProperties;

/**
* @author James Chen
*/
public class AdminApiRateLimitingProperties extends CommonRateLimitingProperties {

public AdminApiRateLimitingProperties() {
setCapacity(50);
setInitialTokens(50);
setTokensPerPeriod(50);
setRefillIntervalMillis(1000);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
* limitations under the License.
*/

package im.turms.gateway.throttle;
package im.turms.server.common.throttle;

import lombok.Getter;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

Expand All @@ -29,6 +31,7 @@ public class TokenBucket {

private final TokenBucketContext context;

@Getter
private volatile int tokens;
private volatile long lastRefillTime;

Expand Down Expand Up @@ -58,17 +61,35 @@ public boolean tryAcquire(long time) {
return false;
}
int periods = (int) (time - lastRefillTime) / refillInterval;
if (periods > 0) {
// We expect tokensPerPeriod is always greater than 0
// so tokenCount can be always greater than or equals to 0
tokenCount = Math.min(periods * context.tokensPerPeriod - 1, context.capacity);
if (TOKENS_UPDATER.compareAndSet(this, 0, tokenCount)) {
lastRefillTime = time;
return true;
}
return tryAcquire(time);
if (periods <= 0) {
return false;
}
// We expect tokensPerPeriod is always greater than 0
// so tokenCount can be always greater than or equals to 0
tokenCount = Math.min(periods * context.tokensPerPeriod - 1, context.capacity);
if (TOKENS_UPDATER.compareAndSet(this, 0, tokenCount)) {
lastRefillTime = time;
return true;
}
return tryAcquire(time);
}

public void refill(long time) {
int refillInterval = context.refillIntervalMillis;
if (refillInterval <= 0) {
return;
}
int periods = (int) (time - lastRefillTime) / refillInterval;
if (periods <= 0) {
return;
}
int tokenCount = tokens;
int newTokenCount = Math.min(tokenCount + periods * context.tokensPerPeriod, context.capacity);
if (TOKENS_UPDATER.compareAndSet(this, tokenCount, newTokenCount)) {
lastRefillTime = time;
} else {
refill(time);
}
return false;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2019 The Turms Project
* https://github.com/turms-im/turms
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package im.turms.server.common.throttle;

import im.turms.server.common.property.env.common.CommonRateLimitingProperties;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @author James Chen
*/
@Data
@NoArgsConstructor
public class TokenBucketContext {
int capacity;
int tokensPerPeriod;
int refillIntervalMillis;
int initialTokens;

public TokenBucketContext(CommonRateLimitingProperties properties) {
updateRequestTokenBucket(properties);
}

public void updateRequestTokenBucket(CommonRateLimitingProperties properties) {
capacity = properties.getCapacity();
initialTokens = properties.getInitialTokens();
tokensPerPeriod = properties.getTokensPerPeriod();
refillIntervalMillis = properties.getRefillIntervalMillis();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* limitations under the License.
*/

package unit.im.turms.gateway.throttle;
package unit.im.turms.server.common.throttle;

import im.turms.gateway.throttle.TokenBucket;
import im.turms.gateway.throttle.TokenBucketContext;
import im.turms.server.common.throttle.TokenBucket;
import im.turms.server.common.throttle.TokenBucketContext;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down
5 changes: 2 additions & 3 deletions turms-server-common/src/test/java/util/JarUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ private void JarFile() {

@SneakyThrows
public static Path createJarFile(String outputFile, List<Class<?>> classEntries, List<String> resources) {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
ClassLoader loader = JarUtil.class.getClassLoader();
URI jarFileUri = loader.getResource(".").toURI().resolve(outputFile);
Path jarPath = Paths.get(jarFileUri);
File jarFile = jarPath.toFile();
File jarFile = Paths.get(jarFileUri).toFile();
Manifest manifest = new Manifest();
manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
try (FileOutputStream fos = new FileOutputStream(jarFile, false);
Expand Down

0 comments on commit d6c7221

Please sign in to comment.