From 80cb20a6392417b163180da06672014872dea293 Mon Sep 17 00:00:00 2001 From: Mehmet Dogan Date: Fri, 27 Jul 2018 14:53:51 +0300 Subject: [PATCH] Transactional put/set operation should replicate result of interceptPut `TxnSetOperation` was not taking changes made by `MapInterceptor` into account while sending backups. --- .../map/impl/operation/BasePutOperation.java | 11 +- .../impl/operation/LegacyMergeOperation.java | 6 +- .../map/impl/tx/TxnSetOperation.java | 11 +- .../com/hazelcast/map/InterceptorTest.java | 108 ++++++++++++++---- 4 files changed, 98 insertions(+), 38 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/BasePutOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/BasePutOperation.java index 6a7c401bb075..17c654db8942 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/BasePutOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/BasePutOperation.java @@ -95,12 +95,17 @@ public boolean shouldBackup() { @Override public Operation getBackupOperation() { - final Record record = recordStore.getRecord(dataKey); - final RecordInfo replicationInfo = buildRecordInfo(record); + Record record = recordStore.getRecord(dataKey); + RecordInfo replicationInfo = buildRecordInfo(record); if (isPostProcessing(recordStore)) { dataValue = mapServiceContext.toData(record.getValue()); } - return new PutBackupOperation(name, dataKey, dataValue, replicationInfo, putTransient); + return new PutBackupOperation(name, dataKey, dataValue, replicationInfo, shouldUnlockKeyOnBackup(), + putTransient, !canThisOpGenerateWANEvent()); + } + + protected boolean shouldUnlockKeyOnBackup() { + return false; } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/LegacyMergeOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/LegacyMergeOperation.java index 52624c61f996..d12666a121f0 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/LegacyMergeOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/LegacyMergeOperation.java @@ -20,8 +20,6 @@ import com.hazelcast.core.EntryView; import com.hazelcast.map.impl.MapDataSerializerHook; import com.hazelcast.map.impl.record.Record; -import com.hazelcast.map.impl.record.RecordInfo; -import com.hazelcast.map.impl.record.Records; import com.hazelcast.map.merge.MapMergePolicy; import com.hazelcast.nio.ObjectDataInput; import com.hazelcast.nio.ObjectDataOutput; @@ -96,9 +94,7 @@ public Operation getBackupOperation() { if (dataValue == null) { return new RemoveBackupOperation(name, dataKey, false, disableWanReplicationEvent); } else { - final Record record = recordStore.getRecord(dataKey); - final RecordInfo replicationInfo = Records.buildRecordInfo(record); - return new PutBackupOperation(name, dataKey, dataValue, replicationInfo, false, false, disableWanReplicationEvent); + return super.getBackupOperation(); } } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TxnSetOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TxnSetOperation.java index 690a5f4300b2..adcc981d8423 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TxnSetOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/tx/TxnSetOperation.java @@ -20,15 +20,11 @@ import com.hazelcast.map.impl.MapDataSerializerHook; import com.hazelcast.map.impl.MapService; import com.hazelcast.map.impl.operation.BasePutOperation; -import com.hazelcast.map.impl.operation.PutBackupOperation; import com.hazelcast.map.impl.record.Record; -import com.hazelcast.map.impl.record.RecordInfo; -import com.hazelcast.map.impl.record.Records; import com.hazelcast.nio.ObjectDataInput; import com.hazelcast.nio.ObjectDataOutput; import com.hazelcast.nio.serialization.Data; import com.hazelcast.spi.EventService; -import com.hazelcast.spi.Operation; import com.hazelcast.spi.WaitNotifyKey; import com.hazelcast.spi.impl.MutatingOperation; import com.hazelcast.transaction.TransactionException; @@ -112,10 +108,9 @@ public boolean shouldNotify() { return true; } - public Operation getBackupOperation() { - final Record record = recordStore.getRecord(dataKey); - final RecordInfo replicationInfo = record != null ? Records.buildRecordInfo(record) : null; - return new PutBackupOperation(name, dataKey, dataValue, replicationInfo, true, false); + @Override + protected boolean shouldUnlockKeyOnBackup() { + return true; } public void onWaitExpire() { diff --git a/hazelcast/src/test/java/com/hazelcast/map/InterceptorTest.java b/hazelcast/src/test/java/com/hazelcast/map/InterceptorTest.java index ec937e9c172f..907bd41793ce 100644 --- a/hazelcast/src/test/java/com/hazelcast/map/InterceptorTest.java +++ b/hazelcast/src/test/java/com/hazelcast/map/InterceptorTest.java @@ -22,6 +22,7 @@ import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IMap; import com.hazelcast.core.MapLoader; +import com.hazelcast.core.TransactionalMap; import com.hazelcast.map.listener.EntryAddedListener; import com.hazelcast.test.AssertTask; import com.hazelcast.test.HazelcastParallelClassRunner; @@ -29,12 +30,14 @@ import com.hazelcast.test.TestHazelcastInstanceFactory; import com.hazelcast.test.annotation.ParallelTest; import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.transaction.TransactionException; +import com.hazelcast.transaction.TransactionalTask; +import com.hazelcast.transaction.TransactionalTaskContext; import com.hazelcast.util.StringUtil; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -106,7 +109,7 @@ public void testMapInterceptorOnNewMember() throws InterruptedException { for (int i = 0; i < 100; i++) { map.put(i, i); } - map.addInterceptor(new NegativeInterceptor()); + map.addInterceptor(new NegativeGetInterceptor()); for (int i = 0; i < 100; i++) { assertEquals(i * -1, map.get(i)); } @@ -215,6 +218,63 @@ public void run() throws Exception { }, 15); } + @Test + public void testInterceptPut_replicatedToBackups() { + String name = randomString(); + TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(); + + Config config = getConfig(); + HazelcastInstance hz1 = factory.newHazelcastInstance(config); + HazelcastInstance hz2 = factory.newHazelcastInstance(config); + + IMap map = hz2.getMap(name); + map.addInterceptor(new NegativePutInterceptor()); + + int count = 1000; + for (int i = 1; i <= count; i++) { + map.set(i, i); + } + waitAllForSafeState(hz1, hz2); + + hz1.getLifecycleService().terminate(); + + for (int i = 1; i <= count; i++) { + assertEquals(-i, map.get(i)); + } + } + + @Test + public void testInterceptPut_replicatedToBackups_usingTransactions() { + final String name = randomString(); + TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(); + + Config config = getConfig(); + HazelcastInstance hz1 = factory.newHazelcastInstance(config); + HazelcastInstance hz2 = factory.newHazelcastInstance(config); + + IMap map = hz2.getMap(name); + map.addInterceptor(new NegativePutInterceptor()); + + final int count = 1000; + hz2.executeTransaction(new TransactionalTask() { + @Override + public Object execute(TransactionalTaskContext context) throws TransactionException { + TransactionalMap txMap = context.getMap(name); + for (int i = 1; i <= count; i++) { + txMap.set(i, i); + } + return null; + } + }); + waitAllForSafeState(hz1, hz2); + + hz1.getLifecycleService().terminate(); + + for (int i = 1; i <= count; i++) { + assertEquals(-i, map.get(i)); + } + } + static class DummyLoader implements MapLoader { @Override public String load(Integer key) { @@ -263,14 +323,11 @@ public void entryAdded(EntryEvent event) { } } - public static class SimpleInterceptor implements MapInterceptor, Serializable { + static class MapInterceptorAdaptor implements MapInterceptor { @Override public Object interceptGet(Object value) { - if (value == null) { - return null; - } - return value + ":"; + return value; } @Override @@ -279,7 +336,7 @@ public void afterGet(Object value) { @Override public Object interceptPut(Object oldValue, Object newValue) { - return newValue.toString().toUpperCase(StringUtil.LOCALE_INTERNAL); + return newValue; } @Override @@ -288,9 +345,6 @@ public void afterPut(Object value) { @Override public Object interceptRemove(Object removedValue) { - if (removedValue.equals("ISTANBUL")) { - throw new RuntimeException("you can not remove this"); - } return removedValue; } @@ -299,32 +353,42 @@ public void afterRemove(Object value) { } } - static class NegativeInterceptor implements MapInterceptor, Serializable { + public static class SimpleInterceptor extends MapInterceptorAdaptor { @Override public Object interceptGet(Object value) { - return ((Integer) value) * -1; - } - - @Override - public void afterGet(Object value) { + if (value == null) { + return null; + } + return value + ":"; } @Override public Object interceptPut(Object oldValue, Object newValue) { - return newValue; + return newValue.toString().toUpperCase(StringUtil.LOCALE_INTERNAL); } @Override - public void afterPut(Object value) { + public Object interceptRemove(Object removedValue) { + if (removedValue.equals("ISTANBUL")) { + throw new RuntimeException("you can not remove this"); + } + return removedValue; } + } + + static class NegativeGetInterceptor extends MapInterceptorAdaptor { @Override - public Object interceptRemove(Object removedValue) { - return removedValue; + public Object interceptGet(Object value) { + return ((Integer) value) * -1; } + } + + static class NegativePutInterceptor extends MapInterceptorAdaptor { @Override - public void afterRemove(Object value) { + public Object interceptPut(Object oldValue, Object newValue) { + return ((Integer) newValue) * -1; } } }