Skip to content

Commit

Permalink
Transactional put/set operation should replicate result of interceptPut
Browse files Browse the repository at this point in the history
`TxnSetOperation` was not taking changes made by `MapInterceptor` into account
while sending backups.
  • Loading branch information
mdogan committed Sep 10, 2018
1 parent af34a2c commit 80cb20a
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 38 deletions.
Expand Up @@ -95,12 +95,17 @@ public boolean shouldBackup() {

@Override
public Operation getBackupOperation() {
final Record record = recordStore.getRecord(dataKey);
final RecordInfo replicationInfo = buildRecordInfo(record);
Record record = recordStore.getRecord(dataKey);
RecordInfo replicationInfo = buildRecordInfo(record);
if (isPostProcessing(recordStore)) {
dataValue = mapServiceContext.toData(record.getValue());
}
return new PutBackupOperation(name, dataKey, dataValue, replicationInfo, putTransient);
return new PutBackupOperation(name, dataKey, dataValue, replicationInfo, shouldUnlockKeyOnBackup(),
putTransient, !canThisOpGenerateWANEvent());
}

protected boolean shouldUnlockKeyOnBackup() {
return false;
}

@Override
Expand Down
Expand Up @@ -20,8 +20,6 @@
import com.hazelcast.core.EntryView;
import com.hazelcast.map.impl.MapDataSerializerHook;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.record.RecordInfo;
import com.hazelcast.map.impl.record.Records;
import com.hazelcast.map.merge.MapMergePolicy;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
Expand Down Expand Up @@ -96,9 +94,7 @@ public Operation getBackupOperation() {
if (dataValue == null) {
return new RemoveBackupOperation(name, dataKey, false, disableWanReplicationEvent);
} else {
final Record record = recordStore.getRecord(dataKey);
final RecordInfo replicationInfo = Records.buildRecordInfo(record);
return new PutBackupOperation(name, dataKey, dataValue, replicationInfo, false, false, disableWanReplicationEvent);
return super.getBackupOperation();
}
}

Expand Down
Expand Up @@ -20,15 +20,11 @@
import com.hazelcast.map.impl.MapDataSerializerHook;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.operation.BasePutOperation;
import com.hazelcast.map.impl.operation.PutBackupOperation;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.record.RecordInfo;
import com.hazelcast.map.impl.record.Records;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.EventService;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.WaitNotifyKey;
import com.hazelcast.spi.impl.MutatingOperation;
import com.hazelcast.transaction.TransactionException;
Expand Down Expand Up @@ -112,10 +108,9 @@ public boolean shouldNotify() {
return true;
}

public Operation getBackupOperation() {
final Record record = recordStore.getRecord(dataKey);
final RecordInfo replicationInfo = record != null ? Records.buildRecordInfo(record) : null;
return new PutBackupOperation(name, dataKey, dataValue, replicationInfo, true, false);
@Override
protected boolean shouldUnlockKeyOnBackup() {
return true;
}

public void onWaitExpire() {
Expand Down
108 changes: 86 additions & 22 deletions hazelcast/src/test/java/com/hazelcast/map/InterceptorTest.java
Expand Up @@ -22,19 +22,22 @@
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MapLoader;
import com.hazelcast.core.TransactionalMap;
import com.hazelcast.map.listener.EntryAddedListener;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.TestHazelcastInstanceFactory;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.transaction.TransactionException;
import com.hazelcast.transaction.TransactionalTask;
import com.hazelcast.transaction.TransactionalTaskContext;
import com.hazelcast.util.StringUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -106,7 +109,7 @@ public void testMapInterceptorOnNewMember() throws InterruptedException {
for (int i = 0; i < 100; i++) {
map.put(i, i);
}
map.addInterceptor(new NegativeInterceptor());
map.addInterceptor(new NegativeGetInterceptor());
for (int i = 0; i < 100; i++) {
assertEquals(i * -1, map.get(i));
}
Expand Down Expand Up @@ -215,6 +218,63 @@ public void run() throws Exception {
}, 15);
}

@Test
public void testInterceptPut_replicatedToBackups() {
String name = randomString();
TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory();

Config config = getConfig();
HazelcastInstance hz1 = factory.newHazelcastInstance(config);
HazelcastInstance hz2 = factory.newHazelcastInstance(config);

IMap<Object, Object> map = hz2.getMap(name);
map.addInterceptor(new NegativePutInterceptor());

int count = 1000;
for (int i = 1; i <= count; i++) {
map.set(i, i);
}
waitAllForSafeState(hz1, hz2);

hz1.getLifecycleService().terminate();

for (int i = 1; i <= count; i++) {
assertEquals(-i, map.get(i));
}
}

@Test
public void testInterceptPut_replicatedToBackups_usingTransactions() {
final String name = randomString();
TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory();

Config config = getConfig();
HazelcastInstance hz1 = factory.newHazelcastInstance(config);
HazelcastInstance hz2 = factory.newHazelcastInstance(config);

IMap<Object, Object> map = hz2.getMap(name);
map.addInterceptor(new NegativePutInterceptor());

final int count = 1000;
hz2.executeTransaction(new TransactionalTask<Object>() {
@Override
public Object execute(TransactionalTaskContext context) throws TransactionException {
TransactionalMap<Object, Object> txMap = context.getMap(name);
for (int i = 1; i <= count; i++) {
txMap.set(i, i);
}
return null;
}
});
waitAllForSafeState(hz1, hz2);

hz1.getLifecycleService().terminate();

for (int i = 1; i <= count; i++) {
assertEquals(-i, map.get(i));
}
}

static class DummyLoader implements MapLoader<Integer, String> {
@Override
public String load(Integer key) {
Expand Down Expand Up @@ -263,14 +323,11 @@ public void entryAdded(EntryEvent<Integer, String> event) {
}
}

public static class SimpleInterceptor implements MapInterceptor, Serializable {
static class MapInterceptorAdaptor implements MapInterceptor {

@Override
public Object interceptGet(Object value) {
if (value == null) {
return null;
}
return value + ":";
return value;
}

@Override
Expand All @@ -279,7 +336,7 @@ public void afterGet(Object value) {

@Override
public Object interceptPut(Object oldValue, Object newValue) {
return newValue.toString().toUpperCase(StringUtil.LOCALE_INTERNAL);
return newValue;
}

@Override
Expand All @@ -288,9 +345,6 @@ public void afterPut(Object value) {

@Override
public Object interceptRemove(Object removedValue) {
if (removedValue.equals("ISTANBUL")) {
throw new RuntimeException("you can not remove this");
}
return removedValue;
}

Expand All @@ -299,32 +353,42 @@ public void afterRemove(Object value) {
}
}

static class NegativeInterceptor implements MapInterceptor, Serializable {
public static class SimpleInterceptor extends MapInterceptorAdaptor {
@Override
public Object interceptGet(Object value) {
return ((Integer) value) * -1;
}

@Override
public void afterGet(Object value) {
if (value == null) {
return null;
}
return value + ":";
}

@Override
public Object interceptPut(Object oldValue, Object newValue) {
return newValue;
return newValue.toString().toUpperCase(StringUtil.LOCALE_INTERNAL);
}

@Override
public void afterPut(Object value) {
public Object interceptRemove(Object removedValue) {
if (removedValue.equals("ISTANBUL")) {
throw new RuntimeException("you can not remove this");
}
return removedValue;
}
}

static class NegativeGetInterceptor extends MapInterceptorAdaptor {

@Override
public Object interceptRemove(Object removedValue) {
return removedValue;
public Object interceptGet(Object value) {
return ((Integer) value) * -1;
}
}

static class NegativePutInterceptor extends MapInterceptorAdaptor {

@Override
public void afterRemove(Object value) {
public Object interceptPut(Object oldValue, Object newValue) {
return ((Integer) newValue) * -1;
}
}
}

0 comments on commit 80cb20a

Please sign in to comment.