Skip to content

Commit

Permalink
Merge pull request #13892 from taburet/fix/3.10.6/ee-issue-2277
Browse files Browse the repository at this point in the history
[BACKPORT] Fix EP index queries consistency
  • Loading branch information
jerrinot committed Oct 4, 2018
2 parents ecd6af7 + 1693899 commit 1fd5a09
Show file tree
Hide file tree
Showing 8 changed files with 311 additions and 85 deletions.
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.map;

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.query.SampleTestObjects.Employee;
import com.hazelcast.query.SqlPredicate;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.SlowTest;
import com.hazelcast.test.bounce.BounceMemberRule;
import com.hazelcast.test.jitter.JitterRule;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import static java.util.concurrent.TimeUnit.MINUTES;
import static org.junit.Assert.assertEquals;

/**
* Test removing entries by query from Hazelcast clients while members are
* shutting down and joining.
*/
@RunWith(HazelcastSerialClassRunner.class)
@Category(SlowTest.class)
public class ClientPutAllRemoveBounceTest extends HazelcastTestSupport {
private static final String TEST_MAP_NAME = "employees";
private static final int CONCURRENCY = 1;

@Rule
public BounceMemberRule bounceMemberRule = BounceMemberRule.with(getConfig()).clusterSize(4).driverCount(4).build();

@Before
public void setup() {
bounceMemberRule.getSteadyMember().getMap(TEST_MAP_NAME).addIndex("id", true);
}

@Test
public void testQuery() {
QueryRunnable[] testTasks = new QueryRunnable[CONCURRENCY];
for (int i = 0; i < CONCURRENCY; i++) {
testTasks[i] = new QueryRunnable(bounceMemberRule.getNextTestDriver());
}
bounceMemberRule.testRepeatedly(testTasks, MINUTES.toSeconds(1));
}

@After
public void assertMapEmpty() {
IMap<Integer, Employee> map = bounceMemberRule.getSteadyMember().getMap(TEST_MAP_NAME);
assertEquals("Map is not empty ", 0, map.size());
}

protected Config getConfig() {
return smallInstanceConfig();
}

public static class QueryRunnable implements Runnable {
private final HazelcastInstance hazelcastInstance;
private final Random random = new Random();
private IMap<Integer, Employee> map;
int range = 10;
int keyDomain = Integer.MAX_VALUE;

QueryRunnable(HazelcastInstance hazelcastInstance) {
this.hazelcastInstance = hazelcastInstance;
}

@Override
public void run() {
if (map == null) {
map = hazelcastInstance.getMap(TEST_MAP_NAME);
}
int min = random.nextInt(keyDomain - range);
int max = min + range;

Map<Integer, Employee> m = new HashMap<Integer, Employee>();
for (int i = min; i < max; i++) {
m.put(i, new Employee(i, "name" + i, i, true, i));
}

map.putAll(m);
map.removeAll(new SqlPredicate("id >= " + min + " and id < " + max));
}
}
}
Expand Up @@ -45,8 +45,6 @@ public class MultipleEntryOperation extends MapOperation
protected MapEntries responses;
protected EntryProcessor entryProcessor;

protected transient EntryOperator operator;

public MultipleEntryOperation() {
}

Expand All @@ -69,8 +67,11 @@ public void innerBeforeRun() throws Exception {
@SuppressWarnings("checkstyle:npathcomplexity")
public void run() throws Exception {
responses = new MapEntries(keys.size());
if (keys.isEmpty()) {
return;
}

operator = operator(this, entryProcessor, getPredicate());
EntryOperator operator = operator(this, entryProcessor, getPredicate());
for (Data key : keys) {
Data response = operator.operateOnKey(key).doPostOperateOps().getResult();
if (response != null) {
Expand Down
Expand Up @@ -16,24 +16,26 @@

package com.hazelcast.map.impl.operation;

import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapDataSerializerHook;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.query.Query;
import com.hazelcast.map.impl.query.QueryResult;
import com.hazelcast.map.impl.query.QueryResultRow;
import com.hazelcast.map.impl.query.QueryRunner;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.query.Predicate;
import com.hazelcast.query.TruePredicate;
import com.hazelcast.query.impl.Indexes;
import com.hazelcast.query.impl.QueryableEntry;
import com.hazelcast.query.impl.predicates.QueryOptimizer;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.impl.operationservice.impl.operations.PartitionAwareOperationFactory;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.util.IterationType;
import com.hazelcast.util.collection.InflatableSet;
import com.hazelcast.util.collection.InflatableSet.Builder;
import com.hazelcast.util.collection.Int2ObjectHashMap;

import java.io.IOException;
Expand All @@ -44,59 +46,61 @@
import java.util.Set;

import static com.hazelcast.map.impl.MapService.SERVICE_NAME;
import static com.hazelcast.util.CollectionUtil.isEmpty;
import static com.hazelcast.util.CollectionUtil.toIntArray;
import static com.hazelcast.util.MapUtil.createInt2ObjectHashMap;
import static com.hazelcast.util.MapUtil.isNullOrEmpty;
import static com.hazelcast.util.collection.InflatableSet.newBuilder;
import static java.util.Collections.emptySet;

public class PartitionWideEntryWithPredicateOperationFactory extends PartitionAwareOperationFactory {

private String name;
private EntryProcessor entryProcessor;
private Predicate predicate;

/**
* Entry keys grouped by partition ID. This map is constructed from data
* fetched by querying the map from non-partition threads. Because of
* concurrent migrations, the query running on non-partition threads might
* fail. In this case, the map is {@code null}.
*/
private transient Map<Integer, List<Data>> partitionIdToKeysMap;

public PartitionWideEntryWithPredicateOperationFactory() {
}

public PartitionWideEntryWithPredicateOperationFactory(String name, EntryProcessor entryProcessor,
Predicate predicate, Map<Integer, List<Data>> partitionIdToKeysMap) {
this(name, entryProcessor, predicate);
this.partitionIdToKeysMap = partitionIdToKeysMap;
this.partitions = isNullOrEmpty(partitionIdToKeysMap) ? null : toIntArray(partitionIdToKeysMap.keySet());
}

public PartitionWideEntryWithPredicateOperationFactory(String name, EntryProcessor entryProcessor, Predicate predicate) {
this.name = name;
this.entryProcessor = entryProcessor;
this.predicate = predicate;
}

private PartitionWideEntryWithPredicateOperationFactory(String name, EntryProcessor entryProcessor, Predicate predicate,
Map<Integer, List<Data>> partitionIdToKeysMap) {
this(name, entryProcessor, predicate);
this.partitionIdToKeysMap = partitionIdToKeysMap;
}

@Override
public PartitionAwareOperationFactory createFactoryOnRunner(NodeEngine nodeEngine) {
Set<Data> keys = getKeysFromIndex(nodeEngine);
Map<Integer, List<Data>> partitionIdToKeysMap
= getPartitionIdToKeysMap(keys, ((InternalPartitionService) nodeEngine.getPartitionService()));
public PartitionAwareOperationFactory createFactoryOnRunner(NodeEngine nodeEngine, int[] partitions) {
Set<Data> keys = tryToObtainKeysFromIndexes(nodeEngine);
Map<Integer, List<Data>> partitionIdToKeysMap = groupKeysByPartition(keys, nodeEngine.getPartitionService(), partitions);

return new PartitionWideEntryWithPredicateOperationFactory(name, entryProcessor, predicate, partitionIdToKeysMap);
}

@Override
public Operation createPartitionOperation(int partition) {
if (isNullOrEmpty(partitionIdToKeysMap)) {
// fallback here if we cannot find anything from indexes.
if (partitionIdToKeysMap == null) {
// Index query failed to run because of ongoing migrations or we are
// creating an operation on the caller node.
return new PartitionWideEntryWithPredicateOperation(name, entryProcessor, predicate);
}

// index query succeeded
List<Data> keyList = partitionIdToKeysMap.get(partition);
InflatableSet<Data> keys = newBuilder(keyList).build();
assert keyList != null : "unexpected partition " + partition + ", expected partitions " + partitionIdToKeysMap.keySet();
Set<Data> keys = keyList.isEmpty() ? Collections.<Data>emptySet() : newBuilder(keyList).build();
return new MultipleEntryWithPredicateOperation(name, keys, entryProcessor, predicate);
}


@Override
public void writeData(ObjectDataOutput out) throws IOException {
out.writeUTF(name);
Expand All @@ -111,58 +115,74 @@ public void readData(ObjectDataInput in) throws IOException {
predicate = in.readObject();
}

private Set<Data> getKeysFromIndex(NodeEngine nodeEngine) {
/**
* Attempts to get keys by running an index query. This method may return
* {@code null} if there is an ongoing migration, which means that it is not
* safe to return results from a non-partition thread. The caller must then
* run a partition query to obtain the results.
*
* @param nodeEngine nodeEngine of this cluster node
* @return the set of keys or {@code null} if we failed to fetch the keys
* because of ongoing migrations
*/
private Set<Data> tryToObtainKeysFromIndexes(NodeEngine nodeEngine) {
// Do not use index in this case, because it requires full-table-scan.
if (predicate == TruePredicate.INSTANCE) {
return emptySet();
return null;
}

// get indexes
MapService mapService = nodeEngine.getService(SERVICE_NAME);
MapServiceContext mapServiceContext = mapService.getMapServiceContext();
Set<QueryableEntry> result = queryAllPartitions(mapServiceContext);

if (result == null) {
return emptySet();
QueryRunner runner = mapServiceContext.getMapQueryRunner(name);
Query query = Query.of().mapName(name).predicate(predicate).iterationType(IterationType.KEY).build();
final QueryResult result = (QueryResult) runner.runIndexQueryOnOwnedPartitions(query);
if (result.getPartitionIds() == null) {
// failed to run query because of ongoing migrations
return null;
}

List<Data> keys = null;
for (QueryableEntry e : result) {
if (keys == null) {
keys = new ArrayList<Data>(result.size());
}
keys.add(e.getKeyData());
final Builder<Data> setBuilder = InflatableSet.newBuilder(result.size());
for (QueryResultRow row : result.getRows()) {
setBuilder.add(row.getKey());
}

return keys == null ? Collections.<Data>emptySet() : newBuilder(keys).build();
return setBuilder.build();
}

private Set<QueryableEntry> queryAllPartitions(MapServiceContext mapServiceContext) {
QueryOptimizer queryOptimizer = mapServiceContext.getQueryOptimizer();
MapContainer mapContainer = mapServiceContext.getMapContainer(name);
Indexes indexes = mapContainer.getIndexes();
if (indexes != null) {
Predicate optimizedPredicate = queryOptimizer.optimize(predicate, indexes);
Set<QueryableEntry> querySet = indexes.query(optimizedPredicate);
return querySet;
} else {
throw new IllegalArgumentException("Partitioned index is not supported for on-heap usage");
private Map<Integer, List<Data>> groupKeysByPartition(Set<Data> keys, IPartitionService partitionService, int[] partitions) {
if (keys == null) {
return null;
}
}

private Map<Integer, List<Data>> getPartitionIdToKeysMap(Set<Data> keys, InternalPartitionService partitionService) {
if (isEmpty(keys)) {
return Collections.emptyMap();
// Even if the keys are successfully fetched from indexes, we need to
// filter them to exclude the keys belonging to partitions on which we
// weren't asked to operate on. Moreover, we need to include the
// partitions on which we were asked to operate on and for which we
// don't have any keys since this may indicate an out-migrated partition
// and we want OperationRunner to throw WrongTargetException to notify
// the caller about such situations.

// Using the type of Int2ObjectHashMap allows the get and put operations
// to avoid auto-boxing.
final Int2ObjectHashMap<List<Data>> partitionToKeys = createInt2ObjectHashMap(partitions.length);

// Pre-populate the map with the requested partitions to use it as a set
// to filter out possible unrequested partitions encountered among the
// fetched keys.
for (int partition : partitions) {
partitionToKeys.put(partition, Collections.<Data>emptyList());
}

final int roughSize = Math.min(partitionService.getPartitionCount(), keys.size());

//using the type of Int2ObjectHashMap allows the get and put operations to avoid auto-boxing
final Int2ObjectHashMap<List<Data>> partitionToKeys = createInt2ObjectHashMap(roughSize);
for (Data key : keys) {
int partitionId = partitionService.getPartitionId(key);
List<Data> keyList = partitionToKeys.get(partitionId);
if (keyList == null) {
// we weren't asked to run on this partition
continue;
}

if (keyList.isEmpty()) {
// we have a first key for this partition
keyList = new ArrayList<Data>();
partitionToKeys.put(partitionId, keyList);
}
Expand Down

0 comments on commit 1fd5a09

Please sign in to comment.