Skip to content

Commit

Permalink
Fix IMessageExtractor technical debts in tests (#7099)
Browse files Browse the repository at this point in the history
* Fix Akka.Cluster.Sharding.Tests

* Fix Akka.Cluster.Tests.MultiNode

* Fix ClusterShardingSpec, improve NRE exception message

* Fix PersistentShardingMigrationSpec
  • Loading branch information
Arkatufus committed Feb 16, 2024
1 parent 1c8ffc3 commit f7ed2ac
Show file tree
Hide file tree
Showing 32 changed files with 661 additions and 549 deletions.
Expand Up @@ -141,25 +141,28 @@ protected override bool Receive(object message)
}
}

private ExtractEntityId extractEntityId = message =>
private sealed class MessageExtractor: IMessageExtractor
{
switch (message)
{
case Ping p:
return (p.Id, message);
}
return Option<(string, object)>.None;
};
public string EntityId(object message)
=> message switch
{
Ping p => p.Id,
_ => null
};

internal ExtractShardId extractShardId = message =>
{
switch (message)
{
case Ping p:
return p.Id[0].ToString();
}
return null;
};
public object EntityMessage(object message)
=> message;

public string ShardId(object message)
=> message switch
{
Ping p => p.Id[0].ToString(),
_ => null
};

public string ShardId(string entityId, object messageHint = null)
=> entityId[0].ToString();
}

private readonly Lazy<IActorRef> _region;

Expand All @@ -172,11 +175,10 @@ protected ClusterShardCoordinatorDowning2Spec(ClusterShardCoordinatorDowning2Spe
private void StartSharding()
{
StartSharding(
Sys,
typeName: "Entity",
entityProps: Props.Create(() => new Entity()),
extractEntityId: extractEntityId,
extractShardId: extractShardId);
Sys,
typeName: "Entity",
entityProps: Props.Create(() => new Entity()),
messageExtractor: new MessageExtractor());
}

#endregion
Expand Down
Expand Up @@ -143,25 +143,28 @@ protected override bool Receive(object message)
}
}

private ExtractEntityId extractEntityId = message =>
private sealed class MessageExtractor: IMessageExtractor
{
switch (message)
{
case Ping p:
return (p.Id, message);
}
return Option<(string, object)>.None;
};
public string EntityId(object message)
=> message switch
{
Ping p => p.Id,
_ => null
};

internal ExtractShardId extractShardId = message =>
{
switch (message)
{
case Ping p:
return p.Id[0].ToString();
}
return null;
};
public object EntityMessage(object message)
=> message;

public string ShardId(object message)
=> message switch
{
Ping p => p.Id[0].ToString(),
_ => null
};

public string ShardId(string entityId, object messageHint = null)
=> entityId[0].ToString();
}

private readonly Lazy<IActorRef> _region;

Expand All @@ -177,8 +180,7 @@ private void StartSharding()
Sys,
typeName: "Entity",
entityProps: Props.Create(() => new Entity()),
extractEntityId: extractEntityId,
extractShardId: extractShardId);
messageExtractor: new MessageExtractor());
}

#endregion
Expand Down
Expand Up @@ -191,8 +191,6 @@ private void Join(RoleName from, RoleName to)
Sys,
typeName: "Entity",
entityProps: SimpleEchoActor.Props(),
extractEntityId: IntExtractEntityId,
extractShardId: IntExtractShardId,
allocationStrategy: new TestAllocationStrategy(_allocator.Value))
);
}
Expand Down
Expand Up @@ -138,31 +138,30 @@ protected override void PostStop()
}
}

private ExtractEntityId extractEntityId = message =>
private sealed class MessageExtractor: IMessageExtractor
{
switch (message)
{
case Get msg:
return (msg.Id, message);
case Add msg:
return (msg.Id, message);
}
return Option<(string, object)>.None;
};
public string EntityId(object message)
=> message switch
{
Get msg => msg.Id,
Add msg => msg.Id,
_ => null
};

private ExtractShardId extractShardId = message =>
{
switch (message)
{
case Get msg:
return msg.Id[0].ToString();
case Add msg:
return msg.Id[0].ToString();
case ShardRegion.StartEntity se:
return se.EntityId;
}
return null;
};
public object EntityMessage(object message)
=> message;

public string ShardId(object message)
=> message switch
{
Get msg => msg.Id[0].ToString(),
Add msg => msg.Id[0].ToString(),
_ => null
};

public string ShardId(string entityId, object messageHint = null)
=> entityId[0].ToString();
}

private readonly Lazy<IActorRef> _region;

Expand All @@ -179,8 +178,7 @@ private void Join(RoleName from, RoleName to)
Sys,
typeName: "Entity",
entityProps: Props.Create(() => new Entity()),
extractEntityId: extractEntityId,
extractShardId: extractShardId)
messageExtractor: new MessageExtractor())
);
}

Expand Down
Expand Up @@ -48,25 +48,28 @@ public class ClusterShardingGetStateSpec : MultiNodeClusterShardingSpec<ClusterS
private const int NumberOfShards = 2;
private const string ShardTypeName = "Ping";

private ExtractEntityId extractEntityId = message =>
private sealed class MessageExtractor: IMessageExtractor
{
switch (message)
{
case PingPongActor.Ping msg:
return (msg.Id.ToString(), message);
}
return Option<(string, object)>.None;
};
public string EntityId(object message)
=> message switch
{
PingPongActor.Ping p => p.Id.ToString(),
_ => null
};

private ExtractShardId extractShardId = message =>
{
switch (message)
{
case PingPongActor.Ping msg:
return (msg.Id % NumberOfShards).ToString();
}
return null;
};
public object EntityMessage(object message)
=> message;

public string ShardId(object message)
=> message switch
{
PingPongActor.Ping p => (p.Id % NumberOfShards).ToString(),
_ => null
};

public string ShardId(string entityId, object messageHint = null)
=> (int.Parse(entityId) % NumberOfShards).ToString();
}

public ClusterShardingGetStateSpec()
: this(new ClusterShardingGetStateSpecConfig(), typeof(ClusterShardingGetStateSpec))
Expand Down Expand Up @@ -108,8 +111,7 @@ private void Inspecting_cluster_sharding_state_must_join_cluster()
Sys,
typeName: ShardTypeName,
role: "shard",
extractEntityId: extractEntityId,
extractShardId: extractShardId);
messageExtractor: new MessageExtractor());
}, Config.Controller);

RunOn(() =>
Expand All @@ -119,8 +121,7 @@ private void Inspecting_cluster_sharding_state_must_join_cluster()
typeName: ShardTypeName,
entityProps: Props.Create(() => new PingPongActor()),
settings: Settings.Value.WithRole("shard"),
extractEntityId: extractEntityId,
extractShardId: extractShardId);
messageExtractor: new MessageExtractor());
}, Config.First, Config.Second);

EnterBarrier("sharding started");
Expand Down
Expand Up @@ -49,25 +49,28 @@ public class ClusterShardingGetStatsSpec : MultiNodeClusterShardingSpec<ClusterS
private const int NumberOfShards = 3;
private const string ShardTypeName = "Ping";

private ExtractEntityId extractEntityId = message =>
private sealed class MessageExtractor: IMessageExtractor
{
switch (message)
{
case PingPongActor.Ping msg:
return (msg.Id.ToString(), message);
}
return Option<(string, object)>.None;
};
public string EntityId(object message)
=> message switch
{
PingPongActor.Ping p => p.Id.ToString(),
_ => null
};

private ExtractShardId extractShardId = message =>
{
switch (message)
{
case PingPongActor.Ping msg:
return (msg.Id % NumberOfShards).ToString();
}
return null;
};
public object EntityMessage(object message)
=> message;

public string ShardId(object message)
=> message switch
{
PingPongActor.Ping p => (p.Id % NumberOfShards).ToString(),
_ => null
};

public string ShardId(string entityId, object messageHint = null)
=> (int.Parse(entityId) % NumberOfShards).ToString();
}

private readonly Lazy<IActorRef> _region;

Expand All @@ -89,8 +92,7 @@ private IActorRef StartShard()
typeName: ShardTypeName,
entityProps: Props.Create(() => new PingPongActor()),
settings: Settings.Value.WithRole("shard"),
extractEntityId: extractEntityId,
extractShardId: extractShardId);
messageExtractor: new MessageExtractor());
}

#endregion
Expand Down Expand Up @@ -127,8 +129,7 @@ private void Inspecting_cluster_sharding_state_must_join_cluster()
Sys,
typeName: ShardTypeName,
role: "shard",
extractEntityId: extractEntityId,
extractShardId: extractShardId);
messageExtractor: new MessageExtractor());
}, Config.Controller);

Expand Down
Expand Up @@ -188,8 +188,6 @@ private IActorRef StartSharding(string typeName)
Sys,
typeName,
entityProps: Props.Create(() => new SlowStopShardedEntity()),
extractEntityId: IntExtractEntityId,
extractShardId: IntExtractShardId,
allocationStrategy: ShardAllocationStrategy.LeastShardAllocationStrategy(absoluteLimit: 2, relativeLimit: 1.0),
handOffStopMessage: SlowStopShardedEntity.Stop.Instance);
}
Expand Down
Expand Up @@ -102,8 +102,6 @@ private IActorRef StartSharding(string typeName)
Sys,
typeName,
entityProps: Props.Create(() => new ShardedEntity()),
extractEntityId: IntExtractEntityId,
extractShardId: IntExtractShardId,
allocationStrategy: ShardAllocationStrategy.LeastShardAllocationStrategy(absoluteLimit: 2, relativeLimit: 1.0),
handOffStopMessage: ShardedEntity.Stop.Instance);
}
Expand Down
Expand Up @@ -129,25 +129,28 @@ public ShardLocations()
}
}

private ExtractEntityId extractEntityId = message =>
private sealed class MessageExtractor: IMessageExtractor
{
switch (message)
{
case Ping msg:
return (msg.Id, message);
}
return Option<(string, object)>.None;
};
public string EntityId(object message)
=> message switch
{
Ping p => p.Id,
_ => null
};

private ExtractShardId extractShardId = message =>
{
switch (message)
{
case Ping msg:
return msg.Id[0].ToString();
}
return null;
};
public object EntityMessage(object message)
=> message;

public string ShardId(object message)
=> message switch
{
Ping p => p.Id[0].ToString(),
_ => null
};

public string ShardId(string entityId, object messageHint = null)
=> entityId[0].ToString();
}

private readonly Lazy<IActorRef> _region;

Expand All @@ -163,8 +166,7 @@ private void StartSharding()
Sys,
typeName: "Entity",
entityProps: Props.Create(() => new Entity()),
extractEntityId: extractEntityId,
extractShardId: extractShardId);
messageExtractor: new MessageExtractor());
}

#endregion
Expand Down

0 comments on commit f7ed2ac

Please sign in to comment.