forked from hazelcast/hazelcast
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ExpirySystem.java
393 lines (337 loc) · 15.9 KB
/
ExpirySystem.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
/*
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hazelcast.map.impl.recordstore.expiry;
import com.hazelcast.config.MapConfig;
import com.hazelcast.internal.eviction.ClearExpiredRecordsTask;
import com.hazelcast.internal.eviction.ExpiredKey;
import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.MapUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.impl.ExpirationTimeSetter;
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.properties.HazelcastProperty;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static com.hazelcast.internal.util.ToHeapDataConverter.toHeapData;
import static com.hazelcast.map.impl.ExpirationTimeSetter.pickMaxIdleMillis;
import static com.hazelcast.map.impl.ExpirationTimeSetter.pickTTLMillis;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
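/**
* This class holds all the logic to remove expired entries. The expiry
* reason can be TTL or idleness. An instance of this class is created for
* each {@link RecordStore} and it is always accessed by the same single thread
* (but see the comment on the {@code expireTimeByKey} field, which notes that
* query threads may also read that map).
*/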
public class ExpirySystem {
// Default time budget of one scan loop in evictExpiredEntries:
// 1 millisecond, expressed in nanoseconds.
private static final long DEFAULT_EXPIRED_KEY_SCAN_TIMEOUT_NANOS
= TimeUnit.MILLISECONDS.toNanos(1);
// Name of the property that can override the scan time budget.
private static final String PROP_EXPIRED_KEY_SCAN_TIMEOUT_NANOS
= "hazelcast.internal.map.expired.key.scan.timeout.nanos";
// Property definition (name, default value, unit); it is read in the constructor.
private static final HazelcastProperty EXPIRED_KEY_SCAN_TIMEOUT_NANOS
= new HazelcastProperty(PROP_EXPIRED_KEY_SCAN_TIMEOUT_NANOS,
DEFAULT_EXPIRED_KEY_SCAN_TIMEOUT_NANOS, NANOSECONDS);
// Divisor used by findMaxScannableCount to turn a percentage into a key count.
private static final int ONE_HUNDRED_PERCENT = 100;
// When there are at most this many expirable keys all of them may be scanned
// in one run; with more keys this value is the lower bound of the scan budget
// (see findMaxScannableCount).
private static final int MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN = 100;
// Upper bound on the number of entries examined by a single findExpiredKeys call.
private static final int MAX_SAMPLE_AT_A_TIME = 16;
// Per-thread scratch list filled by findExpiredKeys and drained by
// evictExpiredKeys. It stores a key immediately followed by its ExpiryReason,
// i.e. two slots per expired key; the initial capacity is MAX_SAMPLE_AT_A_TIME * 2.
private static final ThreadLocal<List> BATCH_OF_EXPIRED
= ThreadLocal.withInitial(() -> new ArrayList<>(MAX_SAMPLE_AT_A_TIME << 1));
// Read from ClusterProperty.MAP_EXPIRY_DELAY_SECONDS as milliseconds; added to
// the expiration time when expiry is checked on a backup (see hasExpired).
private final long expiryDelayMillis;
// Maximum duration, in nanoseconds, of the scan loop in evictExpiredEntries.
private final long expiredKeyScanTimeoutNanos;
// When true, a backup does not report max-idle expiry on its own (see hasExpired).
private final boolean canPrimaryDriveExpiration;
private final ILogger logger;
// Record store whose entries this expiry system tracks and evicts.
private final RecordStore recordStore;
// Supplies the map configuration and the total backup count.
private final MapContainer mapContainer;
private final MapServiceContext mapServiceContext;
// Task used to send backup expiry operations.
private final ClearExpiredRecordsTask clearExpiredRecordsTask;
// Expired keys queued by accumulateOrSendExpiredKey.
private final InvalidationQueue<ExpiredKey> expiredKeys = new InvalidationQueue<>();
// Iterator over the expiry map that is kept between scans; it is re-created
// by getOrInitCachedIterator once it has no next entry.
private Iterator<Map.Entry<Data, ExpiryMetadata>> cachedExpirationIterator;
// This is volatile since it can be initialized at runtime lazily and
// can be accessed by query threads besides partition ones.
private volatile Map<Data, ExpiryMetadata> expireTimeByKey;
/**
 * Creates the expiry system of a single record store.
 *
 * @param recordStore       record store whose entries are tracked for expiry
 * @param mapContainer      container supplying the map configuration and backup count
 * @param mapServiceContext service context used to reach the node engine, the
 *                          expiration manager and the clear-expired-records task
 */
public ExpirySystem(RecordStore recordStore,
                    MapContainer mapContainer,
                    MapServiceContext mapServiceContext) {
    this.recordStore = recordStore;
    this.clearExpiredRecordsTask = mapServiceContext.getExpirationManager().getTask();

    final NodeEngine engine = mapServiceContext.getNodeEngine();
    this.logger = engine.getLogger(getClass());
    this.expiryDelayMillis = engine.getProperties()
            .getMillis(ClusterProperty.MAP_EXPIRY_DELAY_SECONDS);

    this.mapContainer = mapContainer;
    this.mapServiceContext = mapServiceContext;
    this.canPrimaryDriveExpiration = mapServiceContext.getClearExpiredRecordsTask()
            .canPrimaryDriveExpiration();
    this.expiredKeyScanTimeoutNanos = engine.getProperties()
            .getNanos(EXPIRED_KEY_SCAN_TIMEOUT_NANOS);
}
/**
 * Tells whether this expiry system currently tracks no key.
 *
 * @return the result of {@code MapUtil.isNullOrEmpty} on the expiry map
 *         (the map stays {@code null} until it is lazily created)
 */
public final boolean isEmpty() {
    final Map<Data, ExpiryMetadata> tracked = expireTimeByKey;
    return MapUtil.isNullOrEmpty(tracked);
}
/**
 * Creates the map that holds expiry metadata per key. This method is overridden.
 *
 * <p>A {@link ConcurrentHashMap} is used because operation and partition
 * threads can access this class concurrently. Its iterator also never
 * throws {@code ConcurrentModificationException}, which keeps the
 * incremental scanning of expirable entries simple.
 *
 * @return a new, empty map
 */
protected Map<Data, ExpiryMetadata> createExpiryTimeByKeyMap() {
    final Map<Data, ExpiryMetadata> freshMap = new ConcurrentHashMap<>();
    return freshMap;
}
/**
 * Removes every tracked key from the expiry map, without creating the
 * map when it does not exist yet. This method is overridden.
 */
public void clear() {
    getOrCreateExpireTimeByKeyMap(false).clear();
}
/**
 * Returns the expiry map, optionally creating it on first use.
 *
 * @param createIfAbsent when {@code true} and no map exists yet, a new one is
 *                       created through {@link #createExpiryTimeByKeyMap()} and stored
 * @return the existing or newly created map; an immutable empty map when
 *         none exists and {@code createIfAbsent} is {@code false}
 */
protected Map<Data, ExpiryMetadata> getOrCreateExpireTimeByKeyMap(boolean createIfAbsent) {
    final Map<Data, ExpiryMetadata> existing = expireTimeByKey;
    if (existing != null) {
        return existing;
    }

    if (!createIfAbsent) {
        return Collections.emptyMap();
    }

    final Map<Data, ExpiryMetadata> created = createExpiryTimeByKeyMap();
    expireTimeByKey = created;
    return created;
}
/**
 * Builds the metadata object stored for an expirable key. This method is overridden.
 *
 * @param ttlMillis      time-to-live value to store
 * @param maxIdleMillis  max-idle value to store
 * @param expirationTime expiration time to store
 * @return a new {@link ExpiryMetadataImpl} carrying the given values
 */
protected ExpiryMetadata createExpiryMetadata(long ttlMillis, long maxIdleMillis, long expirationTime) {
    final ExpiryMetadata metadata = new ExpiryMetadataImpl(ttlMillis, maxIdleMillis, expirationTime);
    return metadata;
}
/**
 * Starts tracking the given key if it is expirable.
 *
 * <p>When {@code expiryTime} is positive, the supplied values are used as they
 * are. Otherwise ttl and max-idle are resolved against the map configuration
 * and the expiration time is calculated relative to {@code now}.
 *
 * @param key        key to track
 * @param ttl        time-to-live supplied by the caller
 * @param maxIdle    max-idle supplied by the caller
 * @param expiryTime already known expiration time, or a non-positive value
 *                   when it still has to be calculated
 * @param now        current time used for the calculation
 */
public final void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime, long now) {
    if (expiryTime > 0) {
        addExpirableKey(key, ttl, maxIdle, expiryTime);
        return;
    }

    final MapConfig config = mapContainer.getMapConfig();
    final long resolvedTtl = pickTTLMillis(ttl, config);
    final long resolvedMaxIdle = pickMaxIdleMillis(maxIdle, config);
    final long computedExpiry
            = ExpirationTimeSetter.calculateExpirationTime(resolvedTtl, resolvedMaxIdle, now);
    addExpirableKey(key, resolvedTtl, resolvedMaxIdle, computedExpiry);
}
/**
 * Stores or refreshes the expiry metadata of a key and schedules the
 * expiration task. An expiration time of {@code Long.MAX_VALUE} is treated
 * as not expirable: the key is removed from the expiry map instead.
 */
private void addExpirableKey(Data key, long ttlMillis, long maxIdleMillis, long expirationTime) {
    if (expirationTime == Long.MAX_VALUE) {
        // Not expirable: drop metadata that may be left from an earlier update.
        if (!isEmpty()) {
            callRemove(key, expireTimeByKey);
        }
        return;
    }

    final Map<Data, ExpiryMetadata> expiryByKey = getOrCreateExpireTimeByKeyMap(true);
    final ExpiryMetadata existing = expiryByKey.get(key);
    if (existing != null) {
        // Key is already tracked: update its metadata in place.
        existing.setTtl(ttlMillis)
                .setMaxIdle(maxIdleMillis)
                .setExpirationTime(expirationTime);
    } else {
        final ExpiryMetadata created = createExpiryMetadata(ttlMillis, maxIdleMillis, expirationTime);
        // The map key is stored in the format used by the backing storage.
        final Data backingKey = recordStore.getStorage().toBackingDataKeyFormat(key);
        expiryByKey.put(backingKey, created);
    }

    mapServiceContext.getExpirationManager().scheduleExpirationTask();
}
/**
 * Calculates an expiration time after resolving ttl and max-idle
 * against the map configuration.
 *
 * @param ttl     time-to-live supplied by the caller
 * @param maxIdle max-idle supplied by the caller
 * @param now     current time used as the base of the calculation
 * @return the value returned by {@code ExpirationTimeSetter.calculateExpirationTime}
 */
public final long calculateExpirationTime(long ttl, long maxIdle, long now) {
    final MapConfig config = mapContainer.getMapConfig();
    return ExpirationTimeSetter.calculateExpirationTime(
            pickTTLMillis(ttl, config),
            pickMaxIdleMillis(maxIdle, config),
            now);
}
/**
 * Stops tracking the given key. Does nothing when no key is tracked at all.
 *
 * @param key key to remove from the expiry map
 */
public final void removeKeyFromExpirySystem(Data key) {
    final Map<Data, ExpiryMetadata> expiryByKey = getOrCreateExpireTimeByKeyMap(false);
    if (!isEmpty()) {
        callRemove(key, expiryByKey);
    }
}
/**
 * Recalculates the expiration time of a tracked key relative to {@code now},
 * using the ttl and max-idle values already stored in its metadata.
 *
 * <p>Nothing happens when no key is tracked, when the key has no
 * metadata, or when its max-idle is {@code Long.MAX_VALUE}.
 *
 * @param dataKey key whose expiration time is extended
 * @param now     current time used as the base of the new expiration time
 */
public final void extendExpiryTime(Data dataKey, long now) {
    // A single emptiness check is enough; the original repeated it after the map lookup.
    if (isEmpty()) {
        return;
    }

    Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
    ExpiryMetadata expiryMetadata = getExpiryMetadataForExpiryCheck(dataKey, expireTimeByKey);
    if (expiryMetadata == null
            || expiryMetadata.getMaxIdle() == Long.MAX_VALUE) {
        return;
    }

    long expirationTime = ExpirationTimeSetter.calculateExpirationTime(expiryMetadata.getTtl(),
            expiryMetadata.getMaxIdle(), now);
    expiryMetadata.setExpirationTime(expirationTime);
}
/**
 * Checks whether the given key has expired at {@code now}.
 *
 * @param key    key to check
 * @param now    time to compare the expiration time against
 * @param backup {@code true} when the check runs for a backup replica
 * @return the expiry reason, or {@code ExpiryReason.NOT_EXPIRED} when the key
 *         has not expired or no key is tracked
 */
public final ExpiryReason hasExpired(Data key, long now, boolean backup) {
    final Map<Data, ExpiryMetadata> expiryByKey = getOrCreateExpireTimeByKeyMap(false);
    if (!isEmpty()) {
        final ExpiryMetadata metadata = getExpiryMetadataForExpiryCheck(key, expiryByKey);
        return hasExpired(metadata, now, backup);
    }
    return ExpiryReason.NOT_EXPIRED;
}
/**
 * Decides whether the entry described by {@code expiryMetadata} has expired at {@code now}.
 *
 * <p>On a backup the expiration time is pushed back by {@code expiryDelayMillis}.
 * That addition saturates at {@code Long.MAX_VALUE}: a plain {@code long}
 * addition would wrap to a negative value for far-future expiration times
 * and wrongly report the entry as expired.
 *
 * @param expiryMetadata metadata of the entry, may be {@code null}
 * @param now            time to compare the expiration time against
 * @param backup         {@code true} when the check runs for a backup replica
 * @return {@code NOT_EXPIRED} when there is no metadata, when the (possibly
 *         delayed) expiration time is still after {@code now}, or when a backup
 *         sees max-idle expiry while the primary can drive expiration; otherwise
 *         {@code MAX_IDLE_SECONDS} if max-idle is not greater than ttl, else {@code TTL}
 */
private ExpiryReason hasExpired(ExpiryMetadata expiryMetadata, long now, boolean backup) {
    if (expiryMetadata == null) {
        return ExpiryReason.NOT_EXPIRED;
    }

    long nextExpirationTime = expiryMetadata.getExpirationTime();
    if (backup) {
        long delayedExpirationTime = nextExpirationTime + expiryDelayMillis;
        // Adding a positive delay must never yield a smaller value; if it
        // does, the sum overflowed and the real result lies beyond Long.MAX_VALUE.
        boolean overflowed = expiryDelayMillis > 0 && delayedExpirationTime < nextExpirationTime;
        nextExpirationTime = overflowed ? Long.MAX_VALUE : delayedExpirationTime;
    }

    if (nextExpirationTime > now) {
        return ExpiryReason.NOT_EXPIRED;
    }

    ExpiryReason expiryReason = expiryMetadata.getMaxIdle() <= expiryMetadata.getTtl()
            ? ExpiryReason.MAX_IDLE_SECONDS : ExpiryReason.TTL;

    // Max-idle expiry is not reported on a backup when the primary can drive expiration.
    if (backup && canPrimaryDriveExpiration
            && expiryReason == ExpiryReason.MAX_IDLE_SECONDS) {
        return ExpiryReason.NOT_EXPIRED;
    }

    return expiryReason;
}
/**
 * Gives access to the queue of expired keys filled by
 * {@link #accumulateOrSendExpiredKey(Data, long)}.
 *
 * @return the queue instance owned by this expiry system
 */
public final InvalidationQueue<ExpiredKey> getExpiredKeys() {
    return this.expiredKeys;
}
/**
 * Looks up the expiry metadata stored for a key.
 *
 * @param key key to look up
 * @return the stored metadata, or {@code ExpiryMetadata.NULL} when the key is not tracked
 */
@Nonnull
public final ExpiryMetadata getExpiredMetadata(Data key) {
    final ExpiryMetadata found = getOrCreateExpireTimeByKeyMap(false).get(key);
    if (found == null) {
        return ExpiryMetadata.NULL;
    }
    return found;
}
/**
* Scans a bounded number of expirable entries and evicts the expired ones.
*
* The scan proceeds in batches: each loop iteration collects expired keys
* with findExpiredKeys and then evicts them with evictExpiredKeys. The loop
* stops when the scan budget is used up, when the cached iterator has no
* next entry, or when the loop has run for expiredKeyScanTimeoutNanos.
*
* @param percentage passed to findMaxScannableCount to derive the scan budget
* @param now        time the expiration times are compared against
* @param backup     true when this runs for a backup replica
*/
@SuppressWarnings("checkstyle:magicnumber")
public final void evictExpiredEntries(final int percentage, final long now, final boolean backup) {
// 1. Find how many keys we can scan at max.
final int maxScannableCount = findMaxScannableCount(percentage);
if (maxScannableCount == 0) {
// no expirable entry exists.
return;
}
// 2. Do scanning and evict expired keys.
int scannedCount = 0;
int expiredCount = 0;
try {
// Start of the scan loop, used to enforce the time budget below.
long scanLoopStartNanos = System.nanoTime();
do {
scannedCount += findExpiredKeys(now, backup);
expiredCount += evictExpiredKeys(backup);
} while (scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext()
&& (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos);
} catch (Exception e) {
// Do not leave half-processed keys in the thread-local batch for the next run.
BATCH_OF_EXPIRED.get().clear();
throw ExceptionUtil.rethrow(e);
}
// 3. Send expired keys to backups(only valid for max-idle-expiry)
tryToSendBackupExpiryOp();
if (logger.isFinestEnabled()) {
logProgress(maxScannableCount, scannedCount, expiredCount);
}
}
/**
 * Writes the counters of one eviction run to the log at finest level.
 *
 * @param maxScannableCount scan budget of the run
 * @param scannedCount      number of scanned keys as counted by the run
 * @param expiredCount      number of keys evicted by the run
 */
private void logProgress(int maxScannableCount, int scannedCount, int expiredCount) {
    final String template = "mapName: %s, partitionId: %d, partitionSize: %d, "
            + "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, "
            + "scannedKeyCount: %d, expiredKeyCount: %d";
    final String message = String.format(template,
            recordStore.getName(), recordStore.getPartitionId(), recordStore.size(),
            expireTimeByKey.size(), maxScannableCount, scannedCount, expiredCount);
    logger.finest(message);
}
/**
 * Computes how many keys a single eviction run may scan.
 *
 * @param percentage share of the expirable keys to scan, in percent
 * @return {@code 0} when no key is tracked; the number of expirable keys when
 *         it does not exceed {@code MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN}; otherwise
 *         the larger of that minimum and the requested percentage of the keys
 */
private int findMaxScannableCount(int percentage) {
    final Map<Data, ExpiryMetadata> expiryByKey = getOrCreateExpireTimeByKeyMap(false);
    if (isEmpty()) {
        return 0;
    }

    final int expirableKeyCount = expiryByKey.size();
    if (expirableKeyCount > MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN) {
        final int requestedShare = (int) (1D * expirableKeyCount * percentage / ONE_HUNDRED_PERCENT);
        return Math.max(MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN, requestedShare);
    }
    return expirableKeyCount;
}
/**
 * Returns the cached expiration iterator. A new iterator over the expiry
 * map is created first when none is cached or the cached one is exhausted.
 *
 * @return the iterator to continue scanning with
 */
private Iterator<Map.Entry<Data, ExpiryMetadata>> getOrInitCachedIterator() {
    final Iterator<Map.Entry<Data, ExpiryMetadata>> current = cachedExpirationIterator;
    if (current != null && current.hasNext()) {
        return current;
    }

    cachedExpirationIterator = initIteratorOf(expireTimeByKey);
    return cachedExpirationIterator;
}
/**
 * Examines up to {@code MAX_SAMPLE_AT_A_TIME} entries from the cached iterator
 * and records the expired, unlocked ones in the thread-local batch. Each
 * hit is stored as two consecutive elements: the key and its expiry reason.
 *
 * <p>The returned value is the exact number of entries examined. The
 * counter used to be incremented inside the loop condition, which also
 * counted the final, failing check and reported one entry too many.
 *
 * @param now    time the expiration times are compared against
 * @param backup {@code true} when this runs for a backup replica
 * @return number of entries taken from the iterator by this call
 */
private int findExpiredKeys(long now, boolean backup) {
    List batchOfExpired = BATCH_OF_EXPIRED.get();

    int scannedCount = 0;
    Iterator<Map.Entry<Data, ExpiryMetadata>> cachedIterator = getOrInitCachedIterator();
    while (scannedCount < MAX_SAMPLE_AT_A_TIME && cachedIterator.hasNext()) {
        Map.Entry<Data, ExpiryMetadata> entry = cachedIterator.next();
        // Count only entries that were really taken from the iterator.
        scannedCount++;

        Data key = entry.getKey();
        ExpiryMetadata expiryMetadata = entry.getValue();

        ExpiryReason expiryReason = hasExpired(expiryMetadata, now, backup);
        if (expiryReason != ExpiryReason.NOT_EXPIRED && !recordStore.isLocked(key)) {
            // add key and expiryReason to list to evict them later
            batchOfExpired.add(key);
            batchOfExpired.add(expiryReason);
        }
    }
    return scannedCount;
}
/**
 * Evicts every key recorded in the thread-local batch and removes it from
 * the expiry map. The batch holds a key followed by its expiry reason, so
 * it is read two elements at a time. It is always cleared afterwards, even
 * when an eviction throws.
 *
 * @param backup {@code true} when this runs for a backup replica
 * @return number of keys evicted by this call
 */
private int evictExpiredKeys(boolean backup) {
    final List batch = BATCH_OF_EXPIRED.get();
    int evicted = 0;
    try {
        int index = 0;
        while (index < batch.size()) {
            final Data key = (Data) batch.get(index);
            final ExpiryReason reason = (ExpiryReason) batch.get(index + 1);

            recordStore.evictExpiredEntryAndPublishExpiryEvent(key, reason, backup);
            callRemove(key, expireTimeByKey);

            evicted++;
            index += 2;
        }
    } finally {
        batch.clear();
    }
    return evicted;
}
/**
 * Fetches the metadata used for an expiry check. This method is overridden.
 *
 * @param key             key to look up
 * @param expireTimeByKey map to look the key up in
 * @return the metadata mapped to the key, or {@code null} when there is none
 */
protected ExpiryMetadata getExpiryMetadataForExpiryCheck(Data key,
                                                         Map<Data, ExpiryMetadata> expireTimeByKey) {
    final ExpiryMetadata metadata = expireTimeByKey.get(key);
    return metadata;
}
/**
 * Creates the iterator used to scan expirable entries. This method is overridden.
 *
 * @param expireTimeByKey map whose entries are iterated
 * @return an iterator over the entry set of the given map
 */
protected Iterator<Map.Entry<Data, ExpiryMetadata>> initIteratorOf(Map<Data, ExpiryMetadata> expireTimeByKey) {
    final Iterator<Map.Entry<Data, ExpiryMetadata>> entryIterator
            = expireTimeByKey.entrySet().iterator();
    return entryIterator;
}
/**
 * Removes a key from the given expiry map. This method is overridden.
 *
 * @param key             key to remove
 * @param expireTimeByKey map to remove the key from
 */
protected void callRemove(Data key, Map<Data, ExpiryMetadata> expireTimeByKey) {
    expireTimeByKey.remove(key);
}
/**
 * Releases the tracked expiry data by clearing the expiry map, without
 * creating the map when it does not exist yet. This method is overridden.
 */
public void destroy() {
    final Map<Data, ExpiryMetadata> expiryByKey = getOrCreateExpireTimeByKeyMap(false);
    expiryByKey.clear();
}
/**
 * Queues an expired key for the backups and asks the clear-expired-records
 * task to try sending a backup expiry operation. Nothing happens when the
 * map has no backups.
 *
 * @param dataKey       expired key; when {@code null} nothing is queued, but
 *                      the send attempt is still made
 * @param valueHashCode hash code stored together with the key
 */
public final void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) {
    final boolean hasBackups = mapContainer.getTotalBackupCount() != 0;
    if (hasBackups) {
        if (dataKey != null) {
            // The queue keeps a heap copy of the key.
            final ExpiredKey expiredKey = new ExpiredKey(toHeapData(dataKey), valueHashCode);
            expiredKeys.offer(expiredKey);
        }
        clearExpiredRecordsTask.tryToSendBackupExpiryOp(recordStore, true);
    }
}
/**
 * Asks the clear-expired-records task to try sending a backup expiry
 * operation for this record store. Nothing happens when the map has no backups.
 */
public final void tryToSendBackupExpiryOp() {
    if (mapContainer.getTotalBackupCount() != 0) {
        clearExpiredRecordsTask.tryToSendBackupExpiryOp(recordStore, true);
    }
}
}