New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow PersistentShardCoordinator
to tolerate duplicate ShardHomeAllocated
messages
#5967
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -109,12 +109,15 @@ public IImmutableSet<ShardId> AllShards | |
} | ||
|
||
/// <summary> | ||
/// TBD | ||
/// Feed an event into the ShardCoordinator state. | ||
/// </summary> | ||
/// <param name="e">TBD</param> | ||
/// <exception cref="ArgumentException">TBD</exception> | ||
/// <returns>TBD</returns> | ||
public State Updated(IDomainEvent e) | ||
/// <param name="e">The event to process.</param> | ||
/// <param name="isRecovering">A flag to indicate if we're trying to recover state previously stored in the database. | ||
/// We need to be more tolerant when this happens in the name of trying to accelerate recovery, so the system doesn't compromise | ||
/// itself and go offline.</param> | ||
/// <exception cref="ArgumentException">Thrown if an event is illegal in the current state.</exception> | ||
/// <returns>An update copy of this state.</returns> | ||
public State Updated(IDomainEvent e, bool isRecovering = false) | ||
{ | ||
switch (e) | ||
{ | ||
|
@@ -153,7 +156,27 @@ public State Updated(IDomainEvent e) | |
if (!Regions.TryGetValue(message.Region, out var shardRegions)) | ||
throw new ArgumentException($"Region {message.Region} not registered", nameof(e)); | ||
if (Shards.ContainsKey(message.Shard)) | ||
throw new ArgumentException($"Shard {message.Shard} is already allocated", nameof(e)); | ||
{ | ||
if (!isRecovering) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've decided to implement reader tolerance in only one location for now the |
||
throw new ArgumentException($"Shard {message.Shard} is already allocated", | ||
nameof(e)); | ||
|
||
// per https://github.com/akkadotnet/akka.net/issues/5604 | ||
// we're going to allow new value to overwrite previous | ||
var newRegions = Regions; | ||
var previousRegion = Shards[message.Shard]; | ||
if (Regions.TryGetValue(previousRegion, out var previousShards)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ALGO is as follows:
|
||
{ | ||
newRegions = newRegions.SetItem(previousRegion, | ||
previousShards.Remove(message.Shard)); | ||
} | ||
var newUnallocatedShardsRecovery = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; | ||
return Copy( | ||
shards: Shards.SetItem(message.Shard, message.Region), | ||
regions: newRegions.SetItem(message.Region, shardRegions.Add(message.Shard)), | ||
unallocatedShards: newUnallocatedShardsRecovery); | ||
} | ||
|
||
|
||
var newUnallocatedShards = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; | ||
return Copy( | ||
|
@@ -1346,26 +1369,26 @@ protected override bool ReceiveRecover(object message) | |
switch (evt) | ||
{ | ||
case ShardRegionRegistered _: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sure all of the |
||
CurrentState = CurrentState.Updated(evt); | ||
CurrentState = CurrentState.Updated(evt, true); | ||
return true; | ||
case ShardRegionProxyRegistered _: | ||
CurrentState = CurrentState.Updated(evt); | ||
CurrentState = CurrentState.Updated(evt, true); | ||
return true; | ||
case ShardRegionTerminated regionTerminated: | ||
if (CurrentState.Regions.ContainsKey(regionTerminated.Region)) | ||
CurrentState = CurrentState.Updated(evt); | ||
CurrentState = CurrentState.Updated(evt, true); | ||
else | ||
Log.Debug("ShardRegionTerminated but region {0} was not registered", regionTerminated.Region); | ||
return true; | ||
case ShardRegionProxyTerminated proxyTerminated: | ||
if (CurrentState.RegionProxies.Contains(proxyTerminated.RegionProxy)) | ||
CurrentState = CurrentState.Updated(evt); | ||
CurrentState = CurrentState.Updated(evt, true); | ||
return true; | ||
case ShardHomeAllocated _: | ||
CurrentState = CurrentState.Updated(evt); | ||
CurrentState = CurrentState.Updated(evt, true); | ||
return true; | ||
case ShardHomeDeallocated _: | ||
CurrentState = CurrentState.Updated(evt); | ||
CurrentState = CurrentState.Updated(evt, true); | ||
return true; | ||
} | ||
return false; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We now accept a new parameter here,
isRecovering
- when this value is set totrue
then we enable "reader tolerance" inside thePersistentShardCoordinator
.