Skip to content

Commit

Permalink
Backport of akkadotnet#5967 - Allow PersistentShardCoordinator to tol…
Browse files Browse the repository at this point in the history
…erate duplicate ShardHomeAllocated messages
  • Loading branch information
Arkatufus committed May 26, 2022
1 parent cfbdd5b commit fb3eb71
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
Expand Up @@ -88,14 +88,14 @@ protected override bool ReceiveRecover(object message)
switch (evt)
{
case ShardRegionRegistered _:
State = State.Updated(evt);
State = State.Updated(evt, true);
return true;
case ShardRegionProxyRegistered _:
State = State.Updated(evt);
State = State.Updated(evt, true);
return true;
case ShardRegionTerminated regionTerminated:
if (State.Regions.ContainsKey(regionTerminated.Region))
State = State.Updated(evt);
State = State.Updated(evt, true);
else
//Log.Debug(
// "{0}: ShardRegionTerminated, but region {1} was not registered. This inconsistency is due to that " +
Expand All @@ -107,13 +107,13 @@ protected override bool ReceiveRecover(object message)
return true;
case ShardRegionProxyTerminated proxyTerminated:
if (State.RegionProxies.Contains(proxyTerminated.RegionProxy))
State = State.Updated(evt);
State = State.Updated(evt, true);
return true;
case ShardHomeAllocated _:
State = State.Updated(evt);
State = State.Updated(evt, true);
return true;
case ShardHomeDeallocated _:
State = State.Updated(evt);
State = State.Updated(evt, true);
return true;
case ShardCoordinatorInitialized _:
// not used here
Expand Down
34 changes: 28 additions & 6 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs
Expand Up @@ -1144,12 +1144,15 @@ public override string ToString()
}

/// <summary>
/// TBD
/// Feed an event into the ShardCoordinator state.
/// </summary>
/// <param name="e">TBD</param>
/// <exception cref="ArgumentException">TBD</exception>
/// <returns>TBD</returns>
public CoordinatorState Updated(IDomainEvent e)
/// <param name="e">The event to process.</param>
/// <param name="isRecovering"><c>true</c> when the event is being replayed while recovering previously persisted state.
/// Recovery is deliberately more tolerant: a <c>ShardHomeAllocated</c> event for a shard that is already allocated overwrites
/// the earlier allocation instead of throwing, so that recovery can complete and the system doesn't compromise itself and go offline.</param>
/// <exception cref="ArgumentException">Thrown if an event is illegal in the current state.</exception>
/// <returns>An updated copy of this state.</returns>
public CoordinatorState Updated(IDomainEvent e, bool isRecovering = false)
{
switch (e)
{
Expand Down Expand Up @@ -1188,7 +1191,26 @@ public CoordinatorState Updated(IDomainEvent e)
if (!Regions.TryGetValue(message.Region, out var shardRegions))
throw new ArgumentException($"Region {message.Region} not registered: {this}", nameof(e));
if (Shards.ContainsKey(message.Shard))
throw new ArgumentException($"Shard {message.Shard} already allocated: {this}", nameof(e));
{
if(!isRecovering)
throw new ArgumentException($"Shard {message.Shard} already allocated: {this}",
nameof(e));

// per https://github.com/akkadotnet/akka.net/issues/5604
// we're going to allow new value to overwrite previous
var newRegions = Regions;
var previousRegion = Shards[message.Shard];
if (Regions.TryGetValue(previousRegion, out var previousShards))
{
newRegions = newRegions.SetItem(previousRegion,
previousShards.Remove(message.Shard));
}
var newUnallocatedShardsRecovery = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards;
return Copy(
shards: Shards.SetItem(message.Shard, message.Region),
regions: newRegions.SetItem(message.Region, shardRegions.Add(message.Shard)),
unallocatedShards: newUnallocatedShardsRecovery);
}

var newUnallocatedShards = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards;
return Copy(
Expand Down

0 comments on commit fb3eb71

Please sign in to comment.