From 7323678b22224cecef696c69d6fb42968092d3ac Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 31 May 2022 14:49:53 -0500 Subject: [PATCH 1/3] Revert "Allow `PersistentShardCoordinator` to tolerate duplicate `ShardHomeAllocated` messages (#5967)" This reverts commit 4fa7abb62e07273d7282346531beafe2a36bd2ec. --- .../PersistentShardCoordinator.cs | 47 +++++-------------- 1 file changed, 12 insertions(+), 35 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs index 3ac0ae4f569..645e0a6c360 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs @@ -109,15 +109,12 @@ public IImmutableSet AllShards } /// - /// Feed an event into the ShardCoordinator state. + /// TBD /// - /// The event to process. - /// A flag to indicate if we're trying to recover state previously stored in the database. - /// We need to be more tolerant when this happens in the name of trying to accelerate recovery, so the system doesn't compromise - /// itself and go offline. - /// Thrown if an event is illegal in the current state. - /// An update copy of this state. - public State Updated(IDomainEvent e, bool isRecovering = false) + /// TBD + /// TBD + /// TBD + public State Updated(IDomainEvent e) { switch (e) { @@ -156,27 +153,7 @@ public State Updated(IDomainEvent e, bool isRecovering = false) if (!Regions.TryGetValue(message.Region, out var shardRegions)) throw new ArgumentException($"Region {message.Region} not registered", nameof(e)); if (Shards.ContainsKey(message.Shard)) - { - if (!isRecovering) - throw new ArgumentException($"Shard {message.Shard} is already allocated", - nameof(e)); - - // per https://github.com/akkadotnet/akka.net/issues/5604 - // we're going to allow new value to overwrite previous - var newRegions = Regions; - var previousRegion = Shards[message.Shard]; - if (Regions.TryGetValue(previousRegion, out var previousShards)) - { - newRegions = newRegions.SetItem(previousRegion, - previousShards.Remove(message.Shard)); - } - var newUnallocatedShardsRecovery = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; - return Copy( - shards: Shards.SetItem(message.Shard, message.Region), - regions: newRegions.SetItem(message.Region, shardRegions.Add(message.Shard)), - unallocatedShards: newUnallocatedShardsRecovery); - } - + throw new ArgumentException($"Shard {message.Shard} is already allocated", nameof(e)); var newUnallocatedShards = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; return Copy( @@ -1369,26 +1346,26 @@ protected override bool ReceiveRecover(object message) switch (evt) { case ShardRegionRegistered _: - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; case ShardRegionProxyRegistered _: - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; case ShardRegionTerminated regionTerminated: if (CurrentState.Regions.ContainsKey(regionTerminated.Region)) - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); else Log.Debug("ShardRegionTerminated but region {0} was not registered", regionTerminated.Region); return true; case ShardRegionProxyTerminated proxyTerminated: if (CurrentState.RegionProxies.Contains(proxyTerminated.RegionProxy)) - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; case ShardHomeAllocated _: - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; case ShardHomeDeallocated _: - CurrentState = CurrentState.Updated(evt, true); + CurrentState = CurrentState.Updated(evt); return true; } return false; From 0d6325a8f25027fbf8819517d575d3ae3401e842 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 31 May 2022 14:54:50 -0500 Subject: [PATCH 2/3] de-duplicate only identical `ShardHomeAllocated` messages close https://github.com/akkadotnet/akka.net/issues/5604 --- .../PersistentShardCoordinator.cs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs index 645e0a6c360..ff567449a40 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs @@ -109,11 +109,11 @@ public IImmutableSet AllShards } /// - /// TBD + /// Feed an event into the ShardCoordinator state. /// - /// TBD - /// TBD - /// TBD + /// The event to process. + /// Thrown if an event is illegal in the current state. + /// An update copy of this state. public State Updated(IDomainEvent e) { switch (e) @@ -1361,8 +1361,12 @@ protected override bool ReceiveRecover(object message) if (CurrentState.RegionProxies.Contains(proxyTerminated.RegionProxy)) CurrentState = CurrentState.Updated(evt); return true; - case ShardHomeAllocated _: - CurrentState = CurrentState.Updated(evt); + case ShardHomeAllocated homeAllocated: + // if we already have identical ShardHomeAllocated data, skip processing it + // addresses https://github.com/akkadotnet/akka.net/issues/5604 + if(!(CurrentState.Shards.TryGetValue(homeAllocated.Shard, out var currentShardRegion) + && Equals(homeAllocated.Region, currentShardRegion))) + CurrentState = CurrentState.Updated(evt); return true; case ShardHomeDeallocated _: CurrentState = CurrentState.Updated(evt); From f08d7e6a6f8e6572eea24a03200507f4dfb39d77 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 31 May 2022 15:01:37 -0500 Subject: [PATCH 3/3] made syntax clearer --- .../Akka.Cluster.Sharding/PersistentShardCoordinator.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs index ff567449a40..1b7d56f5ab9 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs @@ -1364,9 +1364,10 @@ protected override bool ReceiveRecover(object message) case ShardHomeAllocated homeAllocated: // if we already have identical ShardHomeAllocated data, skip processing it // addresses https://github.com/akkadotnet/akka.net/issues/5604 - if(!(CurrentState.Shards.TryGetValue(homeAllocated.Shard, out var currentShardRegion) - && Equals(homeAllocated.Region, currentShardRegion))) - CurrentState = CurrentState.Updated(evt); + if (CurrentState.Shards.TryGetValue(homeAllocated.Shard, out var currentShardRegion) + && Equals(homeAllocated.Region, currentShardRegion)) + return true; + CurrentState = CurrentState.Updated(evt); return true; case ShardHomeDeallocated _: CurrentState = CurrentState.Updated(evt);