Skip to content

Commit

Permalink
get for client fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmetmircik committed Apr 14, 2021
1 parent 60b8f32 commit b3711cd
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 25 deletions.
Expand Up @@ -54,6 +54,7 @@ protected ClientMessage encodeResponse(Object response) {
protected Operation prepareOperation() {
MapOperationProvider operationProvider = getMapOperationProvider(parameters.name);
MapOperation operation = operationProvider.createGetOperation(parameters.name, parameters.key);
operation.setClientCall(true);
operation.setThreadId(parameters.threadId);
return operation;
}
Expand All @@ -74,7 +75,6 @@ protected Object processResponseBeforeSending(Object response) {
return response;
}


@Override
public String getServiceName() {
return MapService.SERVICE_NAME;
Expand Down
Expand Up @@ -27,7 +27,7 @@ public ClearBackupOperation() {

public ClearBackupOperation(String name) {
super(name);
createRecordStoreOnDemand = false;
setCreateRecordStoreOnDemand(false);
}

@Override
Expand Down
Expand Up @@ -37,7 +37,7 @@ public ClearOperation() {

public ClearOperation(String name) {
super(name);
createRecordStoreOnDemand = false;
setCreateRecordStoreOnDemand(false);
}

@Override
Expand Down
Expand Up @@ -184,7 +184,7 @@ public CallStatus call() {
}
// when offloading is enabled, left disposing
// to EntryOffloadableSetUnlockOperation
disposeDeferredBlocks = !offload;
setDisposeDeferredBlocks(!offload);

if (offload) {
return new EntryOperationOffload(getCallerAddress());
Expand Down
Expand Up @@ -30,7 +30,7 @@ public EvictAllBackupOperation() {

public EvictAllBackupOperation(String name) {
super(name);
createRecordStoreOnDemand = false;
setCreateRecordStoreOnDemand(false);
}

@Override
Expand Down
Expand Up @@ -43,7 +43,7 @@ public EvictAllOperation() {

public EvictAllOperation(String name) {
super(name);
createRecordStoreOnDemand = false;
setCreateRecordStoreOnDemand(false);
}

@Override
Expand Down
Expand Up @@ -54,7 +54,7 @@ public EvictBatchBackupOperation(String name,
this.name = name;
this.expiredKeys = expiredKeys;
this.primaryEntryCount = primaryEntryCount;
this.createRecordStoreOnDemand = false;
setCreateRecordStoreOnDemand(false);
}

@Override
Expand Down
Expand Up @@ -58,17 +58,18 @@ public abstract class MapOperation extends AbstractNamedOperation
implements IdentifiedDataSerializable, ServiceNamespaceAware {

private static final boolean ASSERTION_ENABLED = MapOperation.class.desiredAssertionStatus();
static final int BITMASK_CREATE_RECORD_STORE_ON_DEMAND = 1;
static final int BITMASK_DISPOSE_DEFERRED_BLOCK = 1 << 1;
static final int BITMASK_CAN_PUBLISH_WAN_EVENT = 1 << 2;
static final int BITMASK_IS_CLIENT_CALL = 1 << 3;

protected transient MapService mapService;
protected transient RecordStore<Record> recordStore;
protected transient MapContainer mapContainer;
protected transient MapServiceContext mapServiceContext;
protected transient MapEventPublisher mapEventPublisher;

protected transient boolean createRecordStoreOnDemand = true;
protected transient boolean disposeDeferredBlocks = true;

private transient boolean canPublishWanEvent;
private transient byte mapFlags = (byte) 0b00000011;

public MapOperation() {
}
Expand Down Expand Up @@ -97,13 +98,41 @@ public final void beforeRun() throws Exception {
throw rethrow(t, Exception.class);
}

canPublishWanEvent = canPublishWanEvent(mapContainer);
setMapFlag(canPublishWanEvent(mapContainer), BITMASK_CAN_PUBLISH_WAN_EVENT);

assertNativeMapOnPartitionThread();

innerBeforeRun();
}

public void setClientCall(boolean isClientCall) {
setMapFlag(isClientCall, BITMASK_IS_CLIENT_CALL);
}

protected boolean isClientCall() {
return isMapFlagSet(BITMASK_IS_CLIENT_CALL);
}

protected void setCreateRecordStoreOnDemand(boolean value) {
setMapFlag(value, BITMASK_CREATE_RECORD_STORE_ON_DEMAND);
}

protected void setDisposeDeferredBlocks(boolean value) {
setMapFlag(value, BITMASK_DISPOSE_DEFERRED_BLOCK);
}

private void setMapFlag(boolean value, int bitmask) {
if (value) {
mapFlags |= bitmask;
} else {
mapFlags &= ~bitmask;
}
}

private boolean isMapFlagSet(int bitmask) {
return (mapFlags & bitmask) != 0;
}

protected void innerBeforeRun() throws Exception {
// Intentionally empty method body.
// Concrete classes can override this method.
Expand Down Expand Up @@ -168,7 +197,7 @@ private RecordStore getRecordStoreOrNull() {
return null;
}
PartitionContainer partitionContainer = mapServiceContext.getPartitionContainer(partitionId);
if (createRecordStoreOnDemand) {
if (isMapFlagSet(BITMASK_CREATE_RECORD_STORE_ON_DEMAND)) {
return partitionContainer.getRecordStore(name);
} else {
return partitionContainer.getExistingRecordStore(name);
Expand Down Expand Up @@ -196,7 +225,7 @@ public void logError(Throwable e) {
}

void disposeDeferredBlocks() {
if (!disposeDeferredBlocks
if (!isMapFlagSet(BITMASK_DISPOSE_DEFERRED_BLOCK)
|| recordStore == null
|| recordStore.getInMemoryFormat() != NATIVE) {
return;
Expand Down Expand Up @@ -317,7 +346,7 @@ protected final void publishWanUpdate(Data dataKey, Object value) {
}

private void publishWanUpdateInternal(Data dataKey, Object value, boolean hasLoadProvenance) {
if (!canPublishWanEvent) {
if (!isMapFlagSet(BITMASK_CAN_PUBLISH_WAN_EVENT)) {
return;
}

Expand All @@ -340,7 +369,7 @@ protected final void publishLoadAsWanUpdate(Data dataKey, Object value) {
}

protected final void publishWanRemove(@Nonnull Data dataKey) {
if (!canPublishWanEvent) {
if (!isMapFlagSet(BITMASK_CAN_PUBLISH_WAN_EVENT)) {
return;
}

Expand Down
Expand Up @@ -16,12 +16,12 @@

package com.hazelcast.map.impl.operation;

import com.hazelcast.internal.iteration.IterationPointer;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.IMap;
import com.hazelcast.map.impl.MapEntries;
import com.hazelcast.internal.iteration.IterationPointer;
import com.hazelcast.map.impl.query.Query;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.query.Predicate;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.OperationFactory;
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.internal.eviction.ExpiredKey;
import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.MapUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.impl.ExpirationTimeSetter;
Expand Down Expand Up @@ -258,12 +259,17 @@ public final void evictExpiredEntries(final int percentage, final long now, fina
// 2. Do scanning and evict expired keys.
int scannedCount = 0;
int expiredCount = 0;
long scanLoopStartNanos = System.nanoTime();
do {
scannedCount += findExpiredKeys(now, backup);
expiredCount += evictExpiredKeys(backup);
} while (scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext()
&& (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos);
try {
long scanLoopStartNanos = System.nanoTime();
do {
scannedCount += findExpiredKeys(now, backup);
expiredCount += evictExpiredKeys(backup);
} while (scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext()
&& (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos);
} catch (Exception e) {
BATCH_OF_EXPIRED.get().clear();
throw ExceptionUtil.rethrow(e);
}

// 3. Send expired keys to backups(only valid for max-idle-expiry)
tryToSendBackupExpiryOp();
Expand Down
1 change: 0 additions & 1 deletion pom.xml
Expand Up @@ -31,7 +31,6 @@
<modules>
<module>hazelcast</module>
<module>hazelcast-spring</module>
<module>hazelcast-spring-tests</module>
<module>hazelcast-build-utils</module>
<module>hazelcast-sql-core</module>
<module>hazelcast-sql</module>
Expand Down

0 comments on commit b3711cd

Please sign in to comment.