Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddata ReadMajorityPlus and WriteMajorityPlus #5146

Merged
merged 6 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class ReplicatorSpec : MultiNodeClusterSpec
private readonly ORDictionaryKey<string, Flag> KeyH = new ORDictionaryKey<string, Flag>("H");
private readonly GSetKey<string> KeyI = new GSetKey<string>("I");
private readonly GSetKey<string> KeyJ = new GSetKey<string>("J");
private readonly LWWRegisterKey<string> KeyK = new LWWRegisterKey<string>("K");
private readonly GCounterKey KeyX = new GCounterKey("X");
private readonly GCounterKey KeyY = new GCounterKey("Y");
private readonly GCounterKey KeyZ = new GCounterKey("Z");
Expand Down Expand Up @@ -407,7 +408,7 @@ public void Cluster_CRDT_should_be_replicated_after_successful_update()
_replicator.Tell(Dsl.Get(KeyC, ReadLocal.Instance));
var c = ExpectMsg<GetSuccess>(g => Equals(g.Key, KeyC), TimeSpan.FromMilliseconds(300)).Get(KeyC);
c.Value.ShouldBe(33UL);
}, interval:TimeSpan.FromMilliseconds(300));
}, interval: TimeSpan.FromMilliseconds(300));
});
}, _first, _second);

Expand Down Expand Up @@ -713,9 +714,9 @@ public void Cluster_CRDT_should_avoid_duplicate_change_events_for_same_data()

Within(TimeSpan.FromSeconds(5), () =>
{
var changed = changedProbe.ExpectMsg<Changed>(c =>
c.Get(KeyI).Elements.ShouldBe(ImmutableHashSet.Create("a")));

var changed = changedProbe.ExpectMsg<Changed>(c =>
c.Get(KeyI).Elements.ShouldBe(ImmutableHashSet.Create("a")));
var keyIData = changed.Get(KeyI);
Sys.Log.Debug("DEBUG: Received Changed {0}", changed);
});
Expand All @@ -731,6 +732,60 @@ public void Cluster_CRDT_should_avoid_duplicate_change_events_for_same_data()

}

public void Cluster_CRDT_should_support_prefer_oldest_members()
{
// disable gossip and delta replication to only verify the write and read operations
var oldestReplicator = Sys.ActorOf(
Replicator.Props(
ReplicatorSettings.Create(Sys).WithPreferOldest(true).WithGossipInterval(TimeSpan.FromMinutes(1))),//.withDeltaCrdtEnabled(false)),
"oldestReplicator");
Within(TimeSpan.FromSeconds(5), () =>
{
var countProbe = CreateTestProbe();
AwaitAssert(() =>
{
oldestReplicator.Tell(GetReplicaCount.Instance, countProbe.Ref);
countProbe.ExpectMsg(new ReplicaCount(3));
});
});
EnterBarrier("oldest-replicator-started");

var probe = CreateTestProbe();

RunOn(() =>
{
oldestReplicator.Tell(
Dsl.Update(KeyK, new LWWRegister<string>(Cluster.SelfUniqueAddress, "0"), _writeTwo, a => a.WithValue(Cluster.SelfUniqueAddress, "1")),
probe.Ref);
probe.ExpectMsg(new UpdateSuccess(KeyK, null));
}, _second);
EnterBarrier("updated-1");

RunOn(() =>
{
// replicated to oldest
oldestReplicator.Tell(new Get(KeyK, ReadLocal.Instance), probe.Ref);
var msg = probe.ExpectMsg<GetSuccess>(m => m.Data is LWWRegister<string>);
((LWWRegister<string>)msg.Data).Value.Should().Be("1");
//probe.ExpectMsg<GetSuccess[LWWRegister[String]]>.dataValue.value should === ("1");
}, _first);

RunOn(() =>
{
// not replicated to third (not among the two oldest)
oldestReplicator.Tell(Dsl.Get(KeyK, ReadLocal.Instance), probe.Ref);
probe.ExpectMsg(new NotFound(KeyK, null));

// read from oldest
oldestReplicator.Tell(Dsl.Get(KeyK, _readTwo), probe.Ref);
var msg = probe.ExpectMsg<GetSuccess>(m => m.Data is LWWRegister<string>);
((LWWRegister<string>)msg.Data).Value.Should().Be("1");
//probe.ExpectMsg<GetSuccess[LWWRegister[String]]>.dataValue.value should === ("1");
}, _third);

EnterBarrierAfterTestStep();
}

protected override int InitialParticipantsValueFactory => Roles.Count;
private void EnterBarrierAfterTestStep()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ReplicatorResiliencySpec : AkkaSpec
private readonly IActorRef _replicator1;
private readonly IActorRef _replicator2;
private readonly IActorRef _replicator3;

static ReplicatorResiliencySpec()
{
SpecConfig = ConfigurationFactory.ParseString(@"
Expand All @@ -52,7 +52,7 @@ public ReplicatorResiliencySpec(ITestOutputHelper helper) : base(SpecConfig, hel
_sys1 = Sys;
_sys3 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
_sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);

var settings = ReplicatorSettings.Create(Sys)
.WithGossipInterval(TimeSpan.FromSeconds(1.0))
.WithMaxDeltaElements(10)
Expand All @@ -68,14 +68,14 @@ public ReplicatorResiliencySpec(ITestOutputHelper helper) : base(SpecConfig, hel
maxNrOfRetries: -1)
.WithFinalStopMessage(m => m is Terminate))
.WithDeploy(Deploy.Local).WithDispatcher(settings.Dispatcher);

_replicator1 = _sys1.ActorOf(props, "replicatorSuper");
_replicator2 = _sys2.ActorOf(props, "replicatorSuper");
_replicator3 = _sys3.ActorOf(props, "replicatorSuper");


}

private async Task InitCluster()
{
Cluster.Cluster.Get(_sys1).Join(Cluster.Cluster.Get(_sys1).SelfAddress); // coordinator will initially run on sys1
Expand Down Expand Up @@ -107,7 +107,7 @@ private async Task InitCluster()
});
});
}

[Fact]
public async Task Handle_Durable_Store_Exception()
{
Expand All @@ -119,10 +119,10 @@ public async Task DurableStoreActorCrash()
{
const string replicatorActorPath = "/user/replicatorSuper/replicator";
const string durableStoreActorPath = "/user/replicatorSuper/replicator/durableStore";

var durableStore = _sys1.ActorSelection(durableStoreActorPath).ResolveOne(TimeSpan.FromSeconds(3)).ContinueWith(
m => m.Result).Result;

var replicator = _sys1.ActorSelection(replicatorActorPath).ResolveOne(TimeSpan.FromSeconds(3)).ContinueWith(
m => m.Result).Result;

Expand All @@ -135,30 +135,30 @@ public async Task DurableStoreActorCrash()
{
throw new Exception(
$"Expecting termination of either durable storage or replicator, found {terminated.ActorRef.Path} instead.");
}
}

terminated = ExpectMsg<Terminated>(TimeSpan.FromSeconds(10));
if (!terminated.ActorRef.Path.Equals(durableStore.Path) && !terminated.ActorRef.Path.Equals(replicator.Path))
{
throw new Exception(
$"Expecting termination of either durable storage or replicator, found {terminated.ActorRef.Path} instead.");
}

//The supervisor should have restarted the replicator actor by now
await AwaitAssertAsync(async () =>
{
// Is the replicator actor recreated
var newReplicator = await _sys1.ActorSelection(replicatorActorPath).ResolveOne(TimeSpan.FromSeconds(5)).ContinueWith(
m => m.Result);

// We should be able to identify the recreated actor to prove the actor exists
await newReplicator.Ask<ActorIdentity>(new Identify(Guid.NewGuid().ToString())).ContinueWith(r =>
{
Assert.Equal(replicatorActorPath,r.Result.Subject.Path.ToStringWithoutAddress());
});
},TimeSpan.FromSeconds(10));
}

[Fact]
public async Task DistributedData_Replicator_Defaults_to_NoSupervisor()
{
Expand All @@ -167,8 +167,8 @@ public async Task DistributedData_Replicator_Defaults_to_NoSupervisor()

await InitCluster();
var replicator = DistributedData.Get(_sys1).Replicator;
IActorRef durableStore = null;

IActorRef durableStore = null;
await AwaitAssertAsync(() =>
{
durableStore = _sys1.ActorSelection(durableStoreActorPath).ResolveOne(TimeSpan.FromSeconds(3))
Expand All @@ -181,15 +181,16 @@ public async Task DistributedData_Replicator_Defaults_to_NoSupervisor()
durableStore.Tell(new InitFail());

// termination orders aren't guaranteed, so can't use ExpectTerminated here
var terminated = ReceiveN(2, TimeSpan.FromSeconds(10)).Cast<Terminated>();
terminated.Select(x => x.ActorRef).Should().BeEquivalentTo(replicator, durableStore);
var terminated1 = ExpectMsg<Terminated>(TimeSpan.FromSeconds(10));
var terminated2 = ExpectMsg<Terminated>(TimeSpan.FromSeconds(10));
ImmutableHashSet.Create(terminated1.ActorRef, terminated2.ActorRef).Should().BeEquivalentTo(durableStore, replicator);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, that should make this test less racy


// The replicator should not have been recreated, so expect ActorNotFound
await Assert.ThrowsAsync<ActorNotFoundException>(() =>
_sys1.ActorSelection(replicatorActorPath).ResolveOne(TimeSpan.FromSeconds(5)));
}
}

public class FakeDurableStore : ReceiveActor
{
public FakeDurableStore(Config config)
Expand All @@ -199,7 +200,7 @@ public FakeDurableStore(Config config)
Receive<LoadAll>( load=> { Sender.Tell(LoadAllCompleted.Instance); });
Receive<InitFail>( init => { throw new LoadFailedException("failed to load durable distributed-data"); });
}

}

public class InitFail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public ReplicatorMessageSerializerSpec(ITestOutputHelper output) : base(BaseConf
_serializer = new ReplicatorMessageSerializer((ExtendedActorSystem)Sys);

// We dont have Artery implementation
// _protocol = ((RemoteActorRefProvider) ((ExtendedActorSystem)Sys).Provider).RemoteSettings.Artery.Enabled
// _protocol = ((RemoteActorRefProvider) ((ExtendedActorSystem)Sys).Provider).RemoteSettings.Artery.Enabled
_protocol = "akka.tcp";

_address1 = new UniqueAddress(new Address("akka.tcp", Sys.Name, "some.host.org", 4711), 1);
Expand Down Expand Up @@ -78,6 +78,8 @@ public void ReplicatorMessageSerializer_should_serialize_Replicator_message()
.Should().Throw<ArgumentOutOfRangeException>("Our protobuf protocol does not support timeouts larger than unsigned ints")
.Which.Message.Contains("unsigned int");

CheckSerialization(new Get(_keyA, new ReadMajorityPlus(TimeSpan.FromSeconds(2), 3), "x"));
CheckSerialization(new Get(_keyA, new ReadMajorityPlus(TimeSpan.FromSeconds(2), 3, 5), "x"));
CheckSerialization(new GetSuccess(_keyA, null, data1));
CheckSerialization(new GetSuccess(_keyA, "x", data1));
CheckSerialization(new NotFound(_keyA, "x"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ internal class TestWriteAggregator<T> : WriteAggregator where T : IReplicatedDat

public TestWriteAggregator(
IKey<T> key,
T data,
T data,
Delta delta,
IWriteConsistency consistency,
IWriteConsistency consistency,
IImmutableDictionary<Address, IActorRef> probes,
IImmutableSet<Address> nodes,
IImmutableList<Address> nodes,
IImmutableSet<Address> unreachable,
IActorRef replyTo,
bool durable)
: base(key, new DataEnvelope(data), delta, consistency, null, nodes, unreachable, replyTo, durable)
bool durable)
: base(key, new DataEnvelope(data), delta, consistency, null, nodes, unreachable, false, replyTo, durable)
{
_probes = probes;
}
Expand Down Expand Up @@ -64,7 +64,7 @@ public WriteAckAdapter(IActorRef replica)
private static Props TestWriteAggregatorProps(GSet<string> data,
IWriteConsistency consistency,
IImmutableDictionary<Address, IActorRef> probes,
IImmutableSet<Address> nodes,
IImmutableList<Address> nodes,
IImmutableSet<Address> unreachable,
IActorRef replyTo,
bool durable) => Actor.Props.Create(() => new TestWriteAggregator<GSet<string>>(KeyA, data, null, consistency, probes, nodes, unreachable, replyTo, durable));
Expand All @@ -73,7 +73,7 @@ public WriteAckAdapter(IActorRef replica)
Delta delta,
IWriteConsistency consistency,
IImmutableDictionary<Address, IActorRef> probes,
IImmutableSet<Address> nodes,
IImmutableList<Address> nodes,
IImmutableSet<Address> unreachable,
IActorRef replyTo,
bool durable) => Actor.Props.Create(() => new TestWriteAggregator<ORSet<string>>(KeyB, data, delta, consistency, probes, nodes, unreachable, replyTo, durable));
Expand All @@ -85,7 +85,7 @@ public WriteAckAdapter(IActorRef replica)
private readonly Address _nodeB = new Address("akka.tcp", "Sys", "b", 2552);
private readonly Address _nodeC = new Address("akka.tcp", "Sys", "c", 2552);
private readonly Address _nodeD = new Address("akka.tcp", "Sys", "d", 2552);
private readonly IImmutableSet<Address> _nodes;
private readonly IImmutableList<Address> _nodes;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the change from set to list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the base class ReadWriteAggregator changes of nodes from IImmutableSet to IImmutableList to keep the order of nodes by oldest for the PreferOldest scenario.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thank you


private readonly GSet<string> _data = GSet.Create("A", "B");
private readonly WriteTo _writeThree = new WriteTo(3, TimeSpan.FromSeconds(3));
Expand All @@ -104,7 +104,7 @@ public WriteAckAdapter(IActorRef replica)
map-size = 10MiB
}}"), "WriteAggregatorSpec", output)
{
_nodes = ImmutableHashSet.CreateRange(new[] {_nodeA, _nodeB, _nodeC, _nodeD});
_nodes = ImmutableList.CreateRange(new[] {_nodeA, _nodeB, _nodeC, _nodeD});

var cluster = Akka.Cluster.Cluster.Get(Sys);
_fullState1 = ORSet<string>.Empty.Add(cluster, "a").Add(cluster, "b");
Expand Down Expand Up @@ -169,24 +169,50 @@ public void WriteAggregator_must_timeout_when_less_than_required_ACKs()
Watch(aggregator);
ExpectTerminated(aggregator);
});

}

[Fact]
public void WriteAggregator_must_callculate_majority_with_min_capactiy()
{
var minCap = 5;

ReadWriteAggregator.CalculateMajorityWithMinCapacity(minCap, 3).Should().Be(3);
ReadWriteAggregator.CalculateMajorityWithMinCapacity(minCap, 4).Should().Be(4);
ReadWriteAggregator.CalculateMajorityWithMinCapacity(minCap, 5).Should().Be(5);
ReadWriteAggregator.CalculateMajorityWithMinCapacity(minCap, 6).Should().Be(5);
ReadWriteAggregator.CalculateMajorityWithMinCapacity(minCap, 7).Should().Be(5);
ReadWriteAggregator.CalculateMajorityWithMinCapacity(minCap, 8).Should().Be(5);
ReadWriteAggregator.CalculateMajorityWithMinCapacity(minCap, 9).Should().Be(5);
ReadWriteAggregator.CalculateMajorityWithMinCapacity(minCap, 10).Should().Be(6);
ReadWriteAggregator.CalculateMajorityWithMinCapacity(minCap, 11).Should().Be(6);
ReadWriteAggregator.CalculateMajorityWithMinCapacity(minCap, 12).Should().Be(7);
ReadWriteAggregator.CalculateMajority(minCap, 3, 0).Should().Be(3);
ReadWriteAggregator.CalculateMajority(minCap, 4, 0).Should().Be(4);
ReadWriteAggregator.CalculateMajority(minCap, 5, 0).Should().Be(5);
ReadWriteAggregator.CalculateMajority(minCap, 6, 0).Should().Be(5);
ReadWriteAggregator.CalculateMajority(minCap, 7, 0).Should().Be(5);
ReadWriteAggregator.CalculateMajority(minCap, 8, 0).Should().Be(5);
ReadWriteAggregator.CalculateMajority(minCap, 9, 0).Should().Be(5);
ReadWriteAggregator.CalculateMajority(minCap, 10, 0).Should().Be(6);
ReadWriteAggregator.CalculateMajority(minCap, 11, 0).Should().Be(6);
ReadWriteAggregator.CalculateMajority(minCap, 12, 0).Should().Be(7);
}

[Fact]
public void WriteAggregator_must_callculate_majority_with_additional()
{
ReadWriteAggregator.CalculateMajority(0, 3, 1).Should().Be(3);
ReadWriteAggregator.CalculateMajority(0, 3, 2).Should().Be(3);
ReadWriteAggregator.CalculateMajority(0, 4, 1).Should().Be(4);
ReadWriteAggregator.CalculateMajority(0, 5, 1).Should().Be(4);
ReadWriteAggregator.CalculateMajority(0, 5, 2).Should().Be(5);
ReadWriteAggregator.CalculateMajority(0, 6, 1).Should().Be(5);
ReadWriteAggregator.CalculateMajority(0, 7, 1).Should().Be(5);
ReadWriteAggregator.CalculateMajority(0, 8, 1).Should().Be(6);
ReadWriteAggregator.CalculateMajority(0, 8, 2).Should().Be(7);
ReadWriteAggregator.CalculateMajority(0, 9, 1).Should().Be(6);
ReadWriteAggregator.CalculateMajority(0, 10, 1).Should().Be(7);
ReadWriteAggregator.CalculateMajority(0, 11, 1).Should().Be(7);
ReadWriteAggregator.CalculateMajority(0, 11, 3).Should().Be(9);
}

[Fact]
public void WriteAggregator_must_callculate_majority_with_additional_and_min_capactiy()
{
ReadWriteAggregator.CalculateMajority(5, 9, 1).Should().Be(6);
ReadWriteAggregator.CalculateMajority(7, 9, 1).Should().Be(7);
ReadWriteAggregator.CalculateMajority(10, 9, 1).Should().Be(9);
}

[Fact]
Expand Down Expand Up @@ -274,7 +300,7 @@ public void Durable_WriteAggregator_must_not_reply_before_local_confirmation()
var probe = CreateTestProbe();
var aggregator = Sys.ActorOf(TestWriteAggregatorProps(_data, _writeThree, Probes(probe.Ref), _nodes, ImmutableHashSet<Address>.Empty, TestActor, true));
Watch(aggregator);

probe.ExpectMsg<Write>();
probe.LastSender.Tell(WriteAck.Instance);
probe.ExpectMsg<Write>();
Expand All @@ -294,7 +320,7 @@ public void Durable_WriteAggregator_must_tolerate_WriteNack_if_enough_WriteAck()
var probe = CreateTestProbe();
var aggregator = Sys.ActorOf(TestWriteAggregatorProps(_data, _writeThree, Probes(probe.Ref), _nodes, ImmutableHashSet<Address>.Empty, TestActor, true));
Watch(aggregator);

aggregator.Tell(new UpdateSuccess(KeyA, null));
probe.ExpectMsg<Write>();
probe.LastSender.Tell(WriteAck.Instance);
Expand Down Expand Up @@ -353,7 +379,7 @@ public void Durable_WriteAggregator_must_timeout_when_less_than_required_ACKs()
}

private IImmutableDictionary<Address, IActorRef> Probes(IActorRef probe) =>
_nodes.Select(address =>
_nodes.Select(address =>
new KeyValuePair<Address, IActorRef>(
address,
Sys.ActorOf(Props.Create(() => new WriteAckAdapter(probe)))))
Expand Down