diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/map/impl/nearcache/MapNearCacheInvalidationFromClientTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/map/impl/nearcache/MapNearCacheInvalidationFromClientTest.java index dac538ec6078..085b7fc76fc5 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/map/impl/nearcache/MapNearCacheInvalidationFromClientTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/map/impl/nearcache/MapNearCacheInvalidationFromClientTest.java @@ -22,15 +22,18 @@ import com.hazelcast.config.NearCacheConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IMap; +import com.hazelcast.core.TransactionalMap; import com.hazelcast.internal.nearcache.NearCache; import com.hazelcast.map.impl.MapService; import com.hazelcast.map.impl.MapServiceContext; import com.hazelcast.map.impl.nearcache.MapNearCacheManager; +import com.hazelcast.spi.NodeEngine; import com.hazelcast.test.AssertTask; import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.HazelcastTestSupport; import com.hazelcast.test.annotation.ParallelTest; import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.transaction.TransactionContext; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,6 +45,7 @@ import static com.hazelcast.spi.properties.GroupProperty.MAP_INVALIDATION_MESSAGE_BATCH_FREQUENCY_SECONDS; import static com.hazelcast.spi.properties.GroupProperty.MAP_INVALIDATION_MESSAGE_BATCH_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -50,11 +54,12 @@ @Category({QuickTest.class, ParallelTest.class}) public class MapNearCacheInvalidationFromClientTest extends HazelcastTestSupport { - private String mapName; + private static final int ENTRY_COUNT = 100; + private String mapName; private TestHazelcastFactory factory; - private HazelcastInstance lite; + private HazelcastInstance liteMember; private HazelcastInstance client; @Before @@ -63,8 +68,7 @@ public void init() { factory = new TestHazelcastFactory(); factory.newHazelcastInstance(createServerConfig(mapName, false)); - - lite = factory.newHazelcastInstance(createServerConfig(mapName, true)); + liteMember = factory.newHazelcastInstance(createServerConfig(mapName, true)); client = factory.newHazelcastClient(); } @@ -76,19 +80,16 @@ public void tearDown() { @Test public void testPut() { IMap map = client.getMap(mapName); - - int count = 100; - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { map.put(i, i); } - IMap liteMap = lite.getMap(mapName); - - for (int i = 0; i < count; i++) { + IMap liteMap = liteMember.getMap(mapName); + for (int i = 0; i < ENTRY_COUNT; i++) { assertNotNull(liteMap.get(i)); } - NearCache nearCache = getNearCache(lite, mapName); + NearCache nearCache = getNearCache(liteMember, mapName); int sizeAfterPut = nearCache.size(); assertTrue("Near Cache size should be > 0 but was " + sizeAfterPut, sizeAfterPut > 0); } @@ -96,27 +97,23 @@ public void testPut() { @Test public void testClear() { IMap map = client.getMap(mapName); - - final int count = 100; - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { map.put(i, i); } - final IMap liteMap = lite.getMap(mapName); - final NearCache nearCache = getNearCache(lite, mapName); - + final IMap liteMap = liteMember.getMap(mapName); + final NearCache nearCache = getNearCache(liteMember, mapName); assertTrueEventually(new AssertTask() { @Override public void run() { - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { liteMap.get(i); } - assertEquals(count, nearCache.size()); + assertEquals(ENTRY_COUNT, nearCache.size()); } }); map.clear(); - assertTrueEventually(new AssertTask() { @Override public void run() { @@ -128,31 +125,27 @@ public void run() { @Test public void testEvictAll() { IMap map = client.getMap(mapName); - - final int count = 100; - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { map.put(i, i); } - final IMap liteMap = lite.getMap(mapName); - final NearCache nearCache = getNearCache(lite, mapName); - + final IMap liteMap = liteMember.getMap(mapName); + final NearCache nearCache = getNearCache(liteMember, mapName); assertTrueEventually(new AssertTask() { @Override public void run() { - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { liteMap.get(i); } - assertEquals(count, nearCache.size()); + assertEquals(ENTRY_COUNT, nearCache.size()); } }); map.evictAll(); - assertTrueEventually(new AssertTask() { @Override public void run() { - assertTrue(nearCache.size() < count); + assertTrue(nearCache.size() < ENTRY_COUNT); } }); } @@ -160,31 +153,27 @@ public void run() { @Test public void testEvict() { IMap map = client.getMap(mapName); - - final int count = 100; - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { map.put(i, i); } - final IMap liteMap = lite.getMap(mapName); - final NearCache nearCache = getNearCache(lite, mapName); - + final IMap liteMap = liteMember.getMap(mapName); + final NearCache nearCache = getNearCache(liteMember, mapName); assertTrueEventually(new AssertTask() { @Override public void run() { - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { liteMap.get(i); } - assertEquals(count, nearCache.size()); + assertEquals(ENTRY_COUNT, nearCache.size()); } }); map.evict(0); - assertTrueEventually(new AssertTask() { @Override public void run() { - assertTrue(nearCache.size() < count); + assertTrue(nearCache.size() < ENTRY_COUNT); } }); } @@ -194,9 +183,8 @@ public void testUpdate() { IMap map = client.getMap(mapName); map.put(1, 1); - final IMap liteMap = lite.getMap(mapName); - final NearCache nearCache = getNearCache(lite, mapName); - + final IMap liteMap = liteMember.getMap(mapName); + final NearCache nearCache = getNearCache(liteMember, mapName); assertTrueEventually(new AssertTask() { @Override public void run() { @@ -206,7 +194,6 @@ public void run() { }); map.put(1, 2); - assertTrueEventually(new AssertTask() { @Override public void run() { @@ -220,9 +207,8 @@ public void testRemove() { IMap map = client.getMap(mapName); map.put(1, 1); - final IMap liteMap = lite.getMap(mapName); - final NearCache nearCache = getNearCache(lite, mapName); - + final IMap liteMap = liteMember.getMap(mapName); + final NearCache nearCache = getNearCache(liteMember, mapName); assertTrueEventually(new AssertTask() { @Override public void run() { @@ -232,7 +218,6 @@ public void run() { }); map.remove(1); - assertTrueEventually(new AssertTask() { @Override public void run() { @@ -241,6 +226,31 @@ public void run() { }); } + @Test + public void testWithTransactionalMap() { + TransactionContext context = client.newTransactionContext(); + context.beginTransaction(); + try { + TransactionalMap txnMap = context.getMap(mapName); + assertNull("Expected null for a non-existent key", txnMap.get("key")); + assertFalse("Expected non-existent key not to be found", txnMap.containsKey("key")); + + assertNull("Expected no old value for new key", txnMap.put("key", "value")); + assertEquals("Expected value for existing key", "value", txnMap.get("key")); + assertTrue("Expected existing key to be found", txnMap.containsKey("key")); + + assertEquals("Expected value when removing existing key", "value", txnMap.remove("key")); + assertNull("Expected null for a non-existent key", txnMap.get("key")); + assertFalse("Expected non-existent key not to be found", txnMap.containsKey("key")); + } finally { + context.rollbackTransaction(); + } + + IMap map = client.getMap(mapName); + assertNull("Expected null for a non-existent key", map.get("key")); + assertFalse("Expected non-existent key not to be found", map.containsKey("key")); + } + private Config createServerConfig(String mapName, boolean liteMember) { NearCacheConfig nearCacheConfig = new NearCacheConfig() .setInvalidateOnChange(true); @@ -256,15 +266,15 @@ private Config createServerConfig(String mapName, boolean liteMember) { .addMapConfig(mapConfig); } - @SuppressWarnings("unchecked") private NearCache getNearCache(HazelcastInstance instance, String mapName) { - MapServiceContext mapServiceContext = getMapService(instance).getMapServiceContext(); + NodeEngine nodeEngine = getNodeEngineImpl(instance); + MapServiceContext mapServiceContext = getMapService(nodeEngine).getMapServiceContext(); MapNearCacheManager mapNearCacheManager = mapServiceContext.getMapNearCacheManager(); - NearCacheConfig nearCacheConfig = getNodeEngineImpl(instance).getConfig().findMapConfig(mapName).getNearCacheConfig(); - return mapNearCacheManager.getOrCreateNearCache(mapName, nearCacheConfig); + MapConfig mapConfig = nodeEngine.getConfig().findMapConfig(mapName); + return mapNearCacheManager.getOrCreateNearCache(mapName, mapConfig.getNearCacheConfig()); } - private MapService getMapService(HazelcastInstance instance) { - return getNodeEngineImpl(instance).getService(MapService.SERVICE_NAME); + private MapService getMapService(NodeEngine nodeEngine) { + return nodeEngine.getService(MapService.SERVICE_NAME); } } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java index 321d14c39e8c..7aade4dbfbcb 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java @@ -48,7 +48,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; /** - * Proxy implementation of {@link com.hazelcast.core.TransactionalMap} interface. + * Proxy implementation of {@link com.hazelcast.core.TransactionalMap} + * interface. */ public class TransactionalMapProxy extends TransactionalMapProxySupport implements TransactionalMap { @@ -63,12 +64,13 @@ public boolean containsKey(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Data keyData = mapServiceContext.toData(key, partitionStrategy); - TxnValueWrapper valueWrapper = txMap.get(keyData); + Data dataKey = ss.toData(key, partitionStrategy); + + TxnValueWrapper valueWrapper = txMap.get(dataKey); if (valueWrapper != null) { return (valueWrapper.type != Type.REMOVED); } - return containsKeyInternal(keyData, key); + return containsKeyInternal(key, dataKey); } @Override @@ -100,14 +102,13 @@ public Object get(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); + Data dataKey = ss.toData(key, partitionStrategy); - TxnValueWrapper currentValue = txMap.get(keyData); + TxnValueWrapper currentValue = txMap.get(dataKey); if (currentValue != null) { return checkIfRemoved(currentValue); } - return toObjectIfNeeded(getInternal(nearCacheKey, keyData)); + return toObjectIfNeeded(getInternal(key, dataKey)); } @Override @@ -115,14 +116,14 @@ public Object getForUpdate(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Data keyData = mapServiceContext.toData(key, partitionStrategy); + Data dataKey = ss.toData(key, partitionStrategy); - TxnValueWrapper currentValue = txMap.get(keyData); + TxnValueWrapper currentValue = txMap.get(dataKey); if (currentValue != null) { return checkIfRemoved(currentValue); } - return toObjectIfNeeded(getForUpdateInternal(keyData)); + return toObjectIfNeeded(getForUpdateInternal(dataKey)); } @Override @@ -136,18 +137,17 @@ public Object put(Object key, Object value, long ttl, TimeUnit timeUnit) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - Object valueBeforeTxn = toObjectIfNeeded(putInternal(keyData, mapServiceContext.toData(value), ttl, timeUnit)); + Object valueBeforeTxn = toObjectIfNeeded(putInternal(dataKey, ss.toData(value), ttl, timeUnit)); - TxnValueWrapper currentValue = txMap.get(keyData); + TxnValueWrapper currentValue = txMap.get(dataKey); Type type = valueBeforeTxn == null ? Type.NEW : Type.UPDATED; TxnValueWrapper wrapper = new TxnValueWrapper(value, type); - txMap.put(keyData, wrapper); + txMap.put(dataKey, wrapper); return currentValue == null ? valueBeforeTxn : checkIfRemoved(currentValue); } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -157,15 +157,14 @@ public void set(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - Data dataBeforeTxn = putInternal(keyData, mapServiceContext.toData(value), -1, MILLISECONDS); + Data dataBeforeTxn = putInternal(dataKey, ss.toData(value), -1, MILLISECONDS); Type type = dataBeforeTxn == null ? Type.NEW : Type.UPDATED; TxnValueWrapper wrapper = new TxnValueWrapper(value, type); - txMap.put(keyData, wrapper); + txMap.put(dataKey, wrapper); } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -175,27 +174,26 @@ public Object putIfAbsent(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - TxnValueWrapper wrapper = txMap.get(keyData); + TxnValueWrapper wrapper = txMap.get(dataKey); boolean haveTxnPast = wrapper != null; if (haveTxnPast) { if (wrapper.type != Type.REMOVED) { return wrapper.value; } - putInternal(keyData, mapServiceContext.toData(value), -1, MILLISECONDS); - txMap.put(keyData, new TxnValueWrapper(value, Type.NEW)); + putInternal(dataKey, ss.toData(value), -1, MILLISECONDS); + txMap.put(dataKey, new TxnValueWrapper(value, Type.NEW)); return null; } else { - Data oldValue = putIfAbsentInternal(keyData, mapServiceContext.toData(value)); + Data oldValue = putIfAbsentInternal(dataKey, ss.toData(value)); if (oldValue == null) { - txMap.put(keyData, new TxnValueWrapper(value, Type.NEW)); + txMap.put(dataKey, new TxnValueWrapper(value, Type.NEW)); } return toObjectIfNeeded(oldValue); } } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -205,28 +203,26 @@ public Object replace(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - - TxnValueWrapper wrapper = txMap.get(keyData); + TxnValueWrapper wrapper = txMap.get(dataKey); boolean haveTxnPast = wrapper != null; if (haveTxnPast) { if (wrapper.type == Type.REMOVED) { return null; } - putInternal(keyData, mapServiceContext.toData(value), -1, MILLISECONDS); - txMap.put(keyData, new TxnValueWrapper(value, Type.UPDATED)); + putInternal(dataKey, ss.toData(value), -1, MILLISECONDS); + txMap.put(dataKey, new TxnValueWrapper(value, Type.UPDATED)); return wrapper.value; } else { - Data oldValue = replaceInternal(keyData, mapServiceContext.toData(value)); + Data oldValue = replaceInternal(dataKey, ss.toData(value)); if (oldValue != null) { - txMap.put(keyData, new TxnValueWrapper(value, Type.UPDATED)); + txMap.put(dataKey, new TxnValueWrapper(value, Type.UPDATED)); } return toObjectIfNeeded(oldValue); } } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -237,29 +233,26 @@ public boolean replace(Object key, Object oldValue, Object newValue) { checkNotNull(oldValue, "oldValue can't be null"); checkNotNull(newValue, "newValue can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - - TxnValueWrapper wrapper = txMap.get(keyData); + TxnValueWrapper wrapper = txMap.get(dataKey); boolean haveTxnPast = wrapper != null; if (haveTxnPast) { if (!wrapper.value.equals(oldValue)) { return false; } - putInternal(keyData, mapServiceContext.toData(newValue), -1, MILLISECONDS); - txMap.put(keyData, new TxnValueWrapper(wrapper.value, Type.UPDATED)); + putInternal(dataKey, ss.toData(newValue), -1, MILLISECONDS); + txMap.put(dataKey, new TxnValueWrapper(wrapper.value, Type.UPDATED)); return true; } else { - boolean success = replaceIfSameInternal(keyData, - mapServiceContext.toData(oldValue), mapServiceContext.toData(newValue)); + boolean success = replaceIfSameInternal(dataKey, ss.toData(oldValue), ss.toData(newValue)); if (success) { - txMap.put(keyData, new TxnValueWrapper(newValue, Type.UPDATED)); + txMap.put(dataKey, new TxnValueWrapper(newValue, Type.UPDATED)); } return success; } } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -269,16 +262,14 @@ public boolean remove(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - - TxnValueWrapper wrapper = txMap.get(keyData); + TxnValueWrapper wrapper = txMap.get(dataKey); // wrapper is null which means this entry is not touched by transaction if (wrapper == null) { - boolean removed = removeIfSameInternal(keyData, value); + boolean removed = removeIfSameInternal(dataKey, value); if (removed) { - txMap.put(keyData, new TxnValueWrapper(value, Type.REMOVED)); + txMap.put(dataKey, new TxnValueWrapper(value, Type.REMOVED)); } return removed; } @@ -291,13 +282,12 @@ public boolean remove(Object key, Object value) { return false; } // wrapper value is equal to passed value, we call removeInternal just to add delete log - removeInternal(keyData); - txMap.put(keyData, new TxnValueWrapper(value, Type.REMOVED)); + removeInternal(dataKey); + txMap.put(dataKey, new TxnValueWrapper(value, Type.REMOVED)); + return true; } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } - - return true; } @Override @@ -305,18 +295,17 @@ public Object remove(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - Object valueBeforeTxn = toObjectIfNeeded(removeInternal(keyData)); + Object valueBeforeTxn = toObjectIfNeeded(removeInternal(dataKey)); TxnValueWrapper wrapper = null; - if (valueBeforeTxn != null || txMap.containsKey(keyData)) { - wrapper = txMap.put(keyData, new TxnValueWrapper(valueBeforeTxn, Type.REMOVED)); + if (valueBeforeTxn != null || txMap.containsKey(dataKey)) { + wrapper = txMap.put(dataKey, new TxnValueWrapper(valueBeforeTxn, Type.REMOVED)); } return wrapper == null ? valueBeforeTxn : checkIfRemoved(wrapper); } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -325,15 +314,14 @@ public void delete(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = mapServiceContext.toData(key, partitionStrategy); - Data data = removeInternal(keyData); - if (data != null || txMap.containsKey(keyData)) { - txMap.put(keyData, new TxnValueWrapper(toObjectIfNeeded(data), Type.REMOVED)); + Data data = removeInternal(dataKey); + if (data != null || txMap.containsKey(dataKey)) { + txMap.put(dataKey, new TxnValueWrapper(toObjectIfNeeded(data), Type.REMOVED)); } } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -365,15 +353,15 @@ public Set keySet(Predicate predicate) { // meanwhile remove keys which are not in txMap returningKeySet.remove(toObjectIfNeeded(entry.getKey())); } else { - Data keyData = entry.getKey(); + Data dataKey = entry.getKey(); if (predicate == TruePredicate.INSTANCE) { - returningKeySet.add(toObjectIfNeeded(keyData)); + returningKeySet.add(toObjectIfNeeded(dataKey)); } else { - cachedQueryEntry.init(ss, keyData, entry.getValue().value, extractors); + cachedQueryEntry.init(ss, dataKey, entry.getValue().value, extractors); // apply predicate on txMap if (predicate.apply(cachedQueryEntry)) { - returningKeySet.add(toObjectIfNeeded(keyData)); + returningKeySet.add(toObjectIfNeeded(dataKey)); } } } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java index d69fb9ee0f6c..0ae07fd8d419 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java @@ -49,7 +49,8 @@ import static com.hazelcast.util.ExceptionUtil.rethrow; /** - * Base class contains proxy helper methods for {@link com.hazelcast.map.impl.tx.TransactionalMapProxy} + * Base class contains proxy helper methods for + * {@link com.hazelcast.map.impl.tx.TransactionalMapProxy}. */ public abstract class TransactionalMapProxySupport extends TransactionalDistributedObject { @@ -86,7 +87,7 @@ public abstract class TransactionalMapProxySupport extends TransactionalDistribu } @Override - public String getName() { + public final String getName() { return name; } @@ -95,19 +96,19 @@ public final String getServiceName() { return SERVICE_NAME; } - boolean isEquals(Object value1, Object value2) { + final boolean isEquals(Object value1, Object value2) { return recordComparator.isEqual(value1, value2); } - void checkTransactionState() { + final void checkTransactionState() { if (!tx.getState().equals(Transaction.State.ACTIVE)) { throw new TransactionNotActiveException("Transaction is not active!"); } } - boolean containsKeyInternal(Data dataKey, Object objectKey) { + final boolean containsKeyInternal(Object key, Data dataKey) { if (nearCacheEnabled) { - Object nearCacheKey = serializeKeys ? dataKey : objectKey; + Object nearCacheKey = toNearCacheKeyWithStrategy(key, dataKey); Object cachedValue = getCachedValue(nearCacheKey, false); if (cachedValue != NOT_CACHED) { return cachedValue != null; @@ -125,17 +126,18 @@ boolean containsKeyInternal(Data dataKey, Object objectKey) { } } - Object getInternal(Object nearCacheKey, Data keyData) { + final Object getInternal(Object key, Data dataKey) { if (nearCacheEnabled) { + Object nearCacheKey = toNearCacheKeyWithStrategy(key, dataKey); Object value = getCachedValue(nearCacheKey, true); if (value != NOT_CACHED) { return value; } } - MapOperation operation = operationProvider.createGetOperation(name, keyData); + MapOperation operation = operationProvider.createGetOperation(name, dataKey); operation.setThreadId(ThreadUtil.getThreadId()); - int partitionId = partitionService.getPartitionId(keyData); + int partitionId = partitionService.getPartitionId(dataKey); try { Future future = operationService.createInvocationBuilder(SERVICE_NAME, operation, partitionId) .setResultDeserialized(false) @@ -146,54 +148,13 @@ Object getInternal(Object nearCacheKey, Data keyData) { } } - final Object toNearCacheKeyWithStrategy(Object key) { - if (!nearCacheEnabled) { - return key; - } - - return serializeKeys ? ss.toData(key, partitionStrategy) : key; - } - - final void invalidateNearCache(Object nearCacheKey) { - if (!nearCacheEnabled) { - return; - } - if (nearCacheKey == null) { - return; - } - NearCache nearCache = mapNearCacheManager.getNearCache(name); - if (nearCache == null) { - return; - } - - nearCache.invalidate(nearCacheKey); - } - - private Object getCachedValue(Object nearCacheKey, boolean deserializeValue) { - NearCache nearCache = mapNearCacheManager.getNearCache(name); - if (nearCache == null) { - return NOT_CACHED; - } - - Object value = nearCache.get(nearCacheKey); - if (value == null) { - return NOT_CACHED; - } - if (value == CACHED_AS_NULL) { - return null; - } - - mapServiceContext.interceptAfterGet(name, value); - return deserializeValue ? ss.toObject(value) : value; - } - - Object getForUpdateInternal(Data key) { + final Object getForUpdateInternal(Data key) { VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis(), true); addUnlockTransactionRecord(key, versionedValue.version); return versionedValue.value; } - int sizeInternal() { + final int sizeInternal() { try { OperationFactory sizeOperationFactory = operationProvider.createMapSizeOperationFactory(name); Map results = operationService.invokeOnAllPartitions(SERVICE_NAME, sizeOperationFactory); @@ -208,7 +169,7 @@ int sizeInternal() { } } - Data putInternal(Data key, Data value, long ttl, TimeUnit timeUnit) { + final Data putInternal(Data key, Data value, long ttl, TimeUnit timeUnit) { VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); long timeInMillis = getTimeInMillis(ttl, timeUnit); MapOperation operation = operationProvider.createTxnSetOperation(name, key, value, versionedValue.version, timeInMillis); @@ -216,7 +177,7 @@ Data putInternal(Data key, Data value, long ttl, TimeUnit timeUnit) { return versionedValue.value; } - Data putIfAbsentInternal(Data key, Data value) { + final Data putIfAbsentInternal(Data key, Data value) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (versionedValue.value != null) { @@ -233,7 +194,7 @@ Data putIfAbsentInternal(Data key, Data value) { return versionedValue.value; } - Data replaceInternal(Data key, Data value) { + final Data replaceInternal(Data key, Data value) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (versionedValue.value == null) { @@ -249,7 +210,7 @@ Data replaceInternal(Data key, Data value) { return versionedValue.value; } - boolean replaceIfSameInternal(Data key, Object oldValue, Data newValue) { + final boolean replaceIfSameInternal(Data key, Object oldValue, Data newValue) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (!isEquals(oldValue, versionedValue.value)) { @@ -265,15 +226,14 @@ boolean replaceIfSameInternal(Data key, Object oldValue, Data newValue) { return true; } - Data removeInternal(Data key) { + final Data removeInternal(Data key) { VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); - tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), - operationProvider.createTxnDeleteOperation(name, key, versionedValue.version), - versionedValue.version, tx.getOwnerUuid())); + MapOperation operation = operationProvider.createTxnDeleteOperation(name, key, versionedValue.version); + tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, versionedValue.version, tx.getOwnerUuid())); return versionedValue.value; } - boolean removeIfSameInternal(Data key, Object value) { + final boolean removeIfSameInternal(Data key, Object value) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (!isEquals(versionedValue.value, value)) { @@ -284,12 +244,48 @@ boolean removeIfSameInternal(Data key, Object value) { addUnlockTransactionRecord(key, versionedValue.version); return false; } - tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), - operationProvider.createTxnDeleteOperation(name, key, versionedValue.version), - versionedValue.version, tx.getOwnerUuid())); + MapOperation operation = operationProvider.createTxnDeleteOperation(name, key, versionedValue.version); + tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, versionedValue.version, tx.getOwnerUuid())); return true; } + final void invalidateNearCache(Object key, Data dataKey) { + if (!nearCacheEnabled || key == null) { + return; + } + NearCache nearCache = mapNearCacheManager.getNearCache(name); + if (nearCache == null) { + return; + } + Object nearCacheKey = toNearCacheKeyWithStrategy(key, dataKey); + nearCache.invalidate(nearCacheKey); + } + + private Object toNearCacheKeyWithStrategy(Object key, Data dataKey) { + if (!nearCacheEnabled) { + return null; + } + return serializeKeys ? dataKey : ss.toObject(key); + } + + private Object getCachedValue(Object nearCacheKey, boolean deserializeValue) { + NearCache nearCache = mapNearCacheManager.getNearCache(name); + if (nearCache == null) { + return NOT_CACHED; + } + + Object value = nearCache.get(nearCacheKey); + if (value == null) { + return NOT_CACHED; + } + if (value == CACHED_AS_NULL) { + return null; + } + + mapServiceContext.interceptAfterGet(name, value); + return deserializeValue ? ss.toObject(value) : value; + } + private void unlock(Data key, VersionedValue versionedValue) { try { TxnUnlockOperation unlockOperation = new TxnUnlockOperation(name, key, versionedValue.version); @@ -327,8 +323,8 @@ private VersionedValue lockAndGet(Data key, long timeout, boolean shouldLoad) { return versionedValue; } boolean blockReads = tx.getTransactionType() == TransactionType.ONE_PHASE; - MapOperation operation = operationProvider.createTxnLockAndGetOperation(name, key, timeout, timeout, - tx.getOwnerUuid(), shouldLoad, blockReads); + MapOperation operation = operationProvider.createTxnLockAndGetOperation(name, key, timeout, timeout, tx.getOwnerUuid(), + shouldLoad, blockReads); operation.setThreadId(ThreadUtil.getThreadId()); try { int partitionId = partitionService.getPartitionId(key);