Skip to content

Commit

Permalink
Fix handling of predicates for ReplicatedMap (#18650)
Browse files Browse the repository at this point in the history
ReplicatedMap was passing null extractors while constructing the QueryEntry
which was cauising NullPointerExceptions while evaluating predicates
that has an attribute path.

Also, refactored the ReplicatedMapListenerTests to reduce test duplication.
Note that, test logic is not altered.
  • Loading branch information
mdumandag committed May 7, 2021
1 parent ae52511 commit e22aab3
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 377 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.hazelcast.cluster.Address;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.query.impl.QueryEntry;
import com.hazelcast.query.impl.getters.Extractors;
import com.hazelcast.replicatedmap.ReplicatedMapCantBeCreatedOnLiteMemberException;
import com.hazelcast.replicatedmap.impl.record.AbstractReplicatedRecordStore;
import com.hazelcast.replicatedmap.impl.record.ReplicatedQueryEventFilter;
Expand Down Expand Up @@ -68,12 +69,16 @@ public class ReplicatedMapEventPublishingService
private final NodeEngine nodeEngine;
private final Config config;
private final EventService eventService;
private final InternalSerializationService serializationService;
private final Extractors extractors;

public ReplicatedMapEventPublishingService(ReplicatedMapService replicatedMapService) {
this.replicatedMapService = replicatedMapService;
this.nodeEngine = replicatedMapService.getNodeEngine();
this.config = nodeEngine.getConfig();
this.eventService = nodeEngine.getEventService();
this.serializationService = (InternalSerializationService) nodeEngine.getSerializationService();
this.extractors = Extractors.newBuilder(this.serializationService).build();
}

@Override
Expand Down Expand Up @@ -213,9 +218,7 @@ private boolean shouldPublish(Data key, Data oldValue, Data value, EntryEventTyp
} else {
testValue = value;
}
InternalSerializationService serializationService
= (InternalSerializationService) nodeEngine.getSerializationService();
queryEntry = new QueryEntry(serializationService, key, testValue, null);
queryEntry = new QueryEntry(serializationService, key, testValue, extractors);
}
return filter == null || filter.eval(queryEntry != null ? queryEntry : key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,133 +21,37 @@
import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.MapEvent;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
import com.hazelcast.query.Predicates;
import com.hazelcast.replicatedmap.AbstractReplicatedMapListenerTest;
import com.hazelcast.replicatedmap.ReplicatedMap;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.assertEquals;

@Category(QuickTest.class)
@RunWith(HazelcastParallelClassRunner.class)
public class ClientReplicatedMapListenerTest extends HazelcastTestSupport {
public class ClientReplicatedMapListenerTest extends AbstractReplicatedMapListenerTest {

private TestHazelcastFactory factory = new TestHazelcastFactory();
private final TestHazelcastFactory factory = new TestHazelcastFactory();

@After
public void tearDown() {
factory.terminateAll();
}

@Test
public void testEntryAdded() {
ReplicatedMap<Object, Object> replicatedMap = createClusterAndGetRandomReplicatedMap();
final EventCountingListener listener = new EventCountingListener();
replicatedMap.addEntryListener(listener);
replicatedMap.put(1, 1);
assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertEquals(1, listener.addCount.get());
}
});
}

@Test
public void testEntryUpdated() {
ReplicatedMap<Object, Object> replicatedMap = createClusterAndGetRandomReplicatedMap();
final EventCountingListener listener = new EventCountingListener();
replicatedMap.addEntryListener(listener);
replicatedMap.put(1, 1);
replicatedMap.put(1, 2);
assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertEquals(1, listener.updateCount.get());
}
});
}

@Test
public void testEntryRemoved() {
ReplicatedMap<Object, Object> replicatedMap = createClusterAndGetRandomReplicatedMap();
final EventCountingListener listener = new EventCountingListener();
replicatedMap.addEntryListener(listener);
replicatedMap.put(1, 1);
replicatedMap.remove(1);
assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertEquals(1, listener.removeCount.get());
}
});
}

@Test
public void testMapClear() {
ReplicatedMap<Object, Object> replicatedMap = createClusterAndGetRandomReplicatedMap();
final EventCountingListener listener = new EventCountingListener();
replicatedMap.addEntryListener(listener);
replicatedMap.put(1, 1);
replicatedMap.clear();
assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertEquals(1, listener.mapClearCount.get());
}
});
}

@Test
public void testListenToKeyForEntryAdded() {
ReplicatedMap<Object, Object> replicatedMap = createClusterAndGetRandomReplicatedMap();
final EventCountingListener listener = new EventCountingListener();
replicatedMap.addEntryListener(listener, 1);
replicatedMap.put(1, 1);
replicatedMap.put(2, 2);
assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertEquals(1, listener.keys.size());
assertEquals(1, listener.keys.peek());
assertEquals(1, listener.addCount.get());
}
});
}

@Test
@SuppressWarnings("unchecked")
public void testListenWithPredicate() {
ReplicatedMap<Object, Object> replicatedMap = createClusterAndGetRandomReplicatedMap();
final EventCountingListener listener = new EventCountingListener();
replicatedMap.addEntryListener(listener, Predicates.alwaysFalse());
replicatedMap.put(2, 2);
assertTrueFiveSeconds(new AssertTask() {
@Override
public void run() {
assertEquals(0, listener.addCount.get());
}
});
}

@Test
public void no_key_value_deserialization_on_server_when_in_memory_format_is_binary() {
final CountDownLatch eventReceivedLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -193,76 +97,10 @@ public void readData(ObjectDataInput in) throws IOException {
}
}

private <K, V> ReplicatedMap<K, V> createClusterAndGetRandomReplicatedMap() {
protected <K, V> ReplicatedMap<K, V> createClusterAndGetRandomReplicatedMap() {
factory.newHazelcastInstance();
HazelcastInstance client = factory.newHazelcastClient();
String mapName = randomMapName();
return client.getReplicatedMap(mapName);
}

public class EventCountingListener implements EntryListener<Object, Object> {

final ConcurrentLinkedQueue<Object> keys = new ConcurrentLinkedQueue<Object>();
final AtomicLong addCount = new AtomicLong();
final AtomicLong removeCount = new AtomicLong();
final AtomicLong updateCount = new AtomicLong();
final AtomicLong evictCount = new AtomicLong();
final AtomicLong expiryCount = new AtomicLong();
final AtomicLong mapClearCount = new AtomicLong();
final AtomicLong mapEvictCount = new AtomicLong();

EventCountingListener() {
}

@Override
public void entryAdded(EntryEvent<Object, Object> event) {
keys.add(event.getKey());
addCount.incrementAndGet();
}

@Override
public void entryRemoved(EntryEvent<Object, Object> event) {
keys.add(event.getKey());
removeCount.incrementAndGet();
}

@Override
public void entryUpdated(EntryEvent<Object, Object> event) {
keys.add(event.getKey());
updateCount.incrementAndGet();
}

@Override
public void entryEvicted(EntryEvent<Object, Object> event) {
keys.add(event.getKey());
evictCount.incrementAndGet();
}

@Override
public void entryExpired(EntryEvent<Object, Object> event) {
throw new UnsupportedOperationException("Expired event is not published by replicated map");
}

@Override
public void mapEvicted(MapEvent event) {
mapEvictCount.incrementAndGet();
}

@Override
public void mapCleared(MapEvent event) {
mapClearCount.incrementAndGet();
}

@Override
public String toString() {
return "EventCountingListener{"
+ "addCount=" + addCount
+ ", removeCount=" + removeCount
+ ", updateCount=" + updateCount
+ ", evictCount=" + evictCount
+ ", mapClearCount=" + mapClearCount
+ ", mapEvictCount=" + mapEvictCount
+ '}';
}
}
}

0 comments on commit e22aab3

Please sign in to comment.