Skip to content

Commit

Permalink
Batch partition replica updates [HZ-3652] (hazelcast#25905)
Browse files Browse the repository at this point in the history
Since Hazelcast IMDG 4.1 with the introduction of parallel migrations,
the "partition assignments version", a monotonically increasing version
number for the whole partition replica assignments data structure, was
replaced with a "partition state stamp" - a hash calculated over
individual partitions' versions. On each partition replica assignment
update, the partition state stamp needs to be recalculated.

In certain cases, it is expected that all partition replica assignments
will be updated:
- during initial partitions assignment ("first arrangement")
- when a member that joins the cluster applies the partition replica
assignments, as received from master member
- when a member recovers partition replica assignments from persistence

In such cases, updating the partition state stamp on each partition
replica update is inefficient. Instead, the partition state stamp can be
updated just once, after the whole batch of partition replica
assignments has been applied.

Measured the following timings:
- Initial partitions assignment on a single member with 20K partitions
  - current `master` branch: 3870 millis (20001 partition stamp updates)
  - with this PR: 28 millis (1 partition stamp update)
- Apply initial partition state on the 3rd member joining a cluster with
2 members
   - current `master` branch: 3260 millis (40002 partition stamp updates)
   - with this PR: 4 millis (1 partition stamp update)
  • Loading branch information
vbekiaris committed Nov 10, 2023
1 parent 8c2cd05 commit 1030206
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

import com.hazelcast.internal.partition.PartitionReplica;
import com.hazelcast.internal.partition.PartitionReplicaInterceptor;
import com.hazelcast.internal.util.collection.PartitionIdSet;

/**
* PartitionReplicaInterceptor used to intercept partition changes internally.
* Most significant responsibility of this interceptor is to increment the partition-state version on each change
* Most significant responsibility of this interceptor is to update the partition-state stamp on each change
* and cancel any ongoing replica synchronization on the changed partition.
*/
final class DefaultPartitionReplicaInterceptor implements PartitionReplicaInterceptor {
Expand All @@ -31,6 +32,12 @@ final class DefaultPartitionReplicaInterceptor implements PartitionReplicaInterc
this.partitionService = partitionService;
}

/**
* <p><b>
* If this logic changes, consider also changing the implementation of
* {@link PartitionStateManager#partitionOwnersChanged(PartitionIdSet)}, which should apply
* the same logic per partition batch.
* </b></p>
*/
@Override
public void replicaChanged(int partitionId, int replicaIndex, PartitionReplica oldReplica, PartitionReplica newReplica) {
if (replicaIndex == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,17 @@ void swapReplicas(int index1, int index2) {
onReplicaChange(index2, a2, a1);
}

void setReplicasAndVersion(InternalPartition partition) {
setReplicas(partition.getReplicasCopy());
/**
* This method is always called from batch partition update situations, so it does
* not invoke interceptors individually per partition.
* (apply partition assignments from master while joining, apply partition table recovered
* from hot restart)
* @return {@code true} if partition owner was changed, otherwise {@code false}.
*/
boolean setReplicasAndVersion(InternalPartition partition) {
boolean ownerChanged = setReplicas(partition.getReplicasCopy(), false);
version = partition.version();
return ownerChanged;
}

void setVersion(int version) {
Expand All @@ -122,6 +130,17 @@ void setReplicas(PartitionReplica[] newReplicas) {
onReplicasChange(newReplicas, oldReplicas);
}

/**
 * Batch-friendly variant of {@link #setReplicas(PartitionReplica[])}: swaps in the new replica
 * array and lets the caller decide whether the replica change interceptor is invoked.
 *
 * @param newReplicas the replica array that becomes this partition's replicas
 * @param invokeInterceptor passed through to the replica change handling
 * @return {@code true} if partition owner was changed, otherwise {@code false}.
 */
boolean setReplicas(PartitionReplica[] newReplicas, boolean invokeInterceptor) {
    final PartitionReplica[] previousReplicas = replicas;
    // publish the new array first, then run change detection against the previous one
    replicas = newReplicas;
    boolean ownerChanged = onReplicasChange(newReplicas, previousReplicas, invokeInterceptor);
    return ownerChanged;
}

void setReplica(int replicaIndex, PartitionReplica newReplica) {
PartitionReplica[] newReplicas = copyOf(replicas, MAX_REPLICA_COUNT);
PartitionReplica oldReplica = newReplicas[replicaIndex];
Expand All @@ -130,32 +149,60 @@ void setReplica(int replicaIndex, PartitionReplica newReplica) {
onReplicaChange(replicaIndex, oldReplica, newReplica);
}

/** Calls the partition replica change interceptor for all changed replicas. */
private void onReplicasChange(PartitionReplica[] newReplicas, PartitionReplica[] oldReplicas) {
for (int replicaIndex = 0; replicaIndex < MAX_REPLICA_COUNT; replicaIndex++) {
/**
* Calls the partition replica change interceptor for all changed replicas.
* @return {@code true} if partition owner change was detected, otherwise {@code false}.
*/
private boolean onReplicasChange(PartitionReplica[] newReplicas, PartitionReplica[] oldReplicas) {
return onReplicasChange(newReplicas, oldReplicas, true);
}

/**
 * Compares old and new replicas index by index and processes every detected change.
 * The owner (replica index 0) is handled first and separately so that its result can be
 * reported to the caller; the remaining indices are processed for their side effects only.
 *
 * @param newReplicas the replicas after the update
 * @param oldReplicas the replicas before the update
 * @param invokeInterceptor whether detected changes are reported to the interceptor
 * @return {@code true} if partition owner change was detected, otherwise {@code false}.
 */
private boolean onReplicasChange(PartitionReplica[] newReplicas, PartitionReplica[] oldReplicas, boolean invokeInterceptor) {
    PartitionReplica oldReplicasOwner = oldReplicas[0];
    PartitionReplica newReplicasOwner = newReplicas[0];
    boolean partitionOwnerChanged = onReplicaChange(0, oldReplicasOwner, newReplicasOwner, invokeInterceptor);

    for (int replicaIndex = 1; replicaIndex < MAX_REPLICA_COUNT; replicaIndex++) {
        PartitionReplica oldReplicasId = oldReplicas[replicaIndex];
        PartitionReplica newReplicasId = newReplicas[replicaIndex];
        // exactly one call per index, honouring invokeInterceptor: a second, flag-less call
        // would process the same change twice and always notify the interceptor
        onReplicaChange(replicaIndex, oldReplicasId, newReplicasId, invokeInterceptor);
    }
    return partitionOwnerChanged;
}

/** Calls the partition replica change interceptor for the changed replica. */
/**
* If a replica change is detected, then increments partition version and calls the partition replica change interceptor
* for the changed replica.
* @return {@code true} if a replica change was detected, otherwise {@code false}.
*/
@SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT",
justification = "This method is called under InternalPartitionServiceImpl.lock")
private void onReplicaChange(int replicaIndex, PartitionReplica oldReplica, PartitionReplica newReplica) {
private boolean onReplicaChange(int replicaIndex, PartitionReplica oldReplica, PartitionReplica newReplica) {
return onReplicaChange(replicaIndex, oldReplica, newReplica, true);
}

/**
* Calls the partition replica change interceptor for the changed replica.
* @return {@code true} if a replica change was detected, otherwise {@code false}.
*/
@SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT",
justification = "This method is called under InternalPartitionServiceImpl.lock")
private boolean onReplicaChange(int replicaIndex, PartitionReplica oldReplica, PartitionReplica newReplica,
boolean invokeInterceptor) {
boolean changed;
if (oldReplica == null) {
changed = newReplica != null;
} else {
changed = !oldReplica.equals(newReplica);
}
if (!changed) {
return;
return false;
}
version++;
if (interceptor != null) {
if (interceptor != null && invokeInterceptor) {
interceptor.replicaChanged(partitionId, replicaIndex, oldReplica, newReplica);
}
return true;
}

InternalPartitionImpl copy(PartitionReplicaInterceptor interceptor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.HashUtil;
import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.internal.util.collection.PartitionIdSet;
import com.hazelcast.internal.util.scheduler.CoalescingDelayedTrigger;
import com.hazelcast.internal.util.scheduler.ScheduledEntry;
import com.hazelcast.logging.ILogger;
Expand Down Expand Up @@ -750,11 +751,13 @@ private void requestMemberListUpdateIfUnknownMembersFound(Address sender, Intern
* @see MigrationManager#scheduleActiveMigrationFinalization(MigrationInfo)
* @return whether or not the new partition table is accepted
*/
@SuppressWarnings({"checkstyle:npathcomplexity", "checkstyle:cyclomaticcomplexity"})
private boolean updatePartitionsAndFinalizeMigrations(InternalPartition[] partitions,
Collection<MigrationInfo> completedMigrations, Address sender) {

boolean applied = false;
boolean accepted = false;
PartitionIdSet changedOwnerPartitions = new PartitionIdSet(partitionCount);

for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
InternalPartition newPartition = partitions[partitionId];
Expand Down Expand Up @@ -785,7 +788,9 @@ private boolean updatePartitionsAndFinalizeMigrations(InternalPartition[] partit

applied = true;
accepted = true;
currentPartition.setReplicasAndVersion(newPartition);
if (currentPartition.setReplicasAndVersion(newPartition)) {
changedOwnerPartitions.add(partitionId);
}
}

for (MigrationInfo migration : completedMigrations) {
Expand All @@ -795,10 +800,11 @@ private boolean updatePartitionsAndFinalizeMigrations(InternalPartition[] partit
}
}

// Manually trigger partition stamp calculation.
// Because partition versions are explicitly set to master's versions
// while applying the partition table updates.
partitionStateManager.updateStamp();
// while applying the partition table updates, we need to
// (1) cancel any ongoing replica syncs (for partitions whose owners changed) and
// (2) update the partition state stamp.
partitionStateManager.partitionOwnersChanged(changedOwnerPartitions);

if (logger.isFineEnabled()) {
if (applied) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.hazelcast.internal.partition.ReadonlyInternalPartition;
import com.hazelcast.internal.partition.membergroup.MemberGroupFactory;
import com.hazelcast.internal.partition.membergroup.MemberGroupFactoryFactory;
import com.hazelcast.internal.util.collection.PartitionIdSet;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.partitiongroup.MemberGroup;

Expand All @@ -44,6 +45,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.IntConsumer;

import static com.hazelcast.cluster.memberselector.MemberSelectors.DATA_MEMBER_SELECTOR;
import static com.hazelcast.internal.metrics.MetricDescriptorConstants.PARTITIONS_METRIC_PARTITION_REPLICA_STATE_MANAGER_ACTIVE_PARTITION_COUNT;
Expand Down Expand Up @@ -98,6 +100,9 @@ public class PartitionStateManager {
// can be read and written concurrently...
private volatile int memberGroupsSize;

/** For test usage only */
private ReplicaUpdateInterceptor replicaUpdateInterceptor;

public PartitionStateManager(Node node, InternalPartitionServiceImpl partitionService) {
this.node = node;
this.logger = node.getLogger(getClass());
Expand All @@ -117,6 +122,7 @@ public PartitionStateManager(Node node, InternalPartitionServiceImpl partitionSe
node.getDiscoveryService());
partitionStateGenerator = new PartitionStateGeneratorImpl();
snapshotOnRemove = new ConcurrentHashMap<>();
this.replicaUpdateInterceptor = NoOpBatchReplicatUpdateInterceptor.INSTANCE;
}

/**
Expand Down Expand Up @@ -192,11 +198,7 @@ boolean initializePartitionAssignments(Set<Member> excludedMembers) {
+ "Expected: " + partitionCount + ", Actual: " + newState.length);
}

for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
InternalPartitionImpl partition = partitions[partitionId];
PartitionReplica[] replicas = newState[partitionId];
partition.setReplicas(replicas);
}
batchUpdateReplicas(newState);

ClusterState clusterState = node.getClusterService().getClusterState();
if (!clusterState.isMigrationAllowed()) {
Expand All @@ -210,6 +212,36 @@ boolean initializePartitionAssignments(Set<Member> excludedMembers) {
return true;
}

/**
 * Applies a complete set of replica assignments in one batch: every partition receives its
 * new replicas without per-partition interceptor invocation, and the partitions whose owner
 * changed are handed to {@link #partitionOwnersChanged(PartitionIdSet)} once at the end.
 *
 * @param newState new replica assignments, indexed by partition ID
 */
void batchUpdateReplicas(PartitionReplica[][] newState) {
    PartitionIdSet ownerChanges = new PartitionIdSet(partitionCount);
    for (int id = 0; id < partitionCount; id++) {
        boolean ownerChanged = partitions[id].setReplicas(newState[id], false);
        if (ownerChanged) {
            ownerChanges.add(id);
        }
    }
    partitionOwnersChanged(ownerChanges);
}

/**
 * Called after a batch of partition replica assignments have been applied. This is an optimization for batch
 * changes, to avoid repeatedly performing costly computations (like updating partition assignments stamp).
 * Cancels replica sync for each given partition, then updates the partition state stamp once
 * and finally notifies the replica update interceptor.
 * <p><b>
 * If this logic changes, consider also changing the implementation of
 * {@link PartitionReplicaInterceptor#replicaChanged(int, int, PartitionReplica, PartitionReplica)}, which should apply
 * the same logic per partition.
 * </b></p>
 *
 * @param partitionIdSet IDs of the partitions whose owner changed in the batch
 */
void partitionOwnersChanged(PartitionIdSet partitionIdSet) {
    // typed as IntConsumer so that the primitive forEachRemaining overload is selected
    IntConsumer cancelSync =
            partitionId -> partitionService.getReplicaManager().cancelReplicaSync(partitionId);
    partitionIdSet.intIterator().forEachRemaining(cancelSync);

    updateStamp();
    replicaUpdateInterceptor.onPartitionOwnersChanged();
}

/**
* Returns {@code true} if the node has started and
* the cluster state allows migrations (see {@link ClusterState#isMigrationAllowed()}).
Expand Down Expand Up @@ -247,6 +279,7 @@ void setInitialState(PartitionTableView partitionTable) {
logger.info("Setting cluster partition table...");
boolean foundReplica = false;
PartitionReplica localReplica = PartitionReplica.from(node.getLocalMember());
PartitionIdSet changedOwnerPartitions = new PartitionIdSet(partitionCount);
for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
InternalPartitionImpl partition = partitions[partitionId];
InternalPartition newPartition = partitionTable.getPartition(partitionId);
Expand All @@ -257,10 +290,13 @@ void setInitialState(PartitionTableView partitionTable) {
}
partition.reset(localReplica);
if (newPartition != null) {
partition.setReplicasAndVersion(newPartition);
if (partition.setReplicasAndVersion(newPartition)) {
changedOwnerPartitions.add(partitionId);
}
}
}
if (foundReplica) {
partitionOwnersChanged(changedOwnerPartitions);
setInitialized();
}
}
Expand Down Expand Up @@ -388,6 +424,7 @@ public void updateStamp() {
if (logger.isFinestEnabled()) {
logger.finest("New calculated partition state stamp is: " + stateStamp);
}
replicaUpdateInterceptor.onPartitionStampUpdate();
}

public long getStamp() {
Expand All @@ -409,7 +446,8 @@ void incrementPartitionVersion(int partitionId, int delta) {

boolean setInitialized() {
if (!initialized) {
updateStamp();
// partition state stamp is already calculated
assert stateStamp != 0 : "Partition state stamp should already have been calculated";
initialized = true;
node.getNodeExtension().onPartitionStateChange();
return true;
Expand Down Expand Up @@ -472,4 +510,26 @@ PartitionTableView getSnapshot(UUID crashedMemberUuid) {
void removeSnapshot(UUID memberUuid) {
snapshotOnRemove.remove(memberUuid);
}

/**
* For test usage only. Replaces the {@link ReplicaUpdateInterceptor} that this manager notifies
* from {@link #partitionOwnersChanged(PartitionIdSet)} and {@link #updateStamp()}.
*
* @param interceptor the interceptor to install. NOTE(review): the call sites invoke it without
*                    a null check, so passing {@code null} looks unsupported - confirm with callers.
*/
void setReplicaUpdateInterceptor(ReplicaUpdateInterceptor interceptor) {
this.replicaUpdateInterceptor = interceptor;
}

/**
* Callback hooks fired by {@link PartitionStateManager} while replica updates are applied.
* The field holding the active instance is marked as being for test usage only.
*/
interface ReplicaUpdateInterceptor {
/** Invoked as the last step of {@code partitionOwnersChanged}, after the stamp update. */
void onPartitionOwnersChanged();
/** Invoked as the last step of {@code updateStamp}. */
void onPartitionStampUpdate();
}

/**
* {@link ReplicaUpdateInterceptor} that ignores every callback. Its shared {@link #INSTANCE} is
* the interceptor installed by the {@link PartitionStateManager} constructor.
* <p>
* NOTE(review): "Replicat" in the class name looks like a typo for "Replica"; the name is kept
* because it is referenced from outside this class.
*/
static final class NoOpBatchReplicatUpdateInterceptor implements ReplicaUpdateInterceptor {
/** Shared instance; the class declares no fields besides this one. */
static final NoOpBatchReplicatUpdateInterceptor INSTANCE = new NoOpBatchReplicatUpdateInterceptor();

@Override
public void onPartitionOwnersChanged() {
// intentionally empty
}

@Override
public void onPartitionStampUpdate() {
// intentionally empty
}
}
}

0 comments on commit 1030206

Please sign in to comment.