From 7323678b22224cecef696c69d6fb42968092d3ac Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 31 May 2022 14:49:53 -0500 Subject: [PATCH] Revert "Allow `PersistentShardCoordinator` to tolerate duplicate `ShardHomeAllocated` messages (#5967)" This reverts commit 4fa7abb62e07273d7282346531beafe2a36bd2ec. --- .../PersistentShardCoordinator.cs | 47 +++++-------------- 1 file changed, 12 insertions(+), 35 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs index 3ac0ae4f569..645e0a6c360 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs @@ -109,15 +109,12 @@ public IImmutableSet AllShards } /// - /// Feed an event into the ShardCoordinator state. + /// TBD /// - /// The event to process. - /// A flag to indicate if we're trying to recover state previously stored in the database. - /// We need to be more tolerant when this happens in the name of trying to accelerate recovery, so the system doesn't compromise - /// itself and go offline. - /// Thrown if an event is illegal in the current state. - /// An update copy of this state. - public State Updated(IDomainEvent e, bool isRecovering = false) + /// TBD + /// TBD + /// TBD + public State Updated(IDomainEvent e) { switch (e) { @@ -156,27 +153,7 @@ public State Updated(IDomainEvent e, bool isRecovering = false) if (!Regions.TryGetValue(message.Region, out var shardRegions)) throw new ArgumentException($"Region {message.Region} not registered", nameof(e)); if (Shards.ContainsKey(message.Shard)) - { - if (!isRecovering) - throw new ArgumentException($"Shard {message.Shard} is already allocated", - nameof(e)); - - // per https://github.com/akkadotnet/akka.net/issues/5604 - // we're going to allow new value to overwrite previous - var newRegions = Regions; - var previousRegion = Shards[message.Shard]; - if (Regions.TryGetValue(previousRegion, out var previousShards)) - { - newRegions = newRegions.SetItem(previousRegion, - previousShards.Remove(message.Shard)); - } - var newUnallocatedShardsRecovery = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; - return Copy( - shards: Shards.SetItem(message.Shard, message.Region), - regions: newRegions.SetItem(message.Region, shardRegions.Add(message.Shard)), - unallocatedShards: newUnallocatedShardsRecovery); - } - + throw new ArgumentException($"Shard {message.Shard} is already allocated", nameof(e)); var newUnallocatedShards = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; return Copy( @@ -1369,26 +1346,26 @@ protected override bool ReceiveRecover(object message) switch (evt) { case ShardRegionRegistered _: - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; case ShardRegionProxyRegistered _: - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; case ShardRegionTerminated regionTerminated: if (CurrentState.Regions.ContainsKey(regionTerminated.Region)) - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); else Log.Debug("ShardRegionTerminated but region {0} was not registered", regionTerminated.Region); return true; case ShardRegionProxyTerminated proxyTerminated: if (CurrentState.RegionProxies.Contains(proxyTerminated.RegionProxy)) - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; case ShardHomeAllocated _: - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; case ShardHomeDeallocated _: - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; } return false;