[HZ-266] Fix conditions for detection of rejoining members (#19105)
* Fixes conditions for detection of rejoining members

On non-master members, a master that had shut down
gracefully was detected as having unexpectedly left
the cluster and would then fail to be admitted back
to the cluster.

Now a partition table snapshot is only saved for
members that are still present in the partition
table, which avoids the above scenario. A saved
partition table snapshot is also a requirement
for recognizing a member as rejoining the cluster.

Also promotes some rather important log lines
to INFO level.

* Fixes NPE

Introduced by de6140e

* Offloadable partition replica sync

is now enabled by default. The system
property to explicitly disable it
is still available as a kill switch.
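
For illustration only (not part of this diff): given the property name and
default visible in the PartitionReplicaManager change below, the offloaded
sync could be disabled again via -Dhazelcast.partition.replica.offload=false
on the JVM command line, or programmatically before the node starts:

    // Sketch: assumes the property is read once when the class is
    // initialized, per the static field in the diff below.
    System.setProperty("hazelcast.partition.replica.offload", "false");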

* Adds smart partition reassignment

When persistence is enabled and a crashed
member is rejoining, a stored partition
table snapshot may be picked by the
repartitioning task in order to minimize
partition rearrangement and migrations.
An eligible partition table snapshot must
only include replica assignments to current
members and none of the shutting-down members.
If multiple such candidates are found,
the one with the least "distance" from
the current partition table is chosen.
The distance between two partition tables
is the sum of the distances of all their
partitions. For a given partition assignment,
the distance calculation considers replica
index assignments: if a given replica index
is assigned to the same member in both
partition assignments, their distance is 0;
if a member is assigned to replica index 0
in one assignment and index 2 in the other,
the distance is incremented by 2; a replica
index that is assigned in one assignment but
unassigned (null) in the other increments
the distance by MAX_REPLICA_COUNT. A short
sketch of this rule follows.
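
A minimal sketch of the per-partition distance rule described above,
simplified from the PartitionTableView.distanceOf implementation in this
diff; replicas are identified here by UUID and null means an unassigned
replica index:

    import java.util.Arrays;
    import java.util.UUID;

    final class PartitionDistanceSketch {
        // distance of one partition's replica assignment in `current` vs `snapshot`;
        // both arrays are maxReplicaCount long, padded with nulls
        static int distance(UUID[] current, UUID[] snapshot, int maxReplicaCount) {
            int distance = 0;
            for (int i = 0; i < maxReplicaCount; i++) {
                UUID a = current[i];
                UUID b = snapshot[i];
                if (a == null) {
                    if (b != null) {
                        distance += maxReplicaCount;   // unassigned here, assigned there
                    }
                } else if (b != null && !a.equals(b)) {
                    // same member placed at a different replica index?
                    int j = Arrays.asList(snapshot).indexOf(a);
                    distance += (j == -1) ? maxReplicaCount : Math.abs(j - i);
                }
            }
            return distance;
        }
    }

For example, distance([A, B, C, ...], [A, C, B, ...]) is 2 (B and C each
moved by one index), while distance([null, B, ...], [C, B, ...]) is
MAX_REPLICA_COUNT, because replica index 0 is unassigned in the first
table but assigned in the second.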

* Fixes race in offloaded anti-entropy

It is only safe to read replica versions before preparing replication operations.
Reasoning: even though the partition is already marked as migrating,
operations may already be queued on the partition thread.
If we read replica versions after the replication operation
is prepared, we may read updated replica versions while the replication
operation carries stale data, so future backup sync checks would not detect the
stale data.
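
A minimal sketch of the safe ordering described above (method names are
illustrative, not the exact Hazelcast API):

    // Safe: capture the replica versions first, then prepare the replication operation.
    long[] versions = readReplicaVersions(partitionId, namespaces);         // hypothetical helper
    Operation replicationOp = prepareReplicationOperation(partitionId);     // hypothetical helper
    sendTo(requester, replicationOp, versions);                             // hypothetical helper
    // Reading the versions only after preparing the operation could pair updated
    // versions with stale replicated data, so later backup sync checks would
    // not notice that the backup is stale.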

* Send retry response when partition is migrating

Instead of just retrying locally, a retry response needs
to be sent back to the replica sync requester; otherwise
subsequent replica sync requests may be stuck for up to
the migration timeout (default 300 seconds) because the
previous sync request is left over in the sync request
registry (PartitionReplicaManager#replicaSyncRequests).
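
A hedged sketch of the intended behaviour (operation and helper names are
illustrative, not the exact Hazelcast API):

    // On the side serving the replica sync request:
    if (partition.isMigrating()) {
        // Reply with an explicit retry response instead of only retrying locally,
        // so the requester can clear its pending entry in
        // PartitionReplicaManager#replicaSyncRequests and retry promptly.
        sendRetryResponse(callerAddress, partitionId, replicaIndex);   // hypothetical helper
        return;
    }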
vbekiaris committed Aug 4, 2021
1 parent d748589 commit 750c504
Showing 10 changed files with 557 additions and 30 deletions.
@@ -715,7 +715,8 @@ private boolean isMemberRejoining(MemberMap previousMembersMap, Address address,
// or it is still in member list because connection timeout hasn't been reached yet
|| previousMembersMap.contains(memberUuid)
// or it is a known missing member
|| clusterService.getMembershipManager().isMissingMember(address, memberUuid));
|| clusterService.getMembershipManager().isMissingMember(address, memberUuid))
&& (node.getPartitionService().getLeftMemberSnapshot(memberUuid) != null);
}

private boolean checkIfUsingAnExistingMemberUuid(JoinMessage joinMessage) {
@@ -782,9 +783,7 @@ private void startJoin() {
for (MemberInfo member : joiningMembers.values()) {
if (isMemberRestartingWithPersistence(member.getAttributes())
&& isMemberRejoining(memberMap, member.getAddress(), member.getUuid())) {
if (logger.isFineEnabled()) {
logger.fine(member + " is rejoining the cluster");
}
logger.info(member + " is rejoining the cluster");
// do not trigger repartition immediately, wait for joining member to load hot-restart data
shouldTriggerRepartition = false;
}
@@ -16,10 +16,15 @@

package com.hazelcast.internal.partition;

import com.hazelcast.cluster.Address;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import static com.hazelcast.internal.partition.InternalPartition.MAX_REPLICA_COUNT;
import static com.hazelcast.internal.partition.PartitionStampUtil.calculateStamp;

/**
Expand Down Expand Up @@ -63,7 +68,98 @@ public PartitionReplica getReplica(int partitionId, int replicaIndex) {

public PartitionReplica[] getReplicas(int partitionId) {
InternalPartition partition = partitions[partitionId];
return partition != null ? partition.getReplicasCopy() : new PartitionReplica[InternalPartition.MAX_REPLICA_COUNT];
return partition != null ? partition.getReplicasCopy() : new PartitionReplica[MAX_REPLICA_COUNT];
}

/**
* @param replicas UUIDs of members whose replicas this partition table view may reference
* @param excludedReplicas UUIDs of members whose replicas this partition table view must not reference
* @return {@code true} when this {@code PartitionTableView}
* only references the given {@code replicas} UUIDs
* and none of the {@code excludedReplicas} UUIDs, otherwise
* {@code false}.
*/
public boolean composedOf(Set<UUID> replicas, Set<UUID> excludedReplicas) {
for (InternalPartition partition : partitions) {
for (PartitionReplica replica : partition.getReplicasCopy()) {
if (replica != null) {
if (!replicas.contains(replica.uuid()) || excludedReplicas.contains(replica.uuid())) {
return false;
}
}
}
}
return true;
}

/**
* @param partitionTableView the partition table view to compare against
* @return a measure of the difference ("distance") of this
* versus the given {@code partitionTableView}.
*/
public int distanceOf(PartitionTableView partitionTableView) {
int distance = 0;
for (int i = 0; i < partitions.length; i++) {
distance += distanceOf(partitions[i], partitionTableView.partitions[i]);
}
return distance;
}

private int distanceOf(InternalPartition partition1, InternalPartition partition2) {
int distance = 0;
for (int i = 0; i < MAX_REPLICA_COUNT; i++) {
PartitionReplica replica1 = partition1.getReplica(i);
PartitionReplica replica2 = partition2.getReplica(i);
if (replica1 == null) {
if (replica2 != null) {
distance += MAX_REPLICA_COUNT;
}
} else {
if (replica2 != null) {
if (!replica1.uuid().equals(replica2.uuid())) {
int replicaIndex2 = replicaIndexOfUuid(replica1.uuid(), partition2);
if (replicaIndex2 == -1) {
distance += MAX_REPLICA_COUNT;
} else {
distance += Math.abs(replicaIndex2 - i);
}
}
}
}
}
return distance;
}

private int replicaIndexOfUuid(UUID uuid, InternalPartition partition) {
PartitionReplica[] replicas = ((AbstractInternalPartition) partition).replicas();
for (int i = 0; i < replicas.length; i++) {
if (replicas[i] != null && replicas[i].uuid().equals(uuid)) {
return i;
}
}
return -1;
}

public PartitionReplica[][] toArray(Map<UUID, Address> addressMap) {
int partitionCount = partitions.length;

PartitionReplica[][] replicas = new PartitionReplica[partitionCount][MAX_REPLICA_COUNT];

for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
InternalPartition p = partitions[partitionId];
replicas[partitionId] = new PartitionReplica[MAX_REPLICA_COUNT];

for (int index = 0; index < MAX_REPLICA_COUNT; index++) {
PartitionReplica replica = p.getReplica(index);
if (replica == null) {
continue;
}
replicas[partitionId][index] = addressMap.containsKey(replica.uuid())
? new PartitionReplica(addressMap.get(replica.uuid()), replica.uuid())
: replica;
}
}
return replicas;
}

@Override
Expand Down
@@ -366,8 +366,6 @@ public void memberAdded(Member member) {
}
}

// react to member leaving unexpectedly -- members leaving with graceful shutdown
// have had their migrations already done before removal
@Override
public void memberRemoved(Member member) {
logger.fine("Removing " + member);
@@ -389,11 +387,11 @@ public void memberRemoved(Member member) {
assert !shouldFetchPartitionTables;
shouldFetchPartitionTables = true;
}
// keep partition table snapshot as member leaves
if (logger.isFineEnabled()) {
logger.fine("Storing partition assignments snapshot for " + member + " from memberRemoved");
// keep partition table snapshot as member leaves, unless
// no partitions were assigned to it (member left with graceful shutdown)
if (!partitionStateManager.isAbsentInPartitionTable(member)) {
partitionStateManager.storeSnapshot(member.getUuid());
}
partitionStateManager.storeSnapshot(member.getUuid());
if (isMaster) {
migrationManager.triggerControlTaskWithDelay();
}
@@ -35,6 +35,7 @@
import com.hazelcast.internal.partition.PartitionReplica;
import com.hazelcast.internal.partition.PartitionRuntimeState;
import com.hazelcast.internal.partition.PartitionStateVersionMismatchException;
import com.hazelcast.internal.partition.PartitionTableView;
import com.hazelcast.internal.partition.impl.MigrationInterceptor.MigrationParticipant;
import com.hazelcast.internal.partition.impl.MigrationPlanner.MigrationDecisionCallback;
import com.hazelcast.internal.partition.operation.FinalizeMigrationOperation;
@@ -69,6 +70,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -79,6 +81,7 @@
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -838,7 +841,7 @@ public MigrationStats getStats() {
* this task has been scheduled, schedules migrations and syncs the partition state.
* Also schedules a {@link ProcessShutdownRequestsTask}. Acquires partition service lock.
*/
private class RepartitioningTask implements MigrationRunnable {
class RepartitioningTask implements MigrationRunnable {
@Override
public void run() {
if (!partitionService.isLocalMemberMaster()) {
@@ -879,7 +882,16 @@ private PartitionReplica[][] repartition() {
return null;
}

PartitionReplica[][] newState = partitionStateManager.repartition(shutdownRequestedMembers, null);
PartitionReplica[][] newState = null;
if (node.getNodeExtension().getInternalHotRestartService().isEnabled()) {
// check partition table snapshots when persistence is enabled
newState = checkSnapshots();
}
if (newState != null) {
logger.info("Identified a snapshot of left member for repartition");
} else {
newState = partitionStateManager.repartition(shutdownRequestedMembers, null);
}
if (newState == null) {
migrationQueue.add(new ProcessShutdownRequestsTask());
return null;
@@ -891,6 +903,31 @@ private PartitionReplica[][] repartition() {
return newState;
}

PartitionReplica[][] checkSnapshots() {
Set<UUID> shutdownRequestedReplicas = new HashSet<>();
Set<UUID> currentReplicas = new HashSet<>();
Map<UUID, Address> currentAddressMapping = new HashMap<>();
shutdownRequestedMembers.forEach(member -> shutdownRequestedReplicas.add(member.getUuid()));

Collection<Member> currentMembers = node.getClusterService().getMembers(DATA_MEMBER_SELECTOR);
currentMembers.forEach(member -> currentReplicas.add(member.getUuid()));
currentMembers.forEach(member -> currentAddressMapping.put(member.getUuid(), member.getAddress()));

Set<PartitionTableView> candidates = new TreeSet<>(new
PartitionTableViewDistanceComparator(partitionStateManager.getPartitionTable()));

for (PartitionTableView partitionTableView : partitionStateManager.snapshots()) {
if (partitionTableView.composedOf(currentReplicas, shutdownRequestedReplicas)) {
candidates.add(partitionTableView);
}
}
if (candidates.isEmpty()) {
return null;
}
// find least distant
return candidates.iterator().next().toArray(currentAddressMapping);
}

/**
* Assigns new owners to completely lost partitions (which do not have owners for any replica)
* when cluster state does not allow migrations/repartitioning but allows promotions.
@@ -1973,4 +2010,36 @@ public void run() {
publishCompletedMigrations();
}
}

/**
* Comparator that compares distance of two {@link PartitionTableView}s against a base
* {@link PartitionTableView} that is provided at construction time.
* Distance of two {@link PartitionTableView}s is the sum of the distances of their
* respective {@link InternalPartition}s. The distance between two
* {@link InternalPartition}s is calculated as follows:
* <ul>
* <li>If a {@link PartitionReplica} occurs in both {@link InternalPartition}s, then
* their distance is the absolute difference of their respective replica indices.</li>
* <li>If {@code null} {@link PartitionReplica}s occur at the same replica index, then
* their distance is 0.</li>
* <li>If a non-{@code null} {@link PartitionReplica} is present in one {@link InternalPartition}
* and not the other, then its distance is {@link InternalPartition#MAX_REPLICA_COUNT}.</li>
* </ul>
*/
static class PartitionTableViewDistanceComparator implements Comparator<PartitionTableView> {
final PartitionTableView basePartitionTableView;

PartitionTableViewDistanceComparator(PartitionTableView basePartitionTableView) {
this.basePartitionTableView = basePartitionTableView;
}

@Override
public int compare(PartitionTableView o1, PartitionTableView o2) {
return distanceFromBase(o1) - distanceFromBase(o2);
}

int distanceFromBase(PartitionTableView partitionTableView) {
return partitionTableView.distanceOf(basePartitionTableView);
}
}
}
@@ -69,10 +69,14 @@
*/
public class PartitionReplicaManager implements PartitionReplicaVersionManager {

// Allow running partition replica sync on generic operation threads? Default is no,
// requires explicitly setting system property hazelcast.partition.replica.offload to true
/**
* Allow running partition replica sync on generic operation threads? Default is true.
* The system property is supplied as a kill switch in case of unexpected issues.
* @since 5.0
*/
private static final String PARTITION_REPLICA_ALLOW_OFFLOAD = "hazelcast.partition.replica.offload";
private static final boolean ALLOW_OFFLOAD = Boolean.getBoolean(PARTITION_REPLICA_ALLOW_OFFLOAD);
private static final boolean ALLOW_OFFLOAD =
Boolean.parseBoolean(System.getProperty(PARTITION_REPLICA_ALLOW_OFFLOAD, "true"));

private final Node node;
private final NodeEngineImpl nodeEngine;
@@ -39,6 +39,7 @@
import com.hazelcast.spi.partitiongroup.MemberGroup;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -449,12 +450,14 @@ PartitionTableView getPartitionTable() {
}

void storeSnapshot(UUID crashedMemberUuid) {
if (logger.isFineEnabled()) {
logger.info("Storing snapshot of partition assignments while removing UUID " + crashedMemberUuid);
}
logger.info("Storing snapshot of partition assignments while removing UUID " + crashedMemberUuid);
snapshotOnRemove.put(crashedMemberUuid, getPartitionTable());
}

Collection<PartitionTableView> snapshots() {
return Collections.unmodifiableCollection(snapshotOnRemove.values());
}

PartitionTableView getSnapshot(UUID crashedMemberUuid) {
return snapshotOnRemove.get(crashedMemberUuid);
}
@@ -79,7 +79,8 @@ public void run() {
}

Address ownerAddress = partition.getOwnerOrNull();
if (getAllKnownAliases(getCallerAddress()).stream().noneMatch(ownerAddress::equals)) {
if (ownerAddress != null
&& getAllKnownAliases(getCallerAddress()).stream().noneMatch(ownerAddress::equals)) {
logger.fine("Anti-entropy operation for partitionId=" + getPartitionId() + ", replicaIndex=" + getReplicaIndex()
+ " is received from " + getCallerAddress() + ", but it's not the known primary replica owner: "
+ ownerAddress);
