From 49d057d58dc5961b080e25fc8c06550acf4a86d1 Mon Sep 17 00:00:00 2001 From: donnerbart Date: Mon, 2 Jul 2018 12:52:17 +0200 Subject: [PATCH 1/2] Replaced MapServiceContext serialization calls with SerializationService This removes one indirection, which makes inlining easier for the JVM. It also shortens the code a bit. --- .../map/impl/tx/TransactionalMapProxy.java | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java index 321d14c39e8c..9710b3e2425d 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java @@ -63,7 +63,7 @@ public boolean containsKey(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Data keyData = mapServiceContext.toData(key, partitionStrategy); + Data keyData = ss.toData(key, partitionStrategy); TxnValueWrapper valueWrapper = txMap.get(keyData); if (valueWrapper != null) { return (valueWrapper.type != Type.REMOVED); @@ -101,7 +101,7 @@ public Object get(Object key) { checkNotNull(key, "key can't be null"); Object nearCacheKey = toNearCacheKeyWithStrategy(key); - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); + Data keyData = nearCacheKey instanceof Data ? (Data) nearCacheKey : ss.toData(key, partitionStrategy); TxnValueWrapper currentValue = txMap.get(keyData); if (currentValue != null) { @@ -115,7 +115,7 @@ public Object getForUpdate(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Data keyData = mapServiceContext.toData(key, partitionStrategy); + Data keyData = ss.toData(key, partitionStrategy); TxnValueWrapper currentValue = txMap.get(keyData); if (currentValue != null) { @@ -138,8 +138,8 @@ public Object put(Object key, Object value, long ttl, TimeUnit timeUnit) { Object nearCacheKey = toNearCacheKeyWithStrategy(key); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - Object valueBeforeTxn = toObjectIfNeeded(putInternal(keyData, mapServiceContext.toData(value), ttl, timeUnit)); + Data keyData = ss.toData(nearCacheKey, partitionStrategy); + Object valueBeforeTxn = toObjectIfNeeded(putInternal(keyData, ss.toData(value), ttl, timeUnit)); TxnValueWrapper currentValue = txMap.get(keyData); Type type = valueBeforeTxn == null ? Type.NEW : Type.UPDATED; @@ -159,8 +159,8 @@ public void set(Object key, Object value) { Object nearCacheKey = toNearCacheKeyWithStrategy(key); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - Data dataBeforeTxn = putInternal(keyData, mapServiceContext.toData(value), -1, MILLISECONDS); + Data keyData = ss.toData(nearCacheKey, partitionStrategy); + Data dataBeforeTxn = putInternal(keyData, ss.toData(value), -1, MILLISECONDS); Type type = dataBeforeTxn == null ? Type.NEW : Type.UPDATED; TxnValueWrapper wrapper = new TxnValueWrapper(value, type); txMap.put(keyData, wrapper); @@ -177,18 +177,18 @@ public Object putIfAbsent(Object key, Object value) { Object nearCacheKey = toNearCacheKeyWithStrategy(key); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); + Data keyData = ss.toData(nearCacheKey, partitionStrategy); TxnValueWrapper wrapper = txMap.get(keyData); boolean haveTxnPast = wrapper != null; if (haveTxnPast) { if (wrapper.type != Type.REMOVED) { return wrapper.value; } - putInternal(keyData, mapServiceContext.toData(value), -1, MILLISECONDS); + putInternal(keyData, ss.toData(value), -1, MILLISECONDS); txMap.put(keyData, new TxnValueWrapper(value, Type.NEW)); return null; } else { - Data oldValue = putIfAbsentInternal(keyData, mapServiceContext.toData(value)); + Data oldValue = putIfAbsentInternal(keyData, ss.toData(value)); if (oldValue == null) { txMap.put(keyData, new TxnValueWrapper(value, Type.NEW)); } @@ -207,7 +207,7 @@ public Object replace(Object key, Object value) { Object nearCacheKey = toNearCacheKeyWithStrategy(key); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); + Data keyData = ss.toData(nearCacheKey, partitionStrategy); TxnValueWrapper wrapper = txMap.get(keyData); boolean haveTxnPast = wrapper != null; @@ -215,11 +215,11 @@ public Object replace(Object key, Object value) { if (wrapper.type == Type.REMOVED) { return null; } - putInternal(keyData, mapServiceContext.toData(value), -1, MILLISECONDS); + putInternal(keyData, ss.toData(value), -1, MILLISECONDS); txMap.put(keyData, new TxnValueWrapper(value, Type.UPDATED)); return wrapper.value; } else { - Data oldValue = replaceInternal(keyData, mapServiceContext.toData(value)); + Data oldValue = replaceInternal(keyData, ss.toData(value)); if (oldValue != null) { txMap.put(keyData, new TxnValueWrapper(value, Type.UPDATED)); } @@ -239,7 +239,7 @@ public boolean replace(Object key, Object oldValue, Object newValue) { Object nearCacheKey = toNearCacheKeyWithStrategy(key); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); + Data keyData = ss.toData(nearCacheKey, partitionStrategy); TxnValueWrapper wrapper = txMap.get(keyData); boolean haveTxnPast = wrapper != null; @@ -247,12 +247,11 @@ public boolean replace(Object key, Object oldValue, Object newValue) { if (!wrapper.value.equals(oldValue)) { return false; } - putInternal(keyData, mapServiceContext.toData(newValue), -1, MILLISECONDS); + putInternal(keyData, ss.toData(newValue), -1, MILLISECONDS); txMap.put(keyData, new TxnValueWrapper(wrapper.value, Type.UPDATED)); return true; } else { - boolean success = replaceIfSameInternal(keyData, - mapServiceContext.toData(oldValue), mapServiceContext.toData(newValue)); + boolean success = replaceIfSameInternal(keyData, ss.toData(oldValue), ss.toData(newValue)); if (success) { txMap.put(keyData, new TxnValueWrapper(newValue, Type.UPDATED)); } @@ -271,7 +270,7 @@ public boolean remove(Object key, Object value) { Object nearCacheKey = toNearCacheKeyWithStrategy(key); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); + Data keyData = ss.toData(nearCacheKey, partitionStrategy); TxnValueWrapper wrapper = txMap.get(keyData); // wrapper is null which means this entry is not touched by transaction @@ -307,7 +306,7 @@ public Object remove(Object key) { Object nearCacheKey = toNearCacheKeyWithStrategy(key); try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); + Data keyData = ss.toData(nearCacheKey, partitionStrategy); Object valueBeforeTxn = toObjectIfNeeded(removeInternal(keyData)); TxnValueWrapper wrapper = null; @@ -327,7 +326,7 @@ public void delete(Object key) { Object nearCacheKey = toNearCacheKeyWithStrategy(key); try { - Data keyData = mapServiceContext.toData(key, partitionStrategy); + Data keyData = ss.toData(key, partitionStrategy); Data data = removeInternal(keyData); if (data != null || txMap.containsKey(keyData)) { txMap.put(keyData, new TxnValueWrapper(toObjectIfNeeded(data), Type.REMOVED)); From 71d8ed3cde1db9211e1fb958c569136e206192ec Mon Sep 17 00:00:00 2001 From: donnerbart Date: Mon, 2 Jul 2018 12:52:17 +0200 Subject: [PATCH 2/2] Fixed TransactionalMapProxy with Near Cache The Near Cache key for a TransactionalMapProxy was not deserialized when it was in binary format, which happens when a Hazelcast client is used. Another problem was that the Near Cache key was created first, even if it was not used at all. Especially with this fix, we might run into an unused deserialization, when no Near Cache was configured. The order of read-only calls is: * read value from internal TXN map (always binary key) * read value from Near Cache (binary/object key) * read value from remote (always binary key) The Near Cache key creation has been moved behind the TXN map lookup. This prevents the key creation if it's not used at all. It uses the original and binary key, to prevent a duplicated (de)serialization. The order of write/remove calls is: * remove the value (always binary key) * invalidate the Near Cache (binary/object key) The Near Cache key creation has been moved to the invalidation method, so behind all checks if the Near Cache is enabled. This prevents the key creation if it's not used at all. --- ...apNearCacheInvalidationFromClientTest.java | 118 ++++++++------- .../map/impl/tx/TransactionalMapProxy.java | 139 ++++++++---------- .../impl/tx/TransactionalMapProxySupport.java | 128 ++++++++-------- 3 files changed, 190 insertions(+), 195 deletions(-) diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/map/impl/nearcache/MapNearCacheInvalidationFromClientTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/map/impl/nearcache/MapNearCacheInvalidationFromClientTest.java index dac538ec6078..085b7fc76fc5 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/map/impl/nearcache/MapNearCacheInvalidationFromClientTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/map/impl/nearcache/MapNearCacheInvalidationFromClientTest.java @@ -22,15 +22,18 @@ import com.hazelcast.config.NearCacheConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IMap; +import com.hazelcast.core.TransactionalMap; import com.hazelcast.internal.nearcache.NearCache; import com.hazelcast.map.impl.MapService; import com.hazelcast.map.impl.MapServiceContext; import com.hazelcast.map.impl.nearcache.MapNearCacheManager; +import com.hazelcast.spi.NodeEngine; import com.hazelcast.test.AssertTask; import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.HazelcastTestSupport; import com.hazelcast.test.annotation.ParallelTest; import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.transaction.TransactionContext; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,6 +45,7 @@ import static com.hazelcast.spi.properties.GroupProperty.MAP_INVALIDATION_MESSAGE_BATCH_FREQUENCY_SECONDS; import static com.hazelcast.spi.properties.GroupProperty.MAP_INVALIDATION_MESSAGE_BATCH_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -50,11 +54,12 @@ @Category({QuickTest.class, ParallelTest.class}) public class MapNearCacheInvalidationFromClientTest extends HazelcastTestSupport { - private String mapName; + private static final int ENTRY_COUNT = 100; + private String mapName; private TestHazelcastFactory factory; - private HazelcastInstance lite; + private HazelcastInstance liteMember; private HazelcastInstance client; @Before @@ -63,8 +68,7 @@ public void init() { factory = new TestHazelcastFactory(); factory.newHazelcastInstance(createServerConfig(mapName, false)); - - lite = factory.newHazelcastInstance(createServerConfig(mapName, true)); + liteMember = factory.newHazelcastInstance(createServerConfig(mapName, true)); client = factory.newHazelcastClient(); } @@ -76,19 +80,16 @@ public void tearDown() { @Test public void testPut() { IMap map = client.getMap(mapName); - - int count = 100; - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { map.put(i, i); } - IMap liteMap = lite.getMap(mapName); - - for (int i = 0; i < count; i++) { + IMap liteMap = liteMember.getMap(mapName); + for (int i = 0; i < ENTRY_COUNT; i++) { assertNotNull(liteMap.get(i)); } - NearCache nearCache = getNearCache(lite, mapName); + NearCache nearCache = getNearCache(liteMember, mapName); int sizeAfterPut = nearCache.size(); assertTrue("Near Cache size should be > 0 but was " + sizeAfterPut, sizeAfterPut > 0); } @@ -96,27 +97,23 @@ public void testPut() { @Test public void testClear() { IMap map = client.getMap(mapName); - - final int count = 100; - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { map.put(i, i); } - final IMap liteMap = lite.getMap(mapName); - final NearCache nearCache = getNearCache(lite, mapName); - + final IMap liteMap = liteMember.getMap(mapName); + final NearCache nearCache = getNearCache(liteMember, mapName); assertTrueEventually(new AssertTask() { @Override public void run() { - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { liteMap.get(i); } - assertEquals(count, nearCache.size()); + assertEquals(ENTRY_COUNT, nearCache.size()); } }); map.clear(); - assertTrueEventually(new AssertTask() { @Override public void run() { @@ -128,31 +125,27 @@ public void run() { @Test public void testEvictAll() { IMap map = client.getMap(mapName); - - final int count = 100; - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { map.put(i, i); } - final IMap liteMap = lite.getMap(mapName); - final NearCache nearCache = getNearCache(lite, mapName); - + final IMap liteMap = liteMember.getMap(mapName); + final NearCache nearCache = getNearCache(liteMember, mapName); assertTrueEventually(new AssertTask() { @Override public void run() { - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { liteMap.get(i); } - assertEquals(count, nearCache.size()); + assertEquals(ENTRY_COUNT, nearCache.size()); } }); map.evictAll(); - assertTrueEventually(new AssertTask() { @Override public void run() { - assertTrue(nearCache.size() < count); + assertTrue(nearCache.size() < ENTRY_COUNT); } }); } @@ -160,31 +153,27 @@ public void run() { @Test public void testEvict() { IMap map = client.getMap(mapName); - - final int count = 100; - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { map.put(i, i); } - final IMap liteMap = lite.getMap(mapName); - final NearCache nearCache = getNearCache(lite, mapName); - + final IMap liteMap = liteMember.getMap(mapName); + final NearCache nearCache = getNearCache(liteMember, mapName); assertTrueEventually(new AssertTask() { @Override public void run() { - for (int i = 0; i < count; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { liteMap.get(i); } - assertEquals(count, nearCache.size()); + assertEquals(ENTRY_COUNT, nearCache.size()); } }); map.evict(0); - assertTrueEventually(new AssertTask() { @Override public void run() { - assertTrue(nearCache.size() < count); + assertTrue(nearCache.size() < ENTRY_COUNT); } }); } @@ -194,9 +183,8 @@ public void testUpdate() { IMap map = client.getMap(mapName); map.put(1, 1); - final IMap liteMap = lite.getMap(mapName); - final NearCache nearCache = getNearCache(lite, mapName); - + final IMap liteMap = liteMember.getMap(mapName); + final NearCache nearCache = getNearCache(liteMember, mapName); assertTrueEventually(new AssertTask() { @Override public void run() { @@ -206,7 +194,6 @@ public void run() { }); map.put(1, 2); - assertTrueEventually(new AssertTask() { @Override public void run() { @@ -220,9 +207,8 @@ public void testRemove() { IMap map = client.getMap(mapName); map.put(1, 1); - final IMap liteMap = lite.getMap(mapName); - final NearCache nearCache = getNearCache(lite, mapName); - + final IMap liteMap = liteMember.getMap(mapName); + final NearCache nearCache = getNearCache(liteMember, mapName); assertTrueEventually(new AssertTask() { @Override public void run() { @@ -232,7 +218,6 @@ public void run() { }); map.remove(1); - assertTrueEventually(new AssertTask() { @Override public void run() { @@ -241,6 +226,31 @@ public void run() { }); } + @Test + public void testWithTransactionalMap() { + TransactionContext context = client.newTransactionContext(); + context.beginTransaction(); + try { + TransactionalMap txnMap = context.getMap(mapName); + assertNull("Expected null for a non-existent key", txnMap.get("key")); + assertFalse("Expected non-existent key not to be found", txnMap.containsKey("key")); + + assertNull("Expected no old value for new key", txnMap.put("key", "value")); + assertEquals("Expected value for existing key", "value", txnMap.get("key")); + assertTrue("Expected existing key to be found", txnMap.containsKey("key")); + + assertEquals("Expected value when removing existing key", "value", txnMap.remove("key")); + assertNull("Expected null for a non-existent key", txnMap.get("key")); + assertFalse("Expected non-existent key not to be found", txnMap.containsKey("key")); + } finally { + context.rollbackTransaction(); + } + + IMap map = client.getMap(mapName); + assertNull("Expected null for a non-existent key", map.get("key")); + assertFalse("Expected non-existent key not to be found", map.containsKey("key")); + } + private Config createServerConfig(String mapName, boolean liteMember) { NearCacheConfig nearCacheConfig = new NearCacheConfig() .setInvalidateOnChange(true); @@ -256,15 +266,15 @@ private Config createServerConfig(String mapName, boolean liteMember) { .addMapConfig(mapConfig); } - @SuppressWarnings("unchecked") private NearCache getNearCache(HazelcastInstance instance, String mapName) { - MapServiceContext mapServiceContext = getMapService(instance).getMapServiceContext(); + NodeEngine nodeEngine = getNodeEngineImpl(instance); + MapServiceContext mapServiceContext = getMapService(nodeEngine).getMapServiceContext(); MapNearCacheManager mapNearCacheManager = mapServiceContext.getMapNearCacheManager(); - NearCacheConfig nearCacheConfig = getNodeEngineImpl(instance).getConfig().findMapConfig(mapName).getNearCacheConfig(); - return mapNearCacheManager.getOrCreateNearCache(mapName, nearCacheConfig); + MapConfig mapConfig = nodeEngine.getConfig().findMapConfig(mapName); + return mapNearCacheManager.getOrCreateNearCache(mapName, mapConfig.getNearCacheConfig()); } - private MapService getMapService(HazelcastInstance instance) { - return getNodeEngineImpl(instance).getService(MapService.SERVICE_NAME); + private MapService getMapService(NodeEngine nodeEngine) { + return nodeEngine.getService(MapService.SERVICE_NAME); } } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java index 9710b3e2425d..7aade4dbfbcb 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java @@ -48,7 +48,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; /** - * Proxy implementation of {@link com.hazelcast.core.TransactionalMap} interface. + * Proxy implementation of {@link com.hazelcast.core.TransactionalMap} + * interface. */ public class TransactionalMapProxy extends TransactionalMapProxySupport implements TransactionalMap { @@ -63,12 +64,13 @@ public boolean containsKey(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Data keyData = ss.toData(key, partitionStrategy); - TxnValueWrapper valueWrapper = txMap.get(keyData); + Data dataKey = ss.toData(key, partitionStrategy); + + TxnValueWrapper valueWrapper = txMap.get(dataKey); if (valueWrapper != null) { return (valueWrapper.type != Type.REMOVED); } - return containsKeyInternal(keyData, key); + return containsKeyInternal(key, dataKey); } @Override @@ -100,14 +102,13 @@ public Object get(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); - Data keyData = nearCacheKey instanceof Data ? (Data) nearCacheKey : ss.toData(key, partitionStrategy); + Data dataKey = ss.toData(key, partitionStrategy); - TxnValueWrapper currentValue = txMap.get(keyData); + TxnValueWrapper currentValue = txMap.get(dataKey); if (currentValue != null) { return checkIfRemoved(currentValue); } - return toObjectIfNeeded(getInternal(nearCacheKey, keyData)); + return toObjectIfNeeded(getInternal(key, dataKey)); } @Override @@ -115,14 +116,14 @@ public Object getForUpdate(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Data keyData = ss.toData(key, partitionStrategy); + Data dataKey = ss.toData(key, partitionStrategy); - TxnValueWrapper currentValue = txMap.get(keyData); + TxnValueWrapper currentValue = txMap.get(dataKey); if (currentValue != null) { return checkIfRemoved(currentValue); } - return toObjectIfNeeded(getForUpdateInternal(keyData)); + return toObjectIfNeeded(getForUpdateInternal(dataKey)); } @Override @@ -136,18 +137,17 @@ public Object put(Object key, Object value, long ttl, TimeUnit timeUnit) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = ss.toData(nearCacheKey, partitionStrategy); - Object valueBeforeTxn = toObjectIfNeeded(putInternal(keyData, ss.toData(value), ttl, timeUnit)); + Object valueBeforeTxn = toObjectIfNeeded(putInternal(dataKey, ss.toData(value), ttl, timeUnit)); - TxnValueWrapper currentValue = txMap.get(keyData); + TxnValueWrapper currentValue = txMap.get(dataKey); Type type = valueBeforeTxn == null ? Type.NEW : Type.UPDATED; TxnValueWrapper wrapper = new TxnValueWrapper(value, type); - txMap.put(keyData, wrapper); + txMap.put(dataKey, wrapper); return currentValue == null ? valueBeforeTxn : checkIfRemoved(currentValue); } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -157,15 +157,14 @@ public void set(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = ss.toData(nearCacheKey, partitionStrategy); - Data dataBeforeTxn = putInternal(keyData, ss.toData(value), -1, MILLISECONDS); + Data dataBeforeTxn = putInternal(dataKey, ss.toData(value), -1, MILLISECONDS); Type type = dataBeforeTxn == null ? Type.NEW : Type.UPDATED; TxnValueWrapper wrapper = new TxnValueWrapper(value, type); - txMap.put(keyData, wrapper); + txMap.put(dataKey, wrapper); } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -175,27 +174,26 @@ public Object putIfAbsent(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = ss.toData(nearCacheKey, partitionStrategy); - TxnValueWrapper wrapper = txMap.get(keyData); + TxnValueWrapper wrapper = txMap.get(dataKey); boolean haveTxnPast = wrapper != null; if (haveTxnPast) { if (wrapper.type != Type.REMOVED) { return wrapper.value; } - putInternal(keyData, ss.toData(value), -1, MILLISECONDS); - txMap.put(keyData, new TxnValueWrapper(value, Type.NEW)); + putInternal(dataKey, ss.toData(value), -1, MILLISECONDS); + txMap.put(dataKey, new TxnValueWrapper(value, Type.NEW)); return null; } else { - Data oldValue = putIfAbsentInternal(keyData, ss.toData(value)); + Data oldValue = putIfAbsentInternal(dataKey, ss.toData(value)); if (oldValue == null) { - txMap.put(keyData, new TxnValueWrapper(value, Type.NEW)); + txMap.put(dataKey, new TxnValueWrapper(value, Type.NEW)); } return toObjectIfNeeded(oldValue); } } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -205,28 +203,26 @@ public Object replace(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = ss.toData(nearCacheKey, partitionStrategy); - - TxnValueWrapper wrapper = txMap.get(keyData); + TxnValueWrapper wrapper = txMap.get(dataKey); boolean haveTxnPast = wrapper != null; if (haveTxnPast) { if (wrapper.type == Type.REMOVED) { return null; } - putInternal(keyData, ss.toData(value), -1, MILLISECONDS); - txMap.put(keyData, new TxnValueWrapper(value, Type.UPDATED)); + putInternal(dataKey, ss.toData(value), -1, MILLISECONDS); + txMap.put(dataKey, new TxnValueWrapper(value, Type.UPDATED)); return wrapper.value; } else { - Data oldValue = replaceInternal(keyData, ss.toData(value)); + Data oldValue = replaceInternal(dataKey, ss.toData(value)); if (oldValue != null) { - txMap.put(keyData, new TxnValueWrapper(value, Type.UPDATED)); + txMap.put(dataKey, new TxnValueWrapper(value, Type.UPDATED)); } return toObjectIfNeeded(oldValue); } } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -237,28 +233,26 @@ public boolean replace(Object key, Object oldValue, Object newValue) { checkNotNull(oldValue, "oldValue can't be null"); checkNotNull(newValue, "newValue can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = ss.toData(nearCacheKey, partitionStrategy); - - TxnValueWrapper wrapper = txMap.get(keyData); + TxnValueWrapper wrapper = txMap.get(dataKey); boolean haveTxnPast = wrapper != null; if (haveTxnPast) { if (!wrapper.value.equals(oldValue)) { return false; } - putInternal(keyData, ss.toData(newValue), -1, MILLISECONDS); - txMap.put(keyData, new TxnValueWrapper(wrapper.value, Type.UPDATED)); + putInternal(dataKey, ss.toData(newValue), -1, MILLISECONDS); + txMap.put(dataKey, new TxnValueWrapper(wrapper.value, Type.UPDATED)); return true; } else { - boolean success = replaceIfSameInternal(keyData, ss.toData(oldValue), ss.toData(newValue)); + boolean success = replaceIfSameInternal(dataKey, ss.toData(oldValue), ss.toData(newValue)); if (success) { - txMap.put(keyData, new TxnValueWrapper(newValue, Type.UPDATED)); + txMap.put(dataKey, new TxnValueWrapper(newValue, Type.UPDATED)); } return success; } } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -268,16 +262,14 @@ public boolean remove(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = ss.toData(nearCacheKey, partitionStrategy); - - TxnValueWrapper wrapper = txMap.get(keyData); + TxnValueWrapper wrapper = txMap.get(dataKey); // wrapper is null which means this entry is not touched by transaction if (wrapper == null) { - boolean removed = removeIfSameInternal(keyData, value); + boolean removed = removeIfSameInternal(dataKey, value); if (removed) { - txMap.put(keyData, new TxnValueWrapper(value, Type.REMOVED)); + txMap.put(dataKey, new TxnValueWrapper(value, Type.REMOVED)); } return removed; } @@ -290,13 +282,12 @@ public boolean remove(Object key, Object value) { return false; } // wrapper value is equal to passed value, we call removeInternal just to add delete log - removeInternal(keyData); - txMap.put(keyData, new TxnValueWrapper(value, Type.REMOVED)); + removeInternal(dataKey); + txMap.put(dataKey, new TxnValueWrapper(value, Type.REMOVED)); + return true; } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } - - return true; } @Override @@ -304,18 +295,17 @@ public Object remove(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = ss.toData(nearCacheKey, partitionStrategy); - Object valueBeforeTxn = toObjectIfNeeded(removeInternal(keyData)); + Object valueBeforeTxn = toObjectIfNeeded(removeInternal(dataKey)); TxnValueWrapper wrapper = null; - if (valueBeforeTxn != null || txMap.containsKey(keyData)) { - wrapper = txMap.put(keyData, new TxnValueWrapper(valueBeforeTxn, Type.REMOVED)); + if (valueBeforeTxn != null || txMap.containsKey(dataKey)) { + wrapper = txMap.put(dataKey, new TxnValueWrapper(valueBeforeTxn, Type.REMOVED)); } return wrapper == null ? valueBeforeTxn : checkIfRemoved(wrapper); } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -324,15 +314,14 @@ public void delete(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); + Data dataKey = ss.toData(key, partitionStrategy); try { - Data keyData = ss.toData(key, partitionStrategy); - Data data = removeInternal(keyData); - if (data != null || txMap.containsKey(keyData)) { - txMap.put(keyData, new TxnValueWrapper(toObjectIfNeeded(data), Type.REMOVED)); + Data data = removeInternal(dataKey); + if (data != null || txMap.containsKey(dataKey)) { + txMap.put(dataKey, new TxnValueWrapper(toObjectIfNeeded(data), Type.REMOVED)); } } finally { - invalidateNearCache(nearCacheKey); + invalidateNearCache(key, dataKey); } } @@ -364,15 +353,15 @@ public Set keySet(Predicate predicate) { // meanwhile remove keys which are not in txMap returningKeySet.remove(toObjectIfNeeded(entry.getKey())); } else { - Data keyData = entry.getKey(); + Data dataKey = entry.getKey(); if (predicate == TruePredicate.INSTANCE) { - returningKeySet.add(toObjectIfNeeded(keyData)); + returningKeySet.add(toObjectIfNeeded(dataKey)); } else { - cachedQueryEntry.init(ss, keyData, entry.getValue().value, extractors); + cachedQueryEntry.init(ss, dataKey, entry.getValue().value, extractors); // apply predicate on txMap if (predicate.apply(cachedQueryEntry)) { - returningKeySet.add(toObjectIfNeeded(keyData)); + returningKeySet.add(toObjectIfNeeded(dataKey)); } } } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java index d69fb9ee0f6c..0ae07fd8d419 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java @@ -49,7 +49,8 @@ import static com.hazelcast.util.ExceptionUtil.rethrow; /** - * Base class contains proxy helper methods for {@link com.hazelcast.map.impl.tx.TransactionalMapProxy} + * Base class contains proxy helper methods for + * {@link com.hazelcast.map.impl.tx.TransactionalMapProxy}. */ public abstract class TransactionalMapProxySupport extends TransactionalDistributedObject { @@ -86,7 +87,7 @@ public abstract class TransactionalMapProxySupport extends TransactionalDistribu } @Override - public String getName() { + public final String getName() { return name; } @@ -95,19 +96,19 @@ public final String getServiceName() { return SERVICE_NAME; } - boolean isEquals(Object value1, Object value2) { + final boolean isEquals(Object value1, Object value2) { return recordComparator.isEqual(value1, value2); } - void checkTransactionState() { + final void checkTransactionState() { if (!tx.getState().equals(Transaction.State.ACTIVE)) { throw new TransactionNotActiveException("Transaction is not active!"); } } - boolean containsKeyInternal(Data dataKey, Object objectKey) { + final boolean containsKeyInternal(Object key, Data dataKey) { if (nearCacheEnabled) { - Object nearCacheKey = serializeKeys ? dataKey : objectKey; + Object nearCacheKey = toNearCacheKeyWithStrategy(key, dataKey); Object cachedValue = getCachedValue(nearCacheKey, false); if (cachedValue != NOT_CACHED) { return cachedValue != null; @@ -125,17 +126,18 @@ boolean containsKeyInternal(Data dataKey, Object objectKey) { } } - Object getInternal(Object nearCacheKey, Data keyData) { + final Object getInternal(Object key, Data dataKey) { if (nearCacheEnabled) { + Object nearCacheKey = toNearCacheKeyWithStrategy(key, dataKey); Object value = getCachedValue(nearCacheKey, true); if (value != NOT_CACHED) { return value; } } - MapOperation operation = operationProvider.createGetOperation(name, keyData); + MapOperation operation = operationProvider.createGetOperation(name, dataKey); operation.setThreadId(ThreadUtil.getThreadId()); - int partitionId = partitionService.getPartitionId(keyData); + int partitionId = partitionService.getPartitionId(dataKey); try { Future future = operationService.createInvocationBuilder(SERVICE_NAME, operation, partitionId) .setResultDeserialized(false) @@ -146,54 +148,13 @@ Object getInternal(Object nearCacheKey, Data keyData) { } } - final Object toNearCacheKeyWithStrategy(Object key) { - if (!nearCacheEnabled) { - return key; - } - - return serializeKeys ? ss.toData(key, partitionStrategy) : key; - } - - final void invalidateNearCache(Object nearCacheKey) { - if (!nearCacheEnabled) { - return; - } - if (nearCacheKey == null) { - return; - } - NearCache nearCache = mapNearCacheManager.getNearCache(name); - if (nearCache == null) { - return; - } - - nearCache.invalidate(nearCacheKey); - } - - private Object getCachedValue(Object nearCacheKey, boolean deserializeValue) { - NearCache nearCache = mapNearCacheManager.getNearCache(name); - if (nearCache == null) { - return NOT_CACHED; - } - - Object value = nearCache.get(nearCacheKey); - if (value == null) { - return NOT_CACHED; - } - if (value == CACHED_AS_NULL) { - return null; - } - - mapServiceContext.interceptAfterGet(name, value); - return deserializeValue ? ss.toObject(value) : value; - } - - Object getForUpdateInternal(Data key) { + final Object getForUpdateInternal(Data key) { VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis(), true); addUnlockTransactionRecord(key, versionedValue.version); return versionedValue.value; } - int sizeInternal() { + final int sizeInternal() { try { OperationFactory sizeOperationFactory = operationProvider.createMapSizeOperationFactory(name); Map results = operationService.invokeOnAllPartitions(SERVICE_NAME, sizeOperationFactory); @@ -208,7 +169,7 @@ int sizeInternal() { } } - Data putInternal(Data key, Data value, long ttl, TimeUnit timeUnit) { + final Data putInternal(Data key, Data value, long ttl, TimeUnit timeUnit) { VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); long timeInMillis = getTimeInMillis(ttl, timeUnit); MapOperation operation = operationProvider.createTxnSetOperation(name, key, value, versionedValue.version, timeInMillis); @@ -216,7 +177,7 @@ Data putInternal(Data key, Data value, long ttl, TimeUnit timeUnit) { return versionedValue.value; } - Data putIfAbsentInternal(Data key, Data value) { + final Data putIfAbsentInternal(Data key, Data value) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (versionedValue.value != null) { @@ -233,7 +194,7 @@ Data putIfAbsentInternal(Data key, Data value) { return versionedValue.value; } - Data replaceInternal(Data key, Data value) { + final Data replaceInternal(Data key, Data value) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (versionedValue.value == null) { @@ -249,7 +210,7 @@ Data replaceInternal(Data key, Data value) { return versionedValue.value; } - boolean replaceIfSameInternal(Data key, Object oldValue, Data newValue) { + final boolean replaceIfSameInternal(Data key, Object oldValue, Data newValue) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (!isEquals(oldValue, versionedValue.value)) { @@ -265,15 +226,14 @@ boolean replaceIfSameInternal(Data key, Object oldValue, Data newValue) { return true; } - Data removeInternal(Data key) { + final Data removeInternal(Data key) { VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); - tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), - operationProvider.createTxnDeleteOperation(name, key, versionedValue.version), - versionedValue.version, tx.getOwnerUuid())); + MapOperation operation = operationProvider.createTxnDeleteOperation(name, key, versionedValue.version); + tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, versionedValue.version, tx.getOwnerUuid())); return versionedValue.value; } - boolean removeIfSameInternal(Data key, Object value) { + final boolean removeIfSameInternal(Data key, Object value) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (!isEquals(versionedValue.value, value)) { @@ -284,12 +244,48 @@ boolean removeIfSameInternal(Data key, Object value) { addUnlockTransactionRecord(key, versionedValue.version); return false; } - tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), - operationProvider.createTxnDeleteOperation(name, key, versionedValue.version), - versionedValue.version, tx.getOwnerUuid())); + MapOperation operation = operationProvider.createTxnDeleteOperation(name, key, versionedValue.version); + tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, versionedValue.version, tx.getOwnerUuid())); return true; } + final void invalidateNearCache(Object key, Data dataKey) { + if (!nearCacheEnabled || key == null) { + return; + } + NearCache nearCache = mapNearCacheManager.getNearCache(name); + if (nearCache == null) { + return; + } + Object nearCacheKey = toNearCacheKeyWithStrategy(key, dataKey); + nearCache.invalidate(nearCacheKey); + } + + private Object toNearCacheKeyWithStrategy(Object key, Data dataKey) { + if (!nearCacheEnabled) { + return null; + } + return serializeKeys ? dataKey : ss.toObject(key); + } + + private Object getCachedValue(Object nearCacheKey, boolean deserializeValue) { + NearCache nearCache = mapNearCacheManager.getNearCache(name); + if (nearCache == null) { + return NOT_CACHED; + } + + Object value = nearCache.get(nearCacheKey); + if (value == null) { + return NOT_CACHED; + } + if (value == CACHED_AS_NULL) { + return null; + } + + mapServiceContext.interceptAfterGet(name, value); + return deserializeValue ? ss.toObject(value) : value; + } + private void unlock(Data key, VersionedValue versionedValue) { try { TxnUnlockOperation unlockOperation = new TxnUnlockOperation(name, key, versionedValue.version); @@ -327,8 +323,8 @@ private VersionedValue lockAndGet(Data key, long timeout, boolean shouldLoad) { return versionedValue; } boolean blockReads = tx.getTransactionType() == TransactionType.ONE_PHASE; - MapOperation operation = operationProvider.createTxnLockAndGetOperation(name, key, timeout, timeout, - tx.getOwnerUuid(), shouldLoad, blockReads); + MapOperation operation = operationProvider.createTxnLockAndGetOperation(name, key, timeout, timeout, tx.getOwnerUuid(), + shouldLoad, blockReads); operation.setThreadId(ThreadUtil.getThreadId()); try { int partitionId = partitionService.getPartitionId(key);