Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmetmircik committed Mar 31, 2022
1 parent f99016f commit 7cd25ab
Show file tree
Hide file tree
Showing 12 changed files with 680 additions and 121 deletions.
Expand Up @@ -22,7 +22,7 @@
import com.hazelcast.cache.impl.ICacheInternal;
import com.hazelcast.cache.impl.ICacheService;
import com.hazelcast.client.cache.impl.ClientCacheProxySupportUtil.EmptyCompletionListener;
import com.hazelcast.internal.nearcache.impl.NearCachingHook;
import com.hazelcast.internal.nearcache.impl.RemoteCallHook;
import com.hazelcast.client.impl.ClientDelegatingFuture;
import com.hazelcast.client.impl.clientside.ClientMessageDecoder;
import com.hazelcast.client.impl.protocol.ClientMessage;
Expand Down Expand Up @@ -891,7 +891,7 @@ protected void putAllInternal(Map<? extends K, ? extends V> userInputMap,
List<Map.Entry<Data, Data>>[] entriesPerPartition,
long startNanos) {
try {
NearCachingHook<K, V> nearCachingHook = createPutAllNearCachingHook(userInputMap.size());
RemoteCallHook<K, V> nearCachingHook = createPutAllNearCachingHook(userInputMap.size());
// first we fill entry set per partition
groupDataToPartitions(userInputMap, entriesPerPartition, nearCachingHook);
// then we invoke the operations and sync on completion of these operations
Expand All @@ -903,13 +903,13 @@ protected void putAllInternal(Map<? extends K, ? extends V> userInputMap,
}

// Overridden to inject hook for near cache population.
protected NearCachingHook<K, V> createPutAllNearCachingHook(int keySetSize) {
return NearCachingHook.EMPTY_HOOK;
protected RemoteCallHook<K, V> createPutAllNearCachingHook(int keySetSize) {
return RemoteCallHook.EMPTY_HOOK;
}

private void groupDataToPartitions(Map<? extends K, ? extends V> userInputMap,
List<Map.Entry<Data, Data>>[] entriesPerPartition,
NearCachingHook nearCachingHook) {
RemoteCallHook nearCachingHook) {
ClientPartitionService partitionService = getContext().getPartitionService();

for (Map.Entry<? extends K, ? extends V> entry : userInputMap.entrySet()) {
Expand All @@ -935,7 +935,7 @@ private void groupDataToPartitions(Map<? extends K, ? extends V> userInputMap,

protected void callPutAllSync(List<Map.Entry<Data, Data>>[] entriesPerPartition,
Data expiryPolicyData,
NearCachingHook<K, V> nearCachingHook, long startNanos) {
RemoteCallHook<K, V> nearCachingHook, long startNanos) {

List<ClientCacheProxySupportUtil.FutureEntriesTuple> futureEntriesTuples
= new ArrayList<>(entriesPerPartition.length);
Expand Down
Expand Up @@ -34,13 +34,15 @@
import com.hazelcast.internal.adapter.ICacheDataStructureAdapter;
import com.hazelcast.internal.nearcache.NearCache;
import com.hazelcast.internal.nearcache.NearCacheManager;
import com.hazelcast.internal.nearcache.impl.NearCachingHook;
import com.hazelcast.internal.nearcache.impl.RemoteCallHook;
import com.hazelcast.internal.nearcache.impl.invalidation.RepairingHandler;
import com.hazelcast.internal.nearcache.impl.invalidation.RepairingTask;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.spi.impl.operationservice.Operation;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.integration.CompletionListener;
import java.util.ArrayList;
Expand Down Expand Up @@ -467,18 +469,18 @@ protected boolean setExpiryPolicyInternal(K key, ExpiryPolicy expiryPolicy) {

@Override
protected void callPutAllSync(List<Map.Entry<Data, Data>>[] entriesPerPartition, Data expiryPolicyData,
NearCachingHook<K, V> nearCachingHook, long startNanos) {
RemoteCallHook<K, V> nearCachingHook, long startNanos) {
try {
super.callPutAllSync(entriesPerPartition, expiryPolicyData, nearCachingHook, startNanos);
nearCachingHook.onRemoteCallSuccess();
nearCachingHook.onRemoteCallSuccess(null);
} catch (Throwable t) {
nearCachingHook.onRemoteCallFailure();
throw rethrow(t);
}
}

@Override
protected NearCachingHook<K, V> createPutAllNearCachingHook(int keySetSize) {
protected RemoteCallHook<K, V> createPutAllNearCachingHook(int keySetSize) {
return cacheOnUpdate
? new PutAllCacheOnUpdateHook(keySetSize)
: new PutAllInvalidateHook(keySetSize);
Expand All @@ -491,7 +493,7 @@ protected NearCachingHook<K, V> createPutAllNearCachingHook(int keySetSize) {
*  
* Only used with putAll calls.
*/
private class PutAllCacheOnUpdateHook implements NearCachingHook<K, V> {
private class PutAllCacheOnUpdateHook implements RemoteCallHook<K, V> {
// Holds near-cache-key, near-cache-value and reservation-id
private final List<Object> keyValueId;

Expand All @@ -507,7 +509,7 @@ public void beforeRemoteCall(K key, Data keyData, V value, Data valueData) {
}

@Override
public void onRemoteCallSuccess() {
public void onRemoteCallSuccess(Operation remoteCall) {
for (int i = 0; i < keyValueId.size(); i += 3) {
Object nearCacheKey = keyValueId.get(i);
Object nearCacheValue = keyValueId.get(i + 1);
Expand Down Expand Up @@ -542,7 +544,7 @@ public void onRemoteCallFailure() {
*  
* Only used with putAll calls.
*/
private class PutAllInvalidateHook implements NearCachingHook<K, V> {
private class PutAllInvalidateHook implements RemoteCallHook<K, V> {

private final List<Object> nearCacheKeys;

Expand All @@ -556,15 +558,15 @@ public void beforeRemoteCall(K key, Data keyData, V value, Data valueData) {
}

@Override
public void onRemoteCallSuccess() {
public void onRemoteCallSuccess(@Nullable Operation remoteCall) {
for (Object nearCacheKey : nearCacheKeys) {
invalidateNearCache(nearCacheKey);
}
}

@Override
public void onRemoteCallFailure() {
onRemoteCallSuccess();
onRemoteCallSuccess(null);
}
}

Expand Down
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.internal.nearcache.impl;

import com.hazelcast.internal.serialization.Data;
import com.hazelcast.spi.impl.operationservice.Operation;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

public class CompositeRemoteCallHook<K, V> implements RemoteCallHook<K, V> {

private final List<RemoteCallHook> hooks = new ArrayList<>();

public void add(RemoteCallHook newHook) {
hooks.add(newHook);
}

@Override
public void beforeRemoteCall(K key, Data keyData,
@Nullable V value, @Nullable Data valueData) {
for (int i = 0; i < hooks.size(); i++) {
hooks.get(i).beforeRemoteCall(key, keyData, value, valueData);
}
}

@Override
public void onRemoteCallSuccess(@Nullable Operation remoteCall) {
for (int i = 0; i < hooks.size(); i++) {
hooks.get(i).onRemoteCallSuccess(remoteCall);
}
}

@Override
public void onRemoteCallFailure() {
for (int i = 0; i < hooks.size(); i++) {
hooks.get(i).onRemoteCallFailure();
}
}
}
Expand Up @@ -18,26 +18,27 @@


import com.hazelcast.internal.serialization.Data;
import com.hazelcast.spi.impl.operationservice.Operation;

import javax.annotation.Nullable;

/**
* Hook to be used by near cache enabled proxy objects.
*
* With this hook, you can implement needed logic
* for truly invalidate/populate local near cache.
* Hook to be used by near cache invalidations or by statistics updates.
* <p>
* For instance, with this hook, you can implement needed
* logic for truly invalidate/populate local near cache.
*/
public interface NearCachingHook<K, V> {
public interface RemoteCallHook<K, V> {

NearCachingHook EMPTY_HOOK = new NearCachingHook() {
RemoteCallHook EMPTY_HOOK = new RemoteCallHook() {

@Override
public void beforeRemoteCall(Object key, Data keyData,
Object value, Data valueData) {
}

@Override
public void onRemoteCallSuccess() {
public void onRemoteCallSuccess(Operation remoteCall) {
}

@Override
Expand All @@ -46,9 +47,10 @@ public void onRemoteCallFailure() {
}
};

void beforeRemoteCall(K key, Data keyData, @Nullable V value, @Nullable Data valueData);
void beforeRemoteCall(K key, Data keyData,
@Nullable V value, @Nullable Data valueData);

void onRemoteCallSuccess();
void onRemoteCallSuccess(@Nullable Operation remoteCall);

void onRemoteCallFailure();
}
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.map.impl;

import com.hazelcast.internal.monitor.impl.LocalMapStatsImpl;
import com.hazelcast.internal.util.Timer;
import com.hazelcast.map.impl.operation.BasePutOperation;
import com.hazelcast.map.impl.operation.BaseRemoveOperation;
import com.hazelcast.map.impl.operation.GetOperation;
import com.hazelcast.map.impl.operation.SetOperation;
import com.hazelcast.map.impl.tx.TxnDeleteOperation;
import com.hazelcast.map.impl.tx.TxnSetOperation;
import com.hazelcast.spi.impl.operationservice.Operation;

/**
* Helper to update map operation stats on caller side.
*/
public final class MapOperationStatsUpdater {

private MapOperationStatsUpdater() {
}

/**
* Updates stats upon operation-call
* on {@link com.hazelcast.map.IMap}
*/
public static void incrementOperationStats(Operation operation,
LocalMapStatsImpl localMapStats,
long startTimeNanos) {

long durationNanos = Timer.nanosElapsed(startTimeNanos);

if (operation instanceof SetOperation) {
localMapStats.incrementSetLatencyNanos(durationNanos);
return;
}

if (operation instanceof BasePutOperation) {
localMapStats.incrementPutLatencyNanos(durationNanos);
return;
}

if (operation instanceof BaseRemoveOperation) {
localMapStats.incrementRemoveLatencyNanos(durationNanos);
return;
}

if (operation instanceof GetOperation) {
localMapStats.incrementGetLatencyNanos(durationNanos);
return;
}
}

/**
* Updates stats upon operation-call on {@link
* com.hazelcast.transaction.TransactionalMap}
*/
public static void incrementTxnOperationStats(Operation operation,
LocalMapStatsImpl localMapStats,
long startTimeNanos) {

long durationNanos = Timer.nanosElapsed(startTimeNanos);

if (operation instanceof TxnSetOperation) {
localMapStats.incrementSetLatencyNanos(durationNanos);
return;
}

if (operation instanceof TxnDeleteOperation) {
localMapStats.incrementRemoveLatencyNanos(durationNanos);
return;
}

if (operation instanceof GetOperation) {
localMapStats.incrementGetLatencyNanos(durationNanos);
return;
}
}
}
Expand Up @@ -21,7 +21,6 @@
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.PartitioningStrategyConfig;
import com.hazelcast.internal.eviction.ExpirationManager;
import com.hazelcast.internal.monitor.impl.LocalMapStatsImpl;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.collection.PartitionIdSet;
import com.hazelcast.internal.util.comparators.ValueComparator;
Expand All @@ -43,7 +42,6 @@
import com.hazelcast.query.impl.predicates.QueryOptimizer;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.eventservice.EventFilter;
import com.hazelcast.spi.impl.operationservice.Operation;

import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -173,8 +171,6 @@ void removeRecordStoresFromPartitionMatchingWith(Predicate<RecordStore> predicat

Extractors getExtractors(String mapName);

void incrementOperationStats(long startTime, LocalMapStatsImpl localMapStats, String mapName, Operation operation);

boolean removeMapContainer(MapContainer mapContainer);

PartitioningStrategy getPartitioningStrategy(String mapName, PartitioningStrategyConfig config);
Expand Down

0 comments on commit 7cd25ab

Please sign in to comment.