From 5074bf597ac7bb14b5a0be3593d597ff82e73c25 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Fri, 31 Jan 2020 17:28:28 +0300 Subject: [PATCH] Fix txn near cache --- .../CollectionTransactionLogRecord.java | 10 + .../ClusterStateTransactionLogRecord.java | 12 +- .../internal/nearcache/NearCachingHook.java | 52 ++++ .../map/impl/tx/MapTransactionLogRecord.java | 23 +- .../map/impl/tx/TransactionalMapProxy.java | 258 +++++++++--------- .../impl/tx/TransactionalMapProxySupport.java | 53 +++- .../txn/MultiMapTransactionLogRecord.java | 10 + .../transaction/impl/TransactionImpl.java | 2 + .../transaction/impl/TransactionLog.java | 12 + .../impl/TransactionLogRecord.java | 4 + .../transaction/impl/xa/XATransaction.java | 27 +- .../map/impl/tx/MapTransactionStressTest.java | 10 + .../tx/TxnMapNearCacheInvalidationTest.java | 79 +++++- .../impl/MockTransactionLogRecord.java | 10 + 14 files changed, 405 insertions(+), 157 deletions(-) create mode 100644 hazelcast/src/main/java/com/hazelcast/internal/nearcache/NearCachingHook.java diff --git a/hazelcast/src/main/java/com/hazelcast/collection/impl/txncollection/CollectionTransactionLogRecord.java b/hazelcast/src/main/java/com/hazelcast/collection/impl/txncollection/CollectionTransactionLogRecord.java index f758389667b26..c336dfebb9845 100644 --- a/hazelcast/src/main/java/com/hazelcast/collection/impl/txncollection/CollectionTransactionLogRecord.java +++ b/hazelcast/src/main/java/com/hazelcast/collection/impl/txncollection/CollectionTransactionLogRecord.java @@ -64,6 +64,16 @@ public Operation newCommitOperation() { return new CollectionCommitOperation(partitionId, name, serviceName, operationList); } + @Override + public void onCommitSuccess() { + // NOP + } + + @Override + public void onCommitFailure() { + // NOP + } + @Override public Operation newRollbackOperation() { long[] itemIds = createItemIdArray(); diff --git a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterStateTransactionLogRecord.java b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterStateTransactionLogRecord.java index 7a0444cddfb04..d78cc56680c6c 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterStateTransactionLogRecord.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/ClusterStateTransactionLogRecord.java @@ -49,7 +49,7 @@ public ClusterStateTransactionLogRecord() { } public ClusterStateTransactionLogRecord(ClusterStateChange stateChange, Address initiator, Address target, - String txnId, long leaseTime, int partitionStateVersion, boolean isTransient) { + String txnId, long leaseTime, int partitionStateVersion, boolean isTransient) { Preconditions.checkNotNull(stateChange); Preconditions.checkNotNull(initiator); Preconditions.checkNotNull(target); @@ -85,6 +85,16 @@ public Operation newRollbackOperation() { return new RollbackClusterStateOp(initiator, txnId); } + @Override + public void onCommitSuccess() { + // NOP + } + + @Override + public void onCommitFailure() { + // NOP + } + @Override public Address getTarget() { return target; diff --git a/hazelcast/src/main/java/com/hazelcast/internal/nearcache/NearCachingHook.java b/hazelcast/src/main/java/com/hazelcast/internal/nearcache/NearCachingHook.java new file mode 100644 index 0000000000000..883e39ec570ef --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/internal/nearcache/NearCachingHook.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.internal.nearcache; + + +import com.hazelcast.nio.serialization.Data; + +/** + * Hook to be used by near cache enabled proxy objects. + * + * With this hook, you can implement needed logic + * for truly invalidate/populate local near cache. + */ +public interface NearCachingHook { + + NearCachingHook EMPTY_HOOK = new NearCachingHook() { + + @Override + public void beforeRemoteCall(Object key, Data keyData, + Object value, Data valueData) { + } + + @Override + public void onRemoteCallSuccess() { + } + + @Override + public void onRemoteCallFailure() { + + } + }; + + void beforeRemoteCall(K key, Data keyData, V value, Data valueData); + + void onRemoteCallSuccess(); + + void onRemoteCallFailure(); +} diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/MapTransactionLogRecord.java b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/MapTransactionLogRecord.java index 71b0a266bb5e3..741b5fb36158d 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/MapTransactionLogRecord.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/MapTransactionLogRecord.java @@ -16,6 +16,7 @@ package com.hazelcast.map.impl.tx; +import com.hazelcast.internal.nearcache.NearCachingHook; import com.hazelcast.map.impl.MapDataSerializerHook; import com.hazelcast.map.impl.MapRecordKey; import com.hazelcast.nio.ObjectDataInput; @@ -39,10 +40,15 @@ public class MapTransactionLogRecord implements TransactionLogRecord { private String ownerUuid; private Operation op; + private transient NearCachingHook nearCachingHook = NearCachingHook.EMPTY_HOOK; + public MapTransactionLogRecord() { } - public MapTransactionLogRecord(String name, Data key, int partitionId, Operation op, long version, String ownerUuid) { + public MapTransactionLogRecord(String name, Data key, int partitionId, + Operation op, String ownerUuid, NearCachingHook nearCachingHook) { + assert nearCachingHook != null; + this.name = name; this.key = key; if (!(op instanceof MapTxnOperation)) { @@ -51,6 +57,7 @@ public MapTransactionLogRecord(String name, Data key, int partitionId, Operation this.op = op; this.ownerUuid = ownerUuid; this.partitionId = partitionId; + this.nearCachingHook = nearCachingHook; } @Override @@ -69,6 +76,20 @@ public Operation newCommitOperation() { return op; } + @Override + public void onCommitSuccess() { + assert nearCachingHook != null; + + nearCachingHook.onRemoteCallSuccess(); + } + + @Override + public void onCommitFailure() { + assert nearCachingHook != null; + + nearCachingHook.onRemoteCallFailure(); + } + @Override public Operation newRollbackOperation() { TxnRollbackOperation operation = new TxnRollbackOperation(partitionId, name, key, ownerUuid); diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java index 71500e43627a9..a0b5c7797dbfb 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxy.java @@ -17,6 +17,7 @@ package com.hazelcast.map.impl.tx; import com.hazelcast.core.TransactionalMap; +import com.hazelcast.internal.nearcache.NearCachingHook; import com.hazelcast.map.impl.MapService; import com.hazelcast.map.impl.query.MapQueryEngine; import com.hazelcast.map.impl.query.Query; @@ -54,7 +55,8 @@ public class TransactionalMapProxy extends TransactionalMapProxySupport implemen private final Map txMap = new HashMap(); - public TransactionalMapProxy(String name, MapService mapService, NodeEngine nodeEngine, Transaction transaction) { + public TransactionalMapProxy(String name, MapService mapService, + NodeEngine nodeEngine, Transaction transaction) { super(name, mapService, nodeEngine, transaction); } @@ -143,19 +145,19 @@ public Object put(Object key, Object value, long ttl, TimeUnit timeUnit) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); - try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - Object valueBeforeTxn = toObjectIfNeeded(putInternal(keyData, mapServiceContext.toData(value), ttl, timeUnit)); - - TxnValueWrapper currentValue = txMap.get(keyData); - Type type = valueBeforeTxn == null ? Type.NEW : Type.UPDATED; - TxnValueWrapper wrapper = new TxnValueWrapper(value, type); - txMap.put(keyData, wrapper); - return currentValue == null ? valueBeforeTxn : checkIfRemoved(currentValue); - } finally { - invalidateNearCache(nearCacheKey); - } + Data keyData = mapServiceContext.toData(key, partitionStrategy); + Data valueData = mapServiceContext.toData(value); + + NearCachingHook nearCachingHook = newNearCachingHook(); + nearCachingHook.beforeRemoteCall(key, keyData, value, valueData); + + Object valueBeforeTxn = toObjectIfNeeded(putInternal(keyData, valueData, ttl, timeUnit, nearCachingHook)); + + TxnValueWrapper currentValue = txMap.get(keyData); + Type type = valueBeforeTxn == null ? Type.NEW : Type.UPDATED; + TxnValueWrapper wrapper = new TxnValueWrapper(value, type); + txMap.put(keyData, wrapper); + return currentValue == null ? valueBeforeTxn : checkIfRemoved(currentValue); } @Override @@ -164,16 +166,16 @@ public void set(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); - try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - Data dataBeforeTxn = putInternal(keyData, mapServiceContext.toData(value), -1, MILLISECONDS); - Type type = dataBeforeTxn == null ? Type.NEW : Type.UPDATED; - TxnValueWrapper wrapper = new TxnValueWrapper(value, type); - txMap.put(keyData, wrapper); - } finally { - invalidateNearCache(nearCacheKey); - } + Data keyData = mapServiceContext.toData(key, partitionStrategy); + Data valueData = mapServiceContext.toData(value); + + NearCachingHook nearCachingHook = newNearCachingHook(); + nearCachingHook.beforeRemoteCall(key, keyData, value, valueData); + + Data dataBeforeTxn = putInternal(keyData, valueData, -1, MILLISECONDS, nearCachingHook); + Type type = dataBeforeTxn == null ? Type.NEW : Type.UPDATED; + TxnValueWrapper wrapper = new TxnValueWrapper(value, type); + txMap.put(keyData, wrapper); } @Override @@ -182,27 +184,27 @@ public Object putIfAbsent(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); - try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - TxnValueWrapper wrapper = txMap.get(keyData); - boolean haveTxnPast = wrapper != null; - if (haveTxnPast) { - if (wrapper.type != Type.REMOVED) { - return wrapper.value; - } - putInternal(keyData, mapServiceContext.toData(value), -1, MILLISECONDS); + Data keyData = mapServiceContext.toData(key, partitionStrategy); + Data valueData = mapServiceContext.toData(value); + + NearCachingHook nearCachingHook = newNearCachingHook(); + nearCachingHook.beforeRemoteCall(key, keyData, value, valueData); + + TxnValueWrapper wrapper = txMap.get(keyData); + boolean haveTxnPast = wrapper != null; + if (haveTxnPast) { + if (wrapper.type != Type.REMOVED) { + return wrapper.value; + } + putInternal(keyData, valueData, -1, MILLISECONDS, nearCachingHook); + txMap.put(keyData, new TxnValueWrapper(value, Type.NEW)); + return null; + } else { + Data oldValue = putIfAbsentInternal(keyData, valueData, nearCachingHook); + if (oldValue == null) { txMap.put(keyData, new TxnValueWrapper(value, Type.NEW)); - return null; - } else { - Data oldValue = putIfAbsentInternal(keyData, mapServiceContext.toData(value)); - if (oldValue == null) { - txMap.put(keyData, new TxnValueWrapper(value, Type.NEW)); - } - return toObjectIfNeeded(oldValue); } - } finally { - invalidateNearCache(nearCacheKey); + return toObjectIfNeeded(oldValue); } } @@ -212,28 +214,27 @@ public Object replace(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); - try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - - TxnValueWrapper wrapper = txMap.get(keyData); - boolean haveTxnPast = wrapper != null; - if (haveTxnPast) { - if (wrapper.type == Type.REMOVED) { - return null; - } - putInternal(keyData, mapServiceContext.toData(value), -1, MILLISECONDS); + Data keyData = mapServiceContext.toData(key, partitionStrategy); + Data valueData = mapServiceContext.toData(value); + + NearCachingHook nearCachingHook = newNearCachingHook(); + nearCachingHook.beforeRemoteCall(key, keyData, value, valueData); + + TxnValueWrapper wrapper = txMap.get(keyData); + boolean haveTxnPast = wrapper != null; + if (haveTxnPast) { + if (wrapper.type == Type.REMOVED) { + return null; + } + putInternal(keyData, valueData, -1, MILLISECONDS, nearCachingHook); + txMap.put(keyData, new TxnValueWrapper(value, Type.UPDATED)); + return wrapper.value; + } else { + Data oldValue = replaceInternal(keyData, valueData, nearCachingHook); + if (oldValue != null) { txMap.put(keyData, new TxnValueWrapper(value, Type.UPDATED)); - return wrapper.value; - } else { - Data oldValue = replaceInternal(keyData, mapServiceContext.toData(value)); - if (oldValue != null) { - txMap.put(keyData, new TxnValueWrapper(value, Type.UPDATED)); - } - return toObjectIfNeeded(oldValue); } - } finally { - invalidateNearCache(nearCacheKey); + return toObjectIfNeeded(oldValue); } } @@ -244,29 +245,28 @@ public boolean replace(Object key, Object oldValue, Object newValue) { checkNotNull(oldValue, "oldValue can't be null"); checkNotNull(newValue, "newValue can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); - try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - - TxnValueWrapper wrapper = txMap.get(keyData); - boolean haveTxnPast = wrapper != null; - if (haveTxnPast) { - if (!wrapper.value.equals(oldValue)) { - return false; - } - putInternal(keyData, mapServiceContext.toData(newValue), -1, MILLISECONDS); - txMap.put(keyData, new TxnValueWrapper(wrapper.value, Type.UPDATED)); - return true; - } else { - boolean success = replaceIfSameInternal(keyData, - mapServiceContext.toData(oldValue), mapServiceContext.toData(newValue)); - if (success) { - txMap.put(keyData, new TxnValueWrapper(newValue, Type.UPDATED)); - } - return success; + Data keyData = mapServiceContext.toData(key, partitionStrategy); + Data newValueData = mapServiceContext.toData(newValue); + + NearCachingHook nearCachingHook = newNearCachingHook(); + nearCachingHook.beforeRemoteCall(key, keyData, newValue, newValueData); + + TxnValueWrapper wrapper = txMap.get(keyData); + boolean haveTxnPast = wrapper != null; + if (haveTxnPast) { + if (!wrapper.value.equals(oldValue)) { + return false; + } + putInternal(keyData, newValueData, -1, MILLISECONDS, nearCachingHook); + txMap.put(keyData, new TxnValueWrapper(wrapper.value, Type.UPDATED)); + return true; + } else { + boolean success = replaceIfSameInternal(keyData, + mapServiceContext.toData(oldValue), newValueData, nearCachingHook); + if (success) { + txMap.put(keyData, new TxnValueWrapper(newValue, Type.UPDATED)); } - } finally { - invalidateNearCache(nearCacheKey); + return success; } } @@ -276,33 +276,31 @@ public boolean remove(Object key, Object value) { checkNotNull(key, "key can't be null"); checkNotNull(value, "value can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); - try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - - TxnValueWrapper wrapper = txMap.get(keyData); - // wrapper is null which means this entry is not touched by transaction - if (wrapper == null) { - boolean removed = removeIfSameInternal(keyData, value); - if (removed) { - txMap.put(keyData, new TxnValueWrapper(value, Type.REMOVED)); - } - return removed; - } - // wrapper type is REMOVED which means entry is already removed inside the transaction - if (wrapper.type == Type.REMOVED) { - return false; - } - // wrapper value is not equal to passed value - if (!isEquals(wrapper.value, value)) { - return false; + Data keyData = mapServiceContext.toData(key, partitionStrategy); + + NearCachingHook nearCachingHook = newNearCachingHook(); + nearCachingHook.beforeRemoteCall(key, keyData, null, null); + + TxnValueWrapper wrapper = txMap.get(keyData); + // wrapper is null which means this entry is not touched by transaction + if (wrapper == null) { + boolean removed = removeIfSameInternal(keyData, value, nearCachingHook); + if (removed) { + txMap.put(keyData, new TxnValueWrapper(value, Type.REMOVED)); } - // wrapper value is equal to passed value, we call removeInternal just to add delete log - removeInternal(keyData); - txMap.put(keyData, new TxnValueWrapper(value, Type.REMOVED)); - } finally { - invalidateNearCache(nearCacheKey); + return removed; } + // wrapper type is REMOVED which means entry is already removed inside the transaction + if (wrapper.type == Type.REMOVED) { + return false; + } + // wrapper value is not equal to passed value + if (!isEquals(wrapper.value, value)) { + return false; + } + // wrapper value is equal to passed value, we call removeInternal just to add delete log + removeInternal(keyData, nearCachingHook); + txMap.put(keyData, new TxnValueWrapper(value, Type.REMOVED)); return true; } @@ -312,19 +310,18 @@ public Object remove(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); - try { - Data keyData = mapServiceContext.toData(nearCacheKey, partitionStrategy); - Object valueBeforeTxn = toObjectIfNeeded(removeInternal(keyData)); + Data keyData = mapServiceContext.toData(key, partitionStrategy); - TxnValueWrapper wrapper = null; - if (valueBeforeTxn != null || txMap.containsKey(keyData)) { - wrapper = txMap.put(keyData, new TxnValueWrapper(valueBeforeTxn, Type.REMOVED)); - } - return wrapper == null ? valueBeforeTxn : checkIfRemoved(wrapper); - } finally { - invalidateNearCache(nearCacheKey); + NearCachingHook nearCachingHook = newNearCachingHook(); + nearCachingHook.beforeRemoteCall(key, keyData, null, null); + + Object valueBeforeTxn = toObjectIfNeeded(removeInternal(keyData, nearCachingHook)); + + TxnValueWrapper wrapper = null; + if (valueBeforeTxn != null || txMap.containsKey(keyData)) { + wrapper = txMap.put(keyData, new TxnValueWrapper(valueBeforeTxn, Type.REMOVED)); } + return wrapper == null ? valueBeforeTxn : checkIfRemoved(wrapper); } @Override @@ -332,15 +329,14 @@ public void delete(Object key) { checkTransactionState(); checkNotNull(key, "key can't be null"); - Object nearCacheKey = toNearCacheKeyWithStrategy(key); - try { - Data keyData = mapServiceContext.toData(key, partitionStrategy); - Data data = removeInternal(keyData); - if (data != null || txMap.containsKey(keyData)) { - txMap.put(keyData, new TxnValueWrapper(toObjectIfNeeded(data), Type.REMOVED)); - } - } finally { - invalidateNearCache(nearCacheKey); + Data keyData = mapServiceContext.toData(key, partitionStrategy); + + NearCachingHook nearCachingHook = newNearCachingHook(); + nearCachingHook.beforeRemoteCall(key, keyData, null, null); + + Data data = removeInternal(keyData, nearCachingHook); + if (data != null || txMap.containsKey(keyData)) { + txMap.put(keyData, new TxnValueWrapper(toObjectIfNeeded(data), Type.REMOVED)); } } @@ -449,7 +445,7 @@ private Object checkIfRemoved(TxnValueWrapper wrapper) { private void removeFromResultSet(Set queryResultSet, List valueSet, Set keyWontBeIncluded) { for (Map.Entry entry : queryResultSet) { - if (keyWontBeIncluded.contains((Data) entry.getKey())) { + if (keyWontBeIncluded.contains(entry.getKey())) { continue; } valueSet.add(toObjectIfNeeded(entry.getValue())); diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java index 43d6ac06f9961..89f15de86b4dd 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TransactionalMapProxySupport.java @@ -20,6 +20,7 @@ import com.hazelcast.config.NearCacheConfig; import com.hazelcast.core.PartitioningStrategy; import com.hazelcast.internal.nearcache.NearCache; +import com.hazelcast.internal.nearcache.NearCachingHook; import com.hazelcast.internal.serialization.InternalSerializationService; import com.hazelcast.map.impl.MapService; import com.hazelcast.map.impl.MapServiceContext; @@ -215,15 +216,15 @@ int sizeInternal() { } } - Data putInternal(Data key, Data value, long ttl, TimeUnit timeUnit) { + Data putInternal(Data key, Data value, long ttl, TimeUnit timeUnit, NearCachingHook hook) { VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); long timeInMillis = getTimeInMillis(ttl, timeUnit); MapOperation operation = operationProvider.createTxnSetOperation(name, key, value, versionedValue.version, timeInMillis); - tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, versionedValue.version, tx.getOwnerUuid())); + tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, tx.getOwnerUuid(), hook)); return versionedValue.value; } - Data putIfAbsentInternal(Data key, Data value) { + Data putIfAbsentInternal(Data key, Data value, NearCachingHook hook) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (versionedValue.value != null) { @@ -236,11 +237,11 @@ Data putIfAbsentInternal(Data key, Data value) { } MapOperation operation = operationProvider.createTxnSetOperation(name, key, value, versionedValue.version, -1); - tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, versionedValue.version, tx.getOwnerUuid())); + tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, tx.getOwnerUuid(), hook)); return versionedValue.value; } - Data replaceInternal(Data key, Data value) { + Data replaceInternal(Data key, Data value, NearCachingHook hook) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (versionedValue.value == null) { @@ -252,11 +253,11 @@ Data replaceInternal(Data key, Data value) { return null; } MapOperation operation = operationProvider.createTxnSetOperation(name, key, value, versionedValue.version, -1); - tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, versionedValue.version, tx.getOwnerUuid())); + tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, tx.getOwnerUuid(), hook)); return versionedValue.value; } - boolean replaceIfSameInternal(Data key, Object oldValue, Data newValue) { + boolean replaceIfSameInternal(Data key, Object oldValue, Data newValue, NearCachingHook hook) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (!isEquals(oldValue, versionedValue.value)) { @@ -268,19 +269,19 @@ boolean replaceIfSameInternal(Data key, Object oldValue, Data newValue) { return false; } MapOperation operation = operationProvider.createTxnSetOperation(name, key, newValue, versionedValue.version, -1); - tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, versionedValue.version, tx.getOwnerUuid())); + tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, tx.getOwnerUuid(), hook)); return true; } - Data removeInternal(Data key) { + Data removeInternal(Data key, NearCachingHook nearCachingHook) { VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operationProvider.createTxnDeleteOperation(name, key, versionedValue.version), - versionedValue.version, tx.getOwnerUuid())); + tx.getOwnerUuid(), nearCachingHook)); return versionedValue.value; } - boolean removeIfSameInternal(Data key, Object value) { + boolean removeIfSameInternal(Data key, Object value, NearCachingHook hook) { boolean unlockImmediately = !valueMap.containsKey(key); VersionedValue versionedValue = lockAndGet(key, tx.getTimeoutMillis()); if (!isEquals(versionedValue.value, value)) { @@ -292,8 +293,7 @@ boolean removeIfSameInternal(Data key, Object value) { return false; } tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), - operationProvider.createTxnDeleteOperation(name, key, versionedValue.version), - versionedValue.version, tx.getOwnerUuid())); + operationProvider.createTxnDeleteOperation(name, key, versionedValue.version), tx.getOwnerUuid(), hook)); return true; } @@ -313,7 +313,8 @@ private void unlock(Data key, VersionedValue versionedValue) { private void addUnlockTransactionRecord(Data key, long version) { TxnUnlockOperation operation = new TxnUnlockOperation(name, key, version); - tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), operation, version, tx.getOwnerUuid())); + tx.add(new MapTransactionLogRecord(name, key, getPartitionId(key), + operation, tx.getOwnerUuid(), NearCachingHook.EMPTY_HOOK)); } /** @@ -354,4 +355,28 @@ private VersionedValue lockAndGet(Data key, long timeout, boolean shouldLoad) { private static long getTimeInMillis(long time, TimeUnit timeunit) { return timeunit != null ? timeunit.toMillis(time) : time; } + + protected NearCachingHook newNearCachingHook() { + return nearCacheEnabled ? new InvalidationHook() : NearCachingHook.EMPTY_HOOK; + } + + private class InvalidationHook implements NearCachingHook { + + private Object nearCacheKey; + + @Override + public void beforeRemoteCall(Object key, Data keyData, Object value, Data valueData) { + nearCacheKey = serializeKeys ? keyData : key; + } + + @Override + public void onRemoteCallSuccess() { + invalidateNearCache(nearCacheKey); + } + + @Override + public void onRemoteCallFailure() { + invalidateNearCache(nearCacheKey); + } + } } diff --git a/hazelcast/src/main/java/com/hazelcast/multimap/impl/txn/MultiMapTransactionLogRecord.java b/hazelcast/src/main/java/com/hazelcast/multimap/impl/txn/MultiMapTransactionLogRecord.java index 352a6b37f6bb9..45728105ad2a6 100644 --- a/hazelcast/src/main/java/com/hazelcast/multimap/impl/txn/MultiMapTransactionLogRecord.java +++ b/hazelcast/src/main/java/com/hazelcast/multimap/impl/txn/MultiMapTransactionLogRecord.java @@ -60,6 +60,16 @@ public Operation newCommitOperation() { return new TxnCommitOperation(partitionId, name, key, threadId, opList); } + @Override + public void onCommitSuccess() { + // NOP + } + + @Override + public void onCommitFailure() { + // NOP + } + @Override public Operation newRollbackOperation() { return new TxnRollbackOperation(partitionId, name, key, threadId); diff --git a/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionImpl.java b/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionImpl.java index 3f842d1fcc311..e8a3c1ddfdc05 100644 --- a/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionImpl.java @@ -291,9 +291,11 @@ public void commit() throws TransactionException, IllegalStateException { List futures = transactionLog.commit(nodeEngine); waitWithDeadline(futures, Long.MAX_VALUE, MILLISECONDS, RETHROW_TRANSACTION_EXCEPTION); state = COMMITTED; + transactionLog.onCommitSuccess(); transactionManagerService.commitCount.inc(); } catch (Throwable e) { state = COMMIT_FAILED; + transactionLog.onCommitFailure(); throw rethrow(e, TransactionException.class); } finally { purgeBackupLogs(); diff --git a/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionLog.java b/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionLog.java index f9ec2869f9760..9439c3d020de4 100644 --- a/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionLog.java +++ b/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionLog.java @@ -105,6 +105,18 @@ public List commit(NodeEngine nodeEngine) { return futures; } + public void onCommitSuccess() { + for (TransactionLogRecord record : recordMap.values()) { + record.onCommitSuccess(); + } + } + + public void onCommitFailure() { + for (TransactionLogRecord record : recordMap.values()) { + record.onCommitFailure(); + } + } + public List prepare(NodeEngine nodeEngine) { List futures = new ArrayList(size()); for (TransactionLogRecord record : recordList) { diff --git a/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionLogRecord.java b/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionLogRecord.java index ddc8ef3a73646..a6efa81ff77f0 100644 --- a/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionLogRecord.java +++ b/hazelcast/src/main/java/com/hazelcast/transaction/impl/TransactionLogRecord.java @@ -43,5 +43,9 @@ public interface TransactionLogRecord extends IdentifiedDataSerializable { Operation newCommitOperation(); + void onCommitSuccess(); + + void onCommitFailure(); + Operation newRollbackOperation(); } diff --git a/hazelcast/src/main/java/com/hazelcast/transaction/impl/xa/XATransaction.java b/hazelcast/src/main/java/com/hazelcast/transaction/impl/xa/XATransaction.java index 4b172adbb58f9..c7bca18fea0b6 100644 --- a/hazelcast/src/main/java/com/hazelcast/transaction/impl/xa/XATransaction.java +++ b/hazelcast/src/main/java/com/hazelcast/transaction/impl/xa/XATransaction.java @@ -174,18 +174,41 @@ public void commit() throws TransactionException, IllegalStateException { } } - public void commitAsync(ExecutionCallback callback) { + public void commitAsync(final ExecutionCallback callback) { if (state != PREPARED) { throw new IllegalStateException("Transaction is not prepared"); } checkTimeout(); state = COMMITTING; - transactionLog.commitAsync(nodeEngine, callback); + + transactionLog.commitAsync(nodeEngine, wrapExecutionCallback(callback)); // We should rethrow exception if transaction is not TWO_PHASE state = COMMITTED; } + private ExecutionCallback wrapExecutionCallback(final ExecutionCallback callback) { + return new ExecutionCallback() { + @Override + public void onResponse(Object response) { + try { + callback.onResponse(response); + } finally { + transactionLog.onCommitSuccess(); + } + } + + @Override + public void onFailure(Throwable t) { + try { + callback.onFailure(t); + } finally { + transactionLog.onCommitFailure(); + } + } + }; + } + @Override public void rollback() throws IllegalStateException { if (state == NO_TXN || state == ROLLED_BACK) { diff --git a/hazelcast/src/test/java/com/hazelcast/map/impl/tx/MapTransactionStressTest.java b/hazelcast/src/test/java/com/hazelcast/map/impl/tx/MapTransactionStressTest.java index 3a2b4d04f1221..6aebc7e881ebc 100644 --- a/hazelcast/src/test/java/com/hazelcast/map/impl/tx/MapTransactionStressTest.java +++ b/hazelcast/src/test/java/com/hazelcast/map/impl/tx/MapTransactionStressTest.java @@ -357,6 +357,16 @@ public void run() throws Exception { }; } + @Override + public void onCommitSuccess() { + // NOP + } + + @Override + public void onCommitFailure() { + // NOP + } + @Override public Operation newRollbackOperation() { return newEmptyOperation(); diff --git a/hazelcast/src/test/java/com/hazelcast/map/impl/tx/TxnMapNearCacheInvalidationTest.java b/hazelcast/src/test/java/com/hazelcast/map/impl/tx/TxnMapNearCacheInvalidationTest.java index 6d81f26e07611..acbe52a16d422 100644 --- a/hazelcast/src/test/java/com/hazelcast/map/impl/tx/TxnMapNearCacheInvalidationTest.java +++ b/hazelcast/src/test/java/com/hazelcast/map/impl/tx/TxnMapNearCacheInvalidationTest.java @@ -18,6 +18,7 @@ import com.hazelcast.config.Config; import com.hazelcast.config.InMemoryFormat; +import com.hazelcast.config.MapConfig; import com.hazelcast.config.NearCacheConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IMap; @@ -27,7 +28,9 @@ import com.hazelcast.test.TestHazelcastInstanceFactory; import com.hazelcast.test.annotation.ParallelTest; import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.transaction.TransactionContext; import com.hazelcast.transaction.TransactionException; +import com.hazelcast.transaction.TransactionOptions; import com.hazelcast.transaction.TransactionalTask; import com.hazelcast.transaction.TransactionalTaskContext; import org.junit.Test; @@ -76,7 +79,60 @@ public static Collection parameters() { } @Test - public void txn_map_contains_newly_put_key_even_it_is_null_cached_after_addition() throws Exception { + public void after_txn_commit_near_cache_should_be_invalidated() { + Config cfg = getConfig(); + String mapName = "cache"; + MapConfig cacheConfig = cfg.getMapConfig(mapName); + NearCacheConfig nearCacheConfig = new NearCacheConfig(); + nearCacheConfig.setInvalidateOnChange(true) + .setCacheLocalEntries(true) + .setSerializeKeys(serializeKeys) + .setInMemoryFormat(inMemoryFormat); + + cacheConfig.setNearCacheConfig(nearCacheConfig); + + HazelcastInstance server = createHazelcastInstance(cfg); + IMap map = server.getMap(mapName); + + String key = "key"; + String oldValue = "oldValue"; + String updatedValue = "updatedValue"; + + // populate imap + map.put(key, oldValue); + + // populate near cache + Object valueReadBeforeTxnFromNonTxnMap = map.get(key); + + // begin txn + TransactionOptions opts = new TransactionOptions(); + opts.setTransactionType(TransactionOptions.TransactionType.TWO_PHASE); + TransactionContext ctx = server.newTransactionContext(opts); + ctx.beginTransaction(); + + TransactionalMap txnMap = ctx.getMap(mapName); + Object valueReadInsideTxnFromTxnMapBeforeUpdate = txnMap.get(key); + + txnMap.put(key, updatedValue); + + Object valueReadInsideTxnFromTxnMapAfterUpdate = txnMap.get(key); + Object valueReadInsideTxnFromNonTxnMapAfterUpdate = map.get(key); + + ctx.commitTransaction(); + + // check values read from txn map + assertEquals(oldValue, valueReadInsideTxnFromTxnMapBeforeUpdate); + assertEquals(updatedValue, valueReadInsideTxnFromTxnMapAfterUpdate); + + // check values read from non-txn map + assertEquals(oldValue, valueReadBeforeTxnFromNonTxnMap); + assertEquals(oldValue, valueReadInsideTxnFromNonTxnMapAfterUpdate); + Object valueReadAfterTxnFromNonTxnMap = map.get(key); + assertEquals(updatedValue, valueReadAfterTxnFromNonTxnMap); + } + + @Test + public void txn_map_contains_newly_put_key_even_it_is_null_cached_after_addition() { final String mapName = "test"; final int key = 1; @@ -141,36 +197,43 @@ public void txn_put_invalidates_near_cache() throws Exception { } @Test - public void txn_putIfAbsent_invalidates_near_cache() throws Exception { + public void txn_putIfAbsent_invalidates_near_cache() throws + Exception { txn_invalidates_near_cache(PUT_IF_ABSENT); } @Test - public void txn_putTTL_invalidates_near_cache() throws Exception { + public void txn_putTTL_invalidates_near_cache() throws + Exception { txn_invalidates_near_cache(PUT_TTL); } @Test - public void txn_delete_invalidates_near_cache() throws Exception { + public void txn_delete_invalidates_near_cache() throws + Exception { txn_invalidates_near_cache(DELETE); } @Test - public void txn_replace_invalidates_near_cache() throws Exception { + public void txn_replace_invalidates_near_cache() throws + Exception { txn_invalidates_near_cache(REPLACE); } @Test - public void txn_replaceIfSame_invalidates_near_cache() throws Exception { + public void txn_replaceIfSame_invalidates_near_cache() throws + Exception { txn_invalidates_near_cache(REPLACE_IF_SAME); } @Test - public void txn_containsKey_sees_latest_value_after_delete() throws Exception { + public void txn_containsKey_sees_latest_value_after_delete() throws + Exception { txn_invalidates_near_cache(CONTAINS_KEY); } - private void txn_invalidates_near_cache(InvalidatorTxnOp invalidatorTxnTask) throws Exception { + private void txn_invalidates_near_cache(InvalidatorTxnOp + invalidatorTxnTask) throws Exception { final String mapName = "test"; final int numberOfEntries = 1000; diff --git a/hazelcast/src/test/java/com/hazelcast/transaction/impl/MockTransactionLogRecord.java b/hazelcast/src/test/java/com/hazelcast/transaction/impl/MockTransactionLogRecord.java index 305460e372345..87bc77c5a9d24 100644 --- a/hazelcast/src/test/java/com/hazelcast/transaction/impl/MockTransactionLogRecord.java +++ b/hazelcast/src/test/java/com/hazelcast/transaction/impl/MockTransactionLogRecord.java @@ -76,6 +76,16 @@ public Operation newCommitOperation() { return createOperation(failCommit); } + @Override + public void onCommitSuccess() { + // NOP + } + + @Override + public void onCommitFailure() { + // NOP + } + @Override public Operation newRollbackOperation() { rollbackCalled = true;