diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs index 645e0a6c360..3ac0ae4f569 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs @@ -109,12 +109,15 @@ public IImmutableSet AllShards } /// - /// TBD + /// Feed an event into the ShardCoordinator state. /// - /// TBD - /// TBD - /// TBD - public State Updated(IDomainEvent e) + /// The event to process. + /// A flag to indicate if we're trying to recover state previously stored in the database. + /// We need to be more tolerant when this happens in the name of trying to accelerate recovery, so the system doesn't compromise + /// itself and go offline. + /// Thrown if an event is illegal in the current state. + /// An update copy of this state. + public State Updated(IDomainEvent e, bool isRecovering = false) { switch (e) { @@ -153,7 +156,27 @@ public State Updated(IDomainEvent e) if (!Regions.TryGetValue(message.Region, out var shardRegions)) throw new ArgumentException($"Region {message.Region} not registered", nameof(e)); if (Shards.ContainsKey(message.Shard)) - throw new ArgumentException($"Shard {message.Shard} is already allocated", nameof(e)); + { + if (!isRecovering) + throw new ArgumentException($"Shard {message.Shard} is already allocated", + nameof(e)); + + // per https://github.com/akkadotnet/akka.net/issues/5604 + // we're going to allow new value to overwrite previous + var newRegions = Regions; + var previousRegion = Shards[message.Shard]; + if (Regions.TryGetValue(previousRegion, out var previousShards)) + { + newRegions = newRegions.SetItem(previousRegion, + previousShards.Remove(message.Shard)); + } + var newUnallocatedShardsRecovery = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; + return Copy( + shards: Shards.SetItem(message.Shard, message.Region), + regions: newRegions.SetItem(message.Region, shardRegions.Add(message.Shard)), + unallocatedShards: newUnallocatedShardsRecovery); + } + var newUnallocatedShards = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; return Copy( @@ -1346,26 +1369,26 @@ protected override bool ReceiveRecover(object message) switch (evt) { case ShardRegionRegistered _: - CurrentState = CurrentState.Updated(evt); + CurrentState = CurrentState.Updated(evt, true); return true; case ShardRegionProxyRegistered _: - CurrentState = CurrentState.Updated(evt); + CurrentState = CurrentState.Updated(evt, true); return true; case ShardRegionTerminated regionTerminated: if (CurrentState.Regions.ContainsKey(regionTerminated.Region)) - CurrentState = CurrentState.Updated(evt); + CurrentState = CurrentState.Updated(evt, true); else Log.Debug("ShardRegionTerminated but region {0} was not registered", regionTerminated.Region); return true; case ShardRegionProxyTerminated proxyTerminated: if (CurrentState.RegionProxies.Contains(proxyTerminated.RegionProxy)) - CurrentState = CurrentState.Updated(evt); + CurrentState = CurrentState.Updated(evt, true); return true; case ShardHomeAllocated _: - CurrentState = CurrentState.Updated(evt); + CurrentState = CurrentState.Updated(evt, true); return true; case ShardHomeDeallocated _: - CurrentState = CurrentState.Updated(evt); + CurrentState = CurrentState.Updated(evt, true); return true; } return false;