Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKPORT] Fix EP index queries consistency #13892

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.map;

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.query.SampleTestObjects.Employee;
import com.hazelcast.query.SqlPredicate;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.SlowTest;
import com.hazelcast.test.bounce.BounceMemberRule;
import com.hazelcast.test.jitter.JitterRule;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import static java.util.concurrent.TimeUnit.MINUTES;
import static org.junit.Assert.assertEquals;

/**
* Test removing entries by query from Hazelcast clients while members are
* shutting down and joining.
*/
@RunWith(HazelcastSerialClassRunner.class)
@Category(SlowTest.class)
public class ClientPutAllRemoveBounceTest extends HazelcastTestSupport {
private static final String TEST_MAP_NAME = "employees";
private static final int CONCURRENCY = 1;

@Rule
public BounceMemberRule bounceMemberRule = BounceMemberRule.with(getConfig()).clusterSize(4).driverCount(4).build();

@Before
public void setup() {
bounceMemberRule.getSteadyMember().getMap(TEST_MAP_NAME).addIndex("id", true);
}

@Test
public void testQuery() {
QueryRunnable[] testTasks = new QueryRunnable[CONCURRENCY];
for (int i = 0; i < CONCURRENCY; i++) {
testTasks[i] = new QueryRunnable(bounceMemberRule.getNextTestDriver());
}
bounceMemberRule.testRepeatedly(testTasks, MINUTES.toSeconds(1));
}

@After
public void assertMapEmpty() {
IMap<Integer, Employee> map = bounceMemberRule.getSteadyMember().getMap(TEST_MAP_NAME);
assertEquals("Map is not empty ", 0, map.size());
}

protected Config getConfig() {
return smallInstanceConfig();
}

public static class QueryRunnable implements Runnable {
private final HazelcastInstance hazelcastInstance;
private final Random random = new Random();
private IMap<Integer, Employee> map;
int range = 10;
int keyDomain = Integer.MAX_VALUE;

QueryRunnable(HazelcastInstance hazelcastInstance) {
this.hazelcastInstance = hazelcastInstance;
}

@Override
public void run() {
if (map == null) {
map = hazelcastInstance.getMap(TEST_MAP_NAME);
}
int min = random.nextInt(keyDomain - range);
int max = min + range;

Map<Integer, Employee> m = new HashMap<Integer, Employee>();
for (int i = min; i < max; i++) {
m.put(i, new Employee(i, "name" + i, i, true, i));
}

map.putAll(m);
map.removeAll(new SqlPredicate("id >= " + min + " and id < " + max));
}
}
}
Expand Up @@ -45,8 +45,6 @@ public class MultipleEntryOperation extends MapOperation
protected MapEntries responses;
protected EntryProcessor entryProcessor;

protected transient EntryOperator operator;

public MultipleEntryOperation() {
}

Expand All @@ -69,8 +67,11 @@ public void innerBeforeRun() throws Exception {
@SuppressWarnings("checkstyle:npathcomplexity")
public void run() throws Exception {
responses = new MapEntries(keys.size());
if (keys.isEmpty()) {
return;
}

operator = operator(this, entryProcessor, getPredicate());
EntryOperator operator = operator(this, entryProcessor, getPredicate());
for (Data key : keys) {
Data response = operator.operateOnKey(key).doPostOperateOps().getResult();
if (response != null) {
Expand Down
Expand Up @@ -16,24 +16,26 @@

package com.hazelcast.map.impl.operation;

import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapDataSerializerHook;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.query.Query;
import com.hazelcast.map.impl.query.QueryResult;
import com.hazelcast.map.impl.query.QueryResultRow;
import com.hazelcast.map.impl.query.QueryRunner;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.query.Predicate;
import com.hazelcast.query.TruePredicate;
import com.hazelcast.query.impl.Indexes;
import com.hazelcast.query.impl.QueryableEntry;
import com.hazelcast.query.impl.predicates.QueryOptimizer;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.impl.operationservice.impl.operations.PartitionAwareOperationFactory;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.util.IterationType;
import com.hazelcast.util.collection.InflatableSet;
import com.hazelcast.util.collection.InflatableSet.Builder;
import com.hazelcast.util.collection.Int2ObjectHashMap;

import java.io.IOException;
Expand All @@ -44,59 +46,61 @@
import java.util.Set;

import static com.hazelcast.map.impl.MapService.SERVICE_NAME;
import static com.hazelcast.util.CollectionUtil.isEmpty;
import static com.hazelcast.util.CollectionUtil.toIntArray;
import static com.hazelcast.util.MapUtil.createInt2ObjectHashMap;
import static com.hazelcast.util.MapUtil.isNullOrEmpty;
import static com.hazelcast.util.collection.InflatableSet.newBuilder;
import static java.util.Collections.emptySet;

public class PartitionWideEntryWithPredicateOperationFactory extends PartitionAwareOperationFactory {

private String name;
private EntryProcessor entryProcessor;
private Predicate predicate;

/**
* Entry keys grouped by partition ID. This map is constructed from data
* fetched by querying the map from non-partition threads. Because of
* concurrent migrations, the query running on non-partition threads might
* fail. In this case, the map is {@code null}.
*/
private transient Map<Integer, List<Data>> partitionIdToKeysMap;

public PartitionWideEntryWithPredicateOperationFactory() {
}

public PartitionWideEntryWithPredicateOperationFactory(String name, EntryProcessor entryProcessor,
Predicate predicate, Map<Integer, List<Data>> partitionIdToKeysMap) {
this(name, entryProcessor, predicate);
this.partitionIdToKeysMap = partitionIdToKeysMap;
this.partitions = isNullOrEmpty(partitionIdToKeysMap) ? null : toIntArray(partitionIdToKeysMap.keySet());
}

public PartitionWideEntryWithPredicateOperationFactory(String name, EntryProcessor entryProcessor, Predicate predicate) {
this.name = name;
this.entryProcessor = entryProcessor;
this.predicate = predicate;
}

private PartitionWideEntryWithPredicateOperationFactory(String name, EntryProcessor entryProcessor, Predicate predicate,
Map<Integer, List<Data>> partitionIdToKeysMap) {
this(name, entryProcessor, predicate);
this.partitionIdToKeysMap = partitionIdToKeysMap;
}

@Override
public PartitionAwareOperationFactory createFactoryOnRunner(NodeEngine nodeEngine) {
Set<Data> keys = getKeysFromIndex(nodeEngine);
Map<Integer, List<Data>> partitionIdToKeysMap
= getPartitionIdToKeysMap(keys, ((InternalPartitionService) nodeEngine.getPartitionService()));
public PartitionAwareOperationFactory createFactoryOnRunner(NodeEngine nodeEngine, int[] partitions) {
Set<Data> keys = tryToObtainKeysFromIndexes(nodeEngine);
Map<Integer, List<Data>> partitionIdToKeysMap = groupKeysByPartition(keys, nodeEngine.getPartitionService(), partitions);

return new PartitionWideEntryWithPredicateOperationFactory(name, entryProcessor, predicate, partitionIdToKeysMap);
}

@Override
public Operation createPartitionOperation(int partition) {
if (isNullOrEmpty(partitionIdToKeysMap)) {
// fallback here if we cannot find anything from indexes.
if (partitionIdToKeysMap == null) {
// Index query failed to run because of ongoing migrations or we are
// creating an operation on the caller node.
return new PartitionWideEntryWithPredicateOperation(name, entryProcessor, predicate);
}

// index query succeeded
List<Data> keyList = partitionIdToKeysMap.get(partition);
InflatableSet<Data> keys = newBuilder(keyList).build();
assert keyList != null : "unexpected partition " + partition + ", expected partitions " + partitionIdToKeysMap.keySet();
Set<Data> keys = keyList.isEmpty() ? Collections.<Data>emptySet() : newBuilder(keyList).build();
return new MultipleEntryWithPredicateOperation(name, keys, entryProcessor, predicate);
}


@Override
public void writeData(ObjectDataOutput out) throws IOException {
out.writeUTF(name);
Expand All @@ -111,58 +115,74 @@ public void readData(ObjectDataInput in) throws IOException {
predicate = in.readObject();
}

private Set<Data> getKeysFromIndex(NodeEngine nodeEngine) {
/**
* Attempts to get keys by running an index query. This method may return
* {@code null} if there is an ongoing migration, which means that it is not
* safe to return results from a non-partition thread. The caller must then
* run a partition query to obtain the results.
*
* @param nodeEngine nodeEngine of this cluster node
* @return the set of keys or {@code null} if we failed to fetch the keys
* because of ongoing migrations
*/
private Set<Data> tryToObtainKeysFromIndexes(NodeEngine nodeEngine) {
// Do not use index in this case, because it requires full-table-scan.
if (predicate == TruePredicate.INSTANCE) {
return emptySet();
return null;
}

// get indexes
MapService mapService = nodeEngine.getService(SERVICE_NAME);
MapServiceContext mapServiceContext = mapService.getMapServiceContext();
Set<QueryableEntry> result = queryAllPartitions(mapServiceContext);

if (result == null) {
return emptySet();
QueryRunner runner = mapServiceContext.getMapQueryRunner(name);
Query query = Query.of().mapName(name).predicate(predicate).iterationType(IterationType.KEY).build();
final QueryResult result = (QueryResult) runner.runIndexQueryOnOwnedPartitions(query);
if (result.getPartitionIds() == null) {
// failed to run query because of ongoing migrations
return null;
}

List<Data> keys = null;
for (QueryableEntry e : result) {
if (keys == null) {
keys = new ArrayList<Data>(result.size());
}
keys.add(e.getKeyData());
final Builder<Data> setBuilder = InflatableSet.newBuilder(result.size());
for (QueryResultRow row : result.getRows()) {
setBuilder.add(row.getKey());
}

return keys == null ? Collections.<Data>emptySet() : newBuilder(keys).build();
return setBuilder.build();
}

private Set<QueryableEntry> queryAllPartitions(MapServiceContext mapServiceContext) {
QueryOptimizer queryOptimizer = mapServiceContext.getQueryOptimizer();
MapContainer mapContainer = mapServiceContext.getMapContainer(name);
Indexes indexes = mapContainer.getIndexes();
if (indexes != null) {
Predicate optimizedPredicate = queryOptimizer.optimize(predicate, indexes);
Set<QueryableEntry> querySet = indexes.query(optimizedPredicate);
return querySet;
} else {
throw new IllegalArgumentException("Partitioned index is not supported for on-heap usage");
private Map<Integer, List<Data>> groupKeysByPartition(Set<Data> keys, IPartitionService partitionService, int[] partitions) {
if (keys == null) {
return null;
}
}

private Map<Integer, List<Data>> getPartitionIdToKeysMap(Set<Data> keys, InternalPartitionService partitionService) {
if (isEmpty(keys)) {
return Collections.emptyMap();
// Even if the keys are successfully fetched from indexes, we need to
// filter them to exclude the keys belonging to partitions on which we
// weren't asked to operate on. Moreover, we need to include the
// partitions on which we were asked to operate on and for which we
// don't have any keys since this may indicate an out-migrated partition
// and we want OperationRunner to throw WrongTargetException to notify
// the caller about such situations.

// Using the type of Int2ObjectHashMap allows the get and put operations
// to avoid auto-boxing.
final Int2ObjectHashMap<List<Data>> partitionToKeys = createInt2ObjectHashMap(partitions.length);

// Pre-populate the map with the requested partitions to use it as a set
// to filter out possible unrequested partitions encountered among the
// fetched keys.
for (int partition : partitions) {
partitionToKeys.put(partition, Collections.<Data>emptyList());
}

final int roughSize = Math.min(partitionService.getPartitionCount(), keys.size());

//using the type of Int2ObjectHashMap allows the get and put operations to avoid auto-boxing
final Int2ObjectHashMap<List<Data>> partitionToKeys = createInt2ObjectHashMap(roughSize);
for (Data key : keys) {
int partitionId = partitionService.getPartitionId(key);
List<Data> keyList = partitionToKeys.get(partitionId);
if (keyList == null) {
// we weren't asked to run on this partition
continue;
}

if (keyList.isEmpty()) {
// we have a first key for this partition
keyList = new ArrayList<Data>();
partitionToKeys.put(partitionId, keyList);
}
Expand Down