From b3711cd98a7cf52bf1c48611781d2b0c0b723769 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Wed, 14 Apr 2021 08:20:55 +0300 Subject: [PATCH] get for client fix --- .../protocol/task/map/MapGetMessageTask.java | 2 +- .../impl/operation/ClearBackupOperation.java | 2 +- .../map/impl/operation/ClearOperation.java | 2 +- .../map/impl/operation/EntryOperation.java | 2 +- .../operation/EvictAllBackupOperation.java | 2 +- .../map/impl/operation/EvictAllOperation.java | 2 +- .../operation/EvictBatchBackupOperation.java | 2 +- .../map/impl/operation/MapOperation.java | 47 +++++++++++++++---- .../impl/operation/MapOperationProvider.java | 4 +- .../impl/recordstore/expiry/ExpirySystem.java | 18 ++++--- pom.xml | 1 - 11 files changed, 59 insertions(+), 25 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapGetMessageTask.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapGetMessageTask.java index a8a939ff5ca8f..a9b8951f60d9e 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapGetMessageTask.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapGetMessageTask.java @@ -54,6 +54,7 @@ protected ClientMessage encodeResponse(Object response) { protected Operation prepareOperation() { MapOperationProvider operationProvider = getMapOperationProvider(parameters.name); MapOperation operation = operationProvider.createGetOperation(parameters.name, parameters.key); + operation.setClientCall(true); operation.setThreadId(parameters.threadId); return operation; } @@ -74,7 +75,6 @@ protected Object processResponseBeforeSending(Object response) { return response; } - @Override public String getServiceName() { return MapService.SERVICE_NAME; diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/ClearBackupOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/ClearBackupOperation.java index 017dbac9649d3..4caf78afa3d99 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/ClearBackupOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/ClearBackupOperation.java @@ -27,7 +27,7 @@ public ClearBackupOperation() { public ClearBackupOperation(String name) { super(name); - createRecordStoreOnDemand = false; + setCreateRecordStoreOnDemand(false); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/ClearOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/ClearOperation.java index 33d8016c87a3a..509873ac91477 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/ClearOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/ClearOperation.java @@ -37,7 +37,7 @@ public ClearOperation() { public ClearOperation(String name) { super(name); - createRecordStoreOnDemand = false; + setCreateRecordStoreOnDemand(false); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EntryOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EntryOperation.java index 2247c38bfb1e8..6c94e1ef68cea 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EntryOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EntryOperation.java @@ -184,7 +184,7 @@ public CallStatus call() { } // when offloading is enabled, left disposing // to EntryOffloadableSetUnlockOperation - disposeDeferredBlocks = !offload; + setDisposeDeferredBlocks(!offload); if (offload) { return new EntryOperationOffload(getCallerAddress()); diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictAllBackupOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictAllBackupOperation.java index eb395ac5ccc53..09cb357c19408 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictAllBackupOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictAllBackupOperation.java @@ -30,7 +30,7 @@ public EvictAllBackupOperation() { public EvictAllBackupOperation(String name) { super(name); - createRecordStoreOnDemand = false; + setCreateRecordStoreOnDemand(false); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictAllOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictAllOperation.java index cd697263500ca..fac800d8af65f 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictAllOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictAllOperation.java @@ -43,7 +43,7 @@ public EvictAllOperation() { public EvictAllOperation(String name) { super(name); - createRecordStoreOnDemand = false; + setCreateRecordStoreOnDemand(false); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictBatchBackupOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictBatchBackupOperation.java index 0764336134ecb..d240a9966a2fa 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictBatchBackupOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/EvictBatchBackupOperation.java @@ -54,7 +54,7 @@ public EvictBatchBackupOperation(String name, this.name = name; this.expiredKeys = expiredKeys; this.primaryEntryCount = primaryEntryCount; - this.createRecordStoreOnDemand = false; + setCreateRecordStoreOnDemand(false); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapOperation.java index 3e554f0d12fd4..602d8d97a10a7 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapOperation.java @@ -58,6 +58,10 @@ public abstract class MapOperation extends AbstractNamedOperation implements IdentifiedDataSerializable, ServiceNamespaceAware { private static final boolean ASSERTION_ENABLED = MapOperation.class.desiredAssertionStatus(); + static final int BITMASK_CREATE_RECORD_STORE_ON_DEMAND = 1; + static final int BITMASK_DISPOSE_DEFERRED_BLOCK = 1 << 1; + static final int BITMASK_CAN_PUBLISH_WAN_EVENT = 1 << 2; + static final int BITMASK_IS_CLIENT_CALL = 1 << 3; protected transient MapService mapService; protected transient RecordStore recordStore; @@ -65,10 +69,7 @@ public abstract class MapOperation extends AbstractNamedOperation protected transient MapServiceContext mapServiceContext; protected transient MapEventPublisher mapEventPublisher; - protected transient boolean createRecordStoreOnDemand = true; - protected transient boolean disposeDeferredBlocks = true; - - private transient boolean canPublishWanEvent; + private transient byte mapFlags = (byte) 0b00000011; public MapOperation() { } @@ -97,13 +98,41 @@ public final void beforeRun() throws Exception { throw rethrow(t, Exception.class); } - canPublishWanEvent = canPublishWanEvent(mapContainer); + setMapFlag(canPublishWanEvent(mapContainer), BITMASK_CAN_PUBLISH_WAN_EVENT); assertNativeMapOnPartitionThread(); innerBeforeRun(); } + public void setClientCall(boolean isClientCall) { + setMapFlag(isClientCall, BITMASK_IS_CLIENT_CALL); + } + + protected boolean isClientCall() { + return isMapFlagSet(BITMASK_IS_CLIENT_CALL); + } + + protected void setCreateRecordStoreOnDemand(boolean value) { + setMapFlag(value, BITMASK_CREATE_RECORD_STORE_ON_DEMAND); + } + + protected void setDisposeDeferredBlocks(boolean value) { + setMapFlag(value, BITMASK_DISPOSE_DEFERRED_BLOCK); + } + + private void setMapFlag(boolean value, int bitmask) { + if (value) { + mapFlags |= bitmask; + } else { + mapFlags &= ~bitmask; + } + } + + private boolean isMapFlagSet(int bitmask) { + return (mapFlags & bitmask) != 0; + } + protected void innerBeforeRun() throws Exception { // Intentionally empty method body. // Concrete classes can override this method. @@ -168,7 +197,7 @@ private RecordStore getRecordStoreOrNull() { return null; } PartitionContainer partitionContainer = mapServiceContext.getPartitionContainer(partitionId); - if (createRecordStoreOnDemand) { + if (isMapFlagSet(BITMASK_CREATE_RECORD_STORE_ON_DEMAND)) { return partitionContainer.getRecordStore(name); } else { return partitionContainer.getExistingRecordStore(name); @@ -196,7 +225,7 @@ public void logError(Throwable e) { } void disposeDeferredBlocks() { - if (!disposeDeferredBlocks + if (!isMapFlagSet(BITMASK_DISPOSE_DEFERRED_BLOCK) || recordStore == null || recordStore.getInMemoryFormat() != NATIVE) { return; @@ -317,7 +346,7 @@ protected final void publishWanUpdate(Data dataKey, Object value) { } private void publishWanUpdateInternal(Data dataKey, Object value, boolean hasLoadProvenance) { - if (!canPublishWanEvent) { + if (!isMapFlagSet(BITMASK_CAN_PUBLISH_WAN_EVENT)) { return; } @@ -340,7 +369,7 @@ protected final void publishLoadAsWanUpdate(Data dataKey, Object value) { } protected final void publishWanRemove(@Nonnull Data dataKey) { - if (!canPublishWanEvent) { + if (!isMapFlagSet(BITMASK_CAN_PUBLISH_WAN_EVENT)) { return; } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapOperationProvider.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapOperationProvider.java index ccf45c7d91d20..4bf2e307ea4ca 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapOperationProvider.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapOperationProvider.java @@ -16,12 +16,12 @@ package com.hazelcast.map.impl.operation; +import com.hazelcast.internal.iteration.IterationPointer; +import com.hazelcast.internal.serialization.Data; import com.hazelcast.map.EntryProcessor; import com.hazelcast.map.IMap; import com.hazelcast.map.impl.MapEntries; -import com.hazelcast.internal.iteration.IterationPointer; import com.hazelcast.map.impl.query.Query; -import com.hazelcast.internal.serialization.Data; import com.hazelcast.query.Predicate; import com.hazelcast.spi.impl.operationservice.Operation; import com.hazelcast.spi.impl.operationservice.OperationFactory; diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java index 2df55bd0b3dba..36cc7a260153c 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java @@ -21,6 +21,7 @@ import com.hazelcast.internal.eviction.ExpiredKey; import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue; import com.hazelcast.internal.serialization.Data; +import com.hazelcast.internal.util.ExceptionUtil; import com.hazelcast.internal.util.MapUtil; import com.hazelcast.logging.ILogger; import com.hazelcast.map.impl.ExpirationTimeSetter; @@ -258,12 +259,17 @@ public final void evictExpiredEntries(final int percentage, final long now, fina // 2. Do scanning and evict expired keys. int scannedCount = 0; int expiredCount = 0; - long scanLoopStartNanos = System.nanoTime(); - do { - scannedCount += findExpiredKeys(now, backup); - expiredCount += evictExpiredKeys(backup); - } while (scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext() - && (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos); + try { + long scanLoopStartNanos = System.nanoTime(); + do { + scannedCount += findExpiredKeys(now, backup); + expiredCount += evictExpiredKeys(backup); + } while (scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext() + && (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos); + } catch (Exception e) { + BATCH_OF_EXPIRED.get().clear(); + throw ExceptionUtil.rethrow(e); + } // 3. Send expired keys to backups(only valid for max-idle-expiry) tryToSendBackupExpiryOp(); diff --git a/pom.xml b/pom.xml index 8b29ecedf02b9..d028f64a38006 100644 --- a/pom.xml +++ b/pom.xml @@ -31,7 +31,6 @@ hazelcast hazelcast-spring - hazelcast-spring-tests hazelcast-build-utils hazelcast-sql-core hazelcast-sql