diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/map/ClientPutAllRemoveBounceTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/map/ClientPutAllRemoveBounceTest.java new file mode 100644 index 000000000000..70db01378e8f --- /dev/null +++ b/hazelcast-client/src/test/java/com/hazelcast/client/map/ClientPutAllRemoveBounceTest.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.client.map; + +import com.hazelcast.config.Config; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IMap; +import com.hazelcast.query.SampleTestObjects.Employee; +import com.hazelcast.query.SqlPredicate; +import com.hazelcast.test.HazelcastSerialClassRunner; +import com.hazelcast.test.HazelcastTestSupport; +import com.hazelcast.test.annotation.SlowTest; +import com.hazelcast.test.bounce.BounceMemberRule; +import com.hazelcast.test.jitter.JitterRule; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.junit.Assert.assertEquals; + +/** + * Test removing entries by query from Hazelcast clients while members are + * shutting down and joining. 
+ */ +@RunWith(HazelcastSerialClassRunner.class) +@Category(SlowTest.class) +public class ClientPutAllRemoveBounceTest extends HazelcastTestSupport { + private static final String TEST_MAP_NAME = "employees"; + private static final int CONCURRENCY = 1; + + @Rule + public BounceMemberRule bounceMemberRule = BounceMemberRule.with(getConfig()).clusterSize(4).driverCount(4).build(); + + @Before + public void setup() { + bounceMemberRule.getSteadyMember().getMap(TEST_MAP_NAME).addIndex("id", true); + } + + @Test + public void testQuery() { + QueryRunnable[] testTasks = new QueryRunnable[CONCURRENCY]; + for (int i = 0; i < CONCURRENCY; i++) { + testTasks[i] = new QueryRunnable(bounceMemberRule.getNextTestDriver()); + } + bounceMemberRule.testRepeatedly(testTasks, MINUTES.toSeconds(1)); + } + + @After + public void assertMapEmpty() { + IMap map = bounceMemberRule.getSteadyMember().getMap(TEST_MAP_NAME); + assertEquals("Map is not empty ", 0, map.size()); + } + + protected Config getConfig() { + return smallInstanceConfig(); + } + + public static class QueryRunnable implements Runnable { + private final HazelcastInstance hazelcastInstance; + private final Random random = new Random(); + private IMap map; + int range = 10; + int keyDomain = Integer.MAX_VALUE; + + QueryRunnable(HazelcastInstance hazelcastInstance) { + this.hazelcastInstance = hazelcastInstance; + } + + @Override + public void run() { + if (map == null) { + map = hazelcastInstance.getMap(TEST_MAP_NAME); + } + int min = random.nextInt(keyDomain - range); + int max = min + range; + + Map m = new HashMap(); + for (int i = min; i < max; i++) { + m.put(i, new Employee(i, "name" + i, i, true, i)); + } + + map.putAll(m); + map.removeAll(new SqlPredicate("id >= " + min + " and id < " + max)); + } + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MultipleEntryOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MultipleEntryOperation.java index 87f2c60ce3fb..27e5124247ed 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MultipleEntryOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MultipleEntryOperation.java @@ -45,8 +45,6 @@ public class MultipleEntryOperation extends MapOperation protected MapEntries responses; protected EntryProcessor entryProcessor; - protected transient EntryOperator operator; - public MultipleEntryOperation() { } @@ -69,8 +67,11 @@ public void innerBeforeRun() throws Exception { @SuppressWarnings("checkstyle:npathcomplexity") public void run() throws Exception { responses = new MapEntries(keys.size()); + if (keys.isEmpty()) { + return; + } - operator = operator(this, entryProcessor, getPredicate()); + EntryOperator operator = operator(this, entryProcessor, getPredicate()); for (Data key : keys) { Data response = operator.operateOnKey(key).doPostOperateOps().getResult(); if (response != null) { diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/PartitionWideEntryWithPredicateOperationFactory.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/PartitionWideEntryWithPredicateOperationFactory.java index 3d721c31c4ad..822dbabadec3 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/PartitionWideEntryWithPredicateOperationFactory.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/PartitionWideEntryWithPredicateOperationFactory.java @@ -16,24 +16,26 @@ package com.hazelcast.map.impl.operation; -import com.hazelcast.internal.partition.InternalPartitionService; import 
com.hazelcast.map.EntryProcessor; -import com.hazelcast.map.impl.MapContainer; import com.hazelcast.map.impl.MapDataSerializerHook; import com.hazelcast.map.impl.MapService; import com.hazelcast.map.impl.MapServiceContext; +import com.hazelcast.map.impl.query.Query; +import com.hazelcast.map.impl.query.QueryResult; +import com.hazelcast.map.impl.query.QueryResultRow; +import com.hazelcast.map.impl.query.QueryRunner; import com.hazelcast.nio.ObjectDataInput; import com.hazelcast.nio.ObjectDataOutput; import com.hazelcast.nio.serialization.Data; import com.hazelcast.query.Predicate; import com.hazelcast.query.TruePredicate; -import com.hazelcast.query.impl.Indexes; -import com.hazelcast.query.impl.QueryableEntry; -import com.hazelcast.query.impl.predicates.QueryOptimizer; import com.hazelcast.spi.NodeEngine; import com.hazelcast.spi.Operation; import com.hazelcast.spi.impl.operationservice.impl.operations.PartitionAwareOperationFactory; +import com.hazelcast.spi.partition.IPartitionService; +import com.hazelcast.util.IterationType; import com.hazelcast.util.collection.InflatableSet; +import com.hazelcast.util.collection.InflatableSet.Builder; import com.hazelcast.util.collection.Int2ObjectHashMap; import java.io.IOException; @@ -44,12 +46,8 @@ import java.util.Set; import static com.hazelcast.map.impl.MapService.SERVICE_NAME; -import static com.hazelcast.util.CollectionUtil.isEmpty; -import static com.hazelcast.util.CollectionUtil.toIntArray; import static com.hazelcast.util.MapUtil.createInt2ObjectHashMap; -import static com.hazelcast.util.MapUtil.isNullOrEmpty; import static com.hazelcast.util.collection.InflatableSet.newBuilder; -import static java.util.Collections.emptySet; public class PartitionWideEntryWithPredicateOperationFactory extends PartitionAwareOperationFactory { @@ -57,46 +55,52 @@ public class PartitionWideEntryWithPredicateOperationFactory extends PartitionAw private EntryProcessor entryProcessor; private Predicate predicate; + /** + * Entry keys grouped by partition ID. This map is constructed from data + * fetched by querying the map from non-partition threads. Because of + * concurrent migrations, the query running on non-partition threads might + * fail. In this case, the map is {@code null}. + */ private transient Map> partitionIdToKeysMap; public PartitionWideEntryWithPredicateOperationFactory() { } - public PartitionWideEntryWithPredicateOperationFactory(String name, EntryProcessor entryProcessor, - Predicate predicate, Map> partitionIdToKeysMap) { - this(name, entryProcessor, predicate); - this.partitionIdToKeysMap = partitionIdToKeysMap; - this.partitions = isNullOrEmpty(partitionIdToKeysMap) ? 
null : toIntArray(partitionIdToKeysMap.keySet()); - } - public PartitionWideEntryWithPredicateOperationFactory(String name, EntryProcessor entryProcessor, Predicate predicate) { this.name = name; this.entryProcessor = entryProcessor; this.predicate = predicate; } + private PartitionWideEntryWithPredicateOperationFactory(String name, EntryProcessor entryProcessor, Predicate predicate, + Map> partitionIdToKeysMap) { + this(name, entryProcessor, predicate); + this.partitionIdToKeysMap = partitionIdToKeysMap; + } + @Override - public PartitionAwareOperationFactory createFactoryOnRunner(NodeEngine nodeEngine) { - Set keys = getKeysFromIndex(nodeEngine); - Map> partitionIdToKeysMap - = getPartitionIdToKeysMap(keys, ((InternalPartitionService) nodeEngine.getPartitionService())); + public PartitionAwareOperationFactory createFactoryOnRunner(NodeEngine nodeEngine, int[] partitions) { + Set keys = tryToObtainKeysFromIndexes(nodeEngine); + Map> partitionIdToKeysMap = groupKeysByPartition(keys, nodeEngine.getPartitionService(), partitions); return new PartitionWideEntryWithPredicateOperationFactory(name, entryProcessor, predicate, partitionIdToKeysMap); } @Override public Operation createPartitionOperation(int partition) { - if (isNullOrEmpty(partitionIdToKeysMap)) { - // fallback here if we cannot find anything from indexes. + if (partitionIdToKeysMap == null) { + // Index query failed to run because of ongoing migrations or we are + // creating an operation on the caller node. return new PartitionWideEntryWithPredicateOperation(name, entryProcessor, predicate); } + // index query succeeded List keyList = partitionIdToKeysMap.get(partition); - InflatableSet keys = newBuilder(keyList).build(); + assert keyList != null : "unexpected partition " + partition + ", expected partitions " + partitionIdToKeysMap.keySet(); + Set keys = keyList.isEmpty() ? Collections.emptySet() : newBuilder(keyList).build(); return new MultipleEntryWithPredicateOperation(name, keys, entryProcessor, predicate); } - @Override public void writeData(ObjectDataOutput out) throws IOException { out.writeUTF(name); @@ -111,58 +115,74 @@ public void readData(ObjectDataInput in) throws IOException { predicate = in.readObject(); } - private Set getKeysFromIndex(NodeEngine nodeEngine) { + /** + * Attempts to get keys by running an index query. This method may return + * {@code null} if there is an ongoing migration, which means that it is not + * safe to return results from a non-partition thread. The caller must then + * run a partition query to obtain the results. + * + * @param nodeEngine nodeEngine of this cluster node + * @return the set of keys or {@code null} if we failed to fetch the keys + * because of ongoing migrations + */ + private Set tryToObtainKeysFromIndexes(NodeEngine nodeEngine) { // Do not use index in this case, because it requires full-table-scan. 
if (predicate == TruePredicate.INSTANCE) { - return emptySet(); + return null; } - // get indexes MapService mapService = nodeEngine.getService(SERVICE_NAME); MapServiceContext mapServiceContext = mapService.getMapServiceContext(); - Set result = queryAllPartitions(mapServiceContext); - if (result == null) { - return emptySet(); + QueryRunner runner = mapServiceContext.getMapQueryRunner(name); + Query query = Query.of().mapName(name).predicate(predicate).iterationType(IterationType.KEY).build(); + final QueryResult result = (QueryResult) runner.runIndexQueryOnOwnedPartitions(query); + if (result.getPartitionIds() == null) { + // failed to run query because of ongoing migrations + return null; } - List keys = null; - for (QueryableEntry e : result) { - if (keys == null) { - keys = new ArrayList(result.size()); - } - keys.add(e.getKeyData()); + final Builder setBuilder = InflatableSet.newBuilder(result.size()); + for (QueryResultRow row : result.getRows()) { + setBuilder.add(row.getKey()); } - - return keys == null ? Collections.emptySet() : newBuilder(keys).build(); + return setBuilder.build(); } - private Set queryAllPartitions(MapServiceContext mapServiceContext) { - QueryOptimizer queryOptimizer = mapServiceContext.getQueryOptimizer(); - MapContainer mapContainer = mapServiceContext.getMapContainer(name); - Indexes indexes = mapContainer.getIndexes(); - if (indexes != null) { - Predicate optimizedPredicate = queryOptimizer.optimize(predicate, indexes); - Set querySet = indexes.query(optimizedPredicate); - return querySet; - } else { - throw new IllegalArgumentException("Partitioned index is not supported for on-heap usage"); + private Map> groupKeysByPartition(Set keys, IPartitionService partitionService, int[] partitions) { + if (keys == null) { + return null; } - } - private Map> getPartitionIdToKeysMap(Set keys, InternalPartitionService partitionService) { - if (isEmpty(keys)) { - return Collections.emptyMap(); + // Even if the keys are successfully fetched from indexes, we need to + // filter them to exclude the keys belonging to partitions on which we + // weren't asked to operate on. Moreover, we need to include the + // partitions on which we were asked to operate on and for which we + // don't have any keys since this may indicate an out-migrated partition + // and we want OperationRunner to throw WrongTargetException to notify + // the caller about such situations. + + // Using the type of Int2ObjectHashMap allows the get and put operations + // to avoid auto-boxing. + final Int2ObjectHashMap> partitionToKeys = createInt2ObjectHashMap(partitions.length); + + // Pre-populate the map with the requested partitions to use it as a set + // to filter out possible unrequested partitions encountered among the + // fetched keys. 
+ for (int partition : partitions) { + partitionToKeys.put(partition, Collections.emptyList()); } - final int roughSize = Math.min(partitionService.getPartitionCount(), keys.size()); - - //using the type of Int2ObjectHashMap allows the get and put operations to avoid auto-boxing - final Int2ObjectHashMap> partitionToKeys = createInt2ObjectHashMap(roughSize); for (Data key : keys) { int partitionId = partitionService.getPartitionId(key); List keyList = partitionToKeys.get(partitionId); if (keyList == null) { + // we weren't asked to run on this partition + continue; + } + + if (keyList.isEmpty()) { + // we have a first key for this partition keyList = new ArrayList(); partitionToKeys.put(partitionId, keyList); } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/query/QueryRunner.java b/hazelcast/src/main/java/com/hazelcast/map/impl/query/QueryRunner.java index b86a10a310bc..e1beee430fd3 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/query/QueryRunner.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/query/QueryRunner.java @@ -124,6 +124,52 @@ public Result runIndexOrPartitionScanQueryOnOwnedPartitions(Query query) { return result; } + /** + * Performs the given query using indexes. + *
<p> + * The method may return a special failure result, which has {@code null} + * {@link Result#getPartitionIds() partition IDs}, in the following + * situations: + * <ul> + * <li>If a partition migration is detected during the query execution. + * <li>If it's impossible to perform the given query using indexes. + * </ul> + * <p>
+ * The method may be invoked on any thread. + * + * @param query the query to perform. + * @return the result of the query; if the result has {@code null} {@link + * Result#getPartitionIds() partition IDs} this indicates a failure. + */ + public Result runIndexQueryOnOwnedPartitions(Query query) { + int migrationStamp = getMigrationStamp(); + Collection initialPartitions = mapServiceContext.getOwnedPartitions(); + MapContainer mapContainer = mapServiceContext.getMapContainer(query.getMapName()); + + // to optimize the query we need to get any index instance + Indexes indexes = mapContainer.getIndexes(); + if (indexes == null) { + indexes = mapContainer.getIndexes(initialPartitions.iterator().next()); + } + // first we optimize the query + Predicate predicate = queryOptimizer.optimize(query.getPredicate(), indexes); + + // then we try to run using an index + Collection entries = runUsingGlobalIndexSafely(predicate, mapContainer, migrationStamp); + + Result result; + if (entries == null) { + // failed with index query because of ongoing migrations + result = populateEmptyResult(query, initialPartitions); + } else { + // success + result = populateResult(query, initialPartitions, entries); + } + + updateStatistics(mapContainer); + return result; + } + // MIGRATION UNSAFE QUERYING - MIGRATION STAMPS ARE NOT VALIDATED, so assumes a run on partition-thread // for a single partition. If the index is global it won't be asked public Result runPartitionIndexOrPartitionScanQueryOnGivenOwnedPartition(Query query, int partitionId) { diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/operations/PartitionAwareOperationFactory.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/operations/PartitionAwareOperationFactory.java index 7cf372dc7abf..b29a48c1188d 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/operations/PartitionAwareOperationFactory.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/operations/PartitionAwareOperationFactory.java @@ -19,7 +19,6 @@ import com.hazelcast.spi.NodeEngine; import com.hazelcast.spi.Operation; import com.hazelcast.spi.OperationFactory; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; /** * Creates partition specific operations. @@ -40,8 +39,12 @@ public abstract class PartitionAwareOperationFactory implements OperationFactory * this method can be used to create it. Otherwise, stateful factories may cause JMM problems. * * @param nodeEngine nodeEngine + * @param partitions the partitions provided to an operation which use this + * factory. The operation factory may decide to use this + * externally provided partition set if it doesn't manage + * one internally on its own. */ - public PartitionAwareOperationFactory createFactoryOnRunner(NodeEngine nodeEngine) { + public PartitionAwareOperationFactory createFactoryOnRunner(NodeEngine nodeEngine, int[] partitions) { return this; } @@ -55,19 +58,6 @@ public PartitionAwareOperationFactory createFactoryOnRunner(NodeEngine nodeEngin */ public abstract Operation createPartitionOperation(int partition); - /** - * This method will be called on operation runner node. - *
<p>
- * Created operations by this factory will be run on the partitions returned by this method. - * Returning {@code null} means operations will be run provided partitions by default. - * - * @return {@code null} to preserve default behaviour or return relevant partition IDs for the operations of this factory - */ - @SuppressFBWarnings("EI_EXPOSE_REP") - public int[] getPartitions() { - return partitions; - } - @Override public Operation createOperation() { throw new UnsupportedOperationException("Use createPartitionOperation() with PartitionAwareOperationFactory"); diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/operations/PartitionIteratingOperation.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/operations/PartitionIteratingOperation.java index 0c85561864a6..38ef4dbe4cb8 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/operations/PartitionIteratingOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/operations/PartitionIteratingOperation.java @@ -136,11 +136,8 @@ private void executeOperations() { } private void executePartitionAwareOperations(PartitionAwareOperationFactory givenFactory) { - PartitionAwareOperationFactory factory = givenFactory.createFactoryOnRunner(getNodeEngine()); - NodeEngine nodeEngine = getNodeEngine(); - int[] operationFactoryPartitions = factory.getPartitions(); - partitions = operationFactoryPartitions == null ? partitions : operationFactoryPartitions; + PartitionAwareOperationFactory factory = givenFactory.createFactoryOnRunner(nodeEngine, partitions); OperationResponseHandler responseHandler = new OperationResponseHandlerImpl(partitions); OperationService operationService = nodeEngine.getOperationService(); diff --git a/hazelcast/src/test/java/com/hazelcast/map/MapPutAllWrongTargetForPartitionTest.java b/hazelcast/src/test/java/com/hazelcast/map/MapPutAllWrongTargetForPartitionTest.java index f782ef551085..ca736c17cd6e 100644 --- a/hazelcast/src/test/java/com/hazelcast/map/MapPutAllWrongTargetForPartitionTest.java +++ b/hazelcast/src/test/java/com/hazelcast/map/MapPutAllWrongTargetForPartitionTest.java @@ -106,12 +106,13 @@ private void testPutAllPerMemberOperation(final int entriesPerPartition) throws SerializationService serializationService = nodeEngine.getSerializationService(); // create a PutAllPerMemberOperation with entries for all partitions - PartitionAwareOperationFactory factory = createPutAllOperationFactory(entriesPerPartition, mapName, hz, + int[] allPartitions = getAllPartitions(); + PartitionAwareOperationFactory factory = createPutAllOperationFactory(allPartitions, entriesPerPartition, mapName, hz, serializationService); // invoke the operation on a random remote target InternalOperationService operationService = nodeEngine.getOperationService(); - operationService.invokeOnPartitions(MapService.SERVICE_NAME, factory, factory.getPartitions()); + operationService.invokeOnPartitions(MapService.SERVICE_NAME, factory, allPartitions); // assert that all entries have been written IMap map = hz.getMap(mapName); @@ -123,7 +124,7 @@ private void testPutAllPerMemberOperation(final int entriesPerPartition) throws // assert that each member owns entriesPerPartition entries of the map and that all backups have been written assertTrueEventually(new AssertTask() { @Override - public void run() throws Exception { + public void run() { int totalBackups = 0; for (int i = 0; i < INSTANCE_COUNT; i++) { IMap map = 
instances[i].getMap(mapName); @@ -137,12 +138,19 @@ public void run() throws Exception { }); } - private PartitionAwareOperationFactory createPutAllOperationFactory(int entriesPerPartition, String mapName, + private int[] getAllPartitions() { + int[] partitions = new int[INSTANCE_COUNT]; + for (int partitionId = 0; partitionId < INSTANCE_COUNT; partitionId++) { + partitions[partitionId] = partitionId; + } + return partitions; + } + + private PartitionAwareOperationFactory createPutAllOperationFactory(int[] partitions, int entriesPerPartition, String mapName, HazelcastInstance hz, SerializationService serializationService) { - int[] partitions = new int[INSTANCE_COUNT]; MapEntries[] entries = new MapEntries[INSTANCE_COUNT]; - for (int partitionId = 0; partitionId < INSTANCE_COUNT; partitionId++) { + for (int partitionId : partitions) { MapEntries mapEntries = new MapEntries(entriesPerPartition); for (int i = 0; i < entriesPerPartition; i++) { @@ -152,7 +160,6 @@ private PartitionAwareOperationFactory createPutAllOperationFactory(int entriesP mapEntries.add(data, data); } - partitions[partitionId] = partitionId; entries[partitionId] = mapEntries; } return getPutAllPartitionAwareOperationFactory(mapName, partitions, entries); diff --git a/hazelcast/src/test/java/com/hazelcast/spi/impl/operationservice/impl/OperationServiceImpl_invokeOnPartitionsTest.java b/hazelcast/src/test/java/com/hazelcast/spi/impl/operationservice/impl/OperationServiceImpl_invokeOnPartitionsTest.java index 5f2c0185a45b..4cc6b03629eb 100644 --- a/hazelcast/src/test/java/com/hazelcast/spi/impl/operationservice/impl/OperationServiceImpl_invokeOnPartitionsTest.java +++ b/hazelcast/src/test/java/com/hazelcast/spi/impl/operationservice/impl/OperationServiceImpl_invokeOnPartitionsTest.java @@ -24,6 +24,7 @@ import com.hazelcast.nio.serialization.IdentifiedDataSerializable; import com.hazelcast.spi.Operation; import com.hazelcast.spi.OperationFactory; +import com.hazelcast.spi.impl.operationservice.impl.operations.PartitionAwareOperationFactory; import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.HazelcastTestSupport; import com.hazelcast.test.TestHazelcastInstanceFactory; @@ -82,6 +83,21 @@ public void testLongRunning() throws Exception { } } + @Test + public void testPartitionScopeIsRespectedForPartitionAwareFactories() throws Exception { + Config config = new Config().setProperty(PARTITION_COUNT.getName(), "" + 100); + config.getSerializationConfig() + .addDataSerializableFactory(321, new PartitionAwareOperationFactoryDataSerializableFactory()); + HazelcastInstance hz = createHazelcastInstance(config); + OperationServiceImpl opService = getOperationServiceImpl(hz); + + Map result = opService + .invokeOnPartitions(null, new PartitionAwareOperationFactoryImpl(new int[]{0, 1, 2}), new int[]{1}); + + assertEquals(1, result.size()); + assertEquals(2, result.values().iterator().next()); + } + private static class OperationFactoryImpl extends AbstractOperationFactor { @Override public Operation createOperation() { @@ -162,4 +178,45 @@ public Object getResponse() { } } + private static class PartitionAwareOperationFactoryImpl extends PartitionAwareOperationFactory { + public PartitionAwareOperationFactoryImpl(int[] partitions) { + this.partitions = partitions; + } + + public PartitionAwareOperationFactoryImpl() { + } + + @Override + public Operation createPartitionOperation(int partition) { + return new OperationImpl(); + } + + @Override + public int getFactoryId() { + return 321; + } + + @Override + 
public int getId() { + return 654; + } + + @Override + public void writeData(ObjectDataOutput out) throws IOException { + out.writeIntArray(partitions); + } + + @Override + public void readData(ObjectDataInput in) throws IOException { + this.partitions = in.readIntArray(); + } + } + + private static class PartitionAwareOperationFactoryDataSerializableFactory implements DataSerializableFactory { + @Override + public IdentifiedDataSerializable create(int typeId) { + return new PartitionAwareOperationFactoryImpl(); + } + } + }
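Reviewer note, not part of the diff: a minimal client-side sketch of the user-visible path this change affects. The new ClientPutAllRemoveBounceTest drives removeAll(Predicate), which goes through PartitionWideEntryWithPredicateOperationFactory; with runIndexQueryOnOwnedPartitions() the affected keys can now be collected from the index on the caller side, and the factory falls back to the full PartitionWideEntryWithPredicateOperation scan only when the migration-stamp check fails. The map name, key range, and index attribute below are illustrative assumptions, not taken from the PR.

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.query.SqlPredicate;

public class RemoveAllByPredicateSketch {
    public static void main(String[] args) {
        // Assumes a running cluster reachable with the default client configuration.
        HazelcastInstance client = HazelcastClient.newHazelcastClient();
        IMap<Integer, Integer> map = client.getMap("employees");

        // An ordered index on the value ("this") lets the operation factory collect
        // the affected keys via an index query instead of scanning every partition.
        map.addIndex("this", true);

        for (int i = 0; i < 100; i++) {
            map.put(i, i);
        }

        // Removes matching entries; if a migration is in flight, the factory
        // transparently falls back to the per-partition predicate scan.
        map.removeAll(new SqlPredicate("this >= 10 and this < 20"));

        client.shutdown();
    }
}

The bounce test added in this PR exercises the same putAll/removeAll pattern concurrently with member restarts, which is what surfaces the migration fallback and the WrongTargetException retry path described in the factory comments.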