Skip to content

Commit

Permalink
Clear Members Before Failover Over to New Cluster [API-2217] (#891)
Browse files Browse the repository at this point in the history
* Clear members during cluster change.

* Review changes.
  • Loading branch information
emreyigit committed May 15, 2024
1 parent 5b14a35 commit 9e3151e
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 27 deletions.
53 changes: 41 additions & 12 deletions src/Hazelcast.Net.Tests/Clustering/FailoverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Hazelcast.Clustering;
using Hazelcast.Configuration;
Expand Down Expand Up @@ -207,10 +208,7 @@ public void TestClusterOptionsRotate()

var failover = new Failover(clusterState, options);

failover.ClusterChanged += _ =>
{
clusterChangedCount++;
};
failover.ClusterChanged += _ => { clusterChangedCount++; };

void AssertCluster1(Failover fo)
{
Expand Down Expand Up @@ -318,12 +316,41 @@ public async Task TestClientCanFailover(bool smartRouting, int memberCount)
.WithHConsoleLogger()
.Build();

var latchExistMembers = new ManualResetEvent(false);
var latchConnectMembers = new ManualResetEvent(false);
var latchMembersUpdated = new ManualResetEvent(false);

HConsole.WriteLine(this, "Start members of clusters 0 and 1");
var members0 = await StartMembersAsync(_cluster0, memberCount);
var members1 = await StartMembersAsync(_cluster1, memberCount);

HConsole.WriteLine(this, "Start failover client");
await using var client = await HazelcastClientFactory.StartNewFailoverClientAsync(failoverOptions);

var clientImp = (HazelcastClient) client;
clientImp.Cluster.State.Failover.ClusterChanged += _ =>
{
// During the cluster changed event, the member table is invalidated but its members are still listed.
if (clientImp.Cluster.Members.GetMembers().Any()) latchExistMembers.Set();
// The member table should still be filled, but its members cannot be used to connect because of the failover.
if (!clientImp.Cluster.Members.GetMembersForConnection().Any()) latchConnectMembers.Set();
};

await client.SubscribeAsync(events =>
events.MembersUpdated((sender, args) =>
{
// After cluster changed, member events should be triggered properly.
if (latchConnectMembers.WaitOne(0)
&& latchExistMembers.WaitOne(0)
&& args.RemovedMembers.Count > 0
&& args.AddedMembers.Count > 0)
{
latchMembersUpdated.Set();
}
}));

var mapName = CreateUniqueName();
var map = await client.GetMapAsync<string, string>(mapName);
Assert.IsNotNull(map);
Expand Down Expand Up @@ -361,6 +388,9 @@ public async Task TestClientCanFailover(bool smartRouting, int memberCount)
await AssertStateEventually(states, ClientState.ClusterChanged);
await AssertStateEventually(states, ClientState.Connected);
await AssertCluster(client, _cluster0.Id, map);
Assert.True(latchExistMembers.WaitOne(1_000));
Assert.True(latchConnectMembers.WaitOne(1_000));
Assert.True(latchMembersUpdated.WaitOne(1_000));
}

[TestCase(true, 1)]
Expand Down Expand Up @@ -459,7 +489,6 @@ public async Task TestClientCanFailoverFirstClusterNotUp(bool smartRouting, int
}

[Test]

public async Task TestClientThrowExceptionOnFailover()
{
HConsole.Configure(options => options
Expand Down Expand Up @@ -615,7 +644,7 @@ public async Task TestClientCannotFailoverToDifferentPartitionCount()
// and are not simply still trying to reconnect to the original cluster 0 before
// failover - we test this because the ClusterChanged event *must* trigger here
HConsole.WriteLine(this, "Wait...");
await Task.Delay((int)(failoverOptions.Clients[0].Networking.ConnectionRetry.ClusterConnectionTimeoutMilliseconds + 2_000));
await Task.Delay((int) (failoverOptions.Clients[0].Networking.ConnectionRetry.ClusterConnectionTimeoutMilliseconds + 2_000));

// start cluster 0 members again
HConsole.WriteLine(this, "Start members of Cluster 0");
Expand Down Expand Up @@ -706,9 +735,9 @@ public async Task TestClientRetryCurrentClusterBeforeFailover()
// cluster 1 is live - but we should first try to connect again to cluster 0
// within o.Networking.ConnectionRetry.ClusterConnectionTimeoutMilliseconds
Assert.AreEqual(_cluster0.Id, client.ClusterName);
Assert.True(client.Members.Any(x=>
x.Member.Address.Host==members0[0].Host &&
x.Member.Address.Port==members0[0].Port)
Assert.True(client.Members.Any(x =>
x.Member.Address.Host == members0[0].Host &&
x.Member.Address.Port == members0[0].Port)
);

// start cluster 0 members again
Expand Down Expand Up @@ -736,9 +765,9 @@ public static void AssertState(ConcurrentQueue<ClientState> states, ClientState
public static async Task AssertStateEventually(ConcurrentQueue<ClientState> states, ClientState expectedState, int timeoutMillis = 120_000)
{
var state = (ClientState) (-1);

await AssertEx.SucceedsEventually(
() => { Assert.That(states.TryDequeue(out state)); },
() => { Assert.That(states.TryDequeue(out state)); },
timeoutMillis, 1000,
$"Failed to get state {expectedState}, state did not change");

Expand All @@ -765,4 +794,4 @@ private async Task AssertCluster(IHazelcastClient client, string expectedCluster
// cannot access other value
Assert.IsNull(await map.GetAsync(otherKey));
}
}
}
2 changes: 1 addition & 1 deletion src/Hazelcast.Net/Clustering/ClusterConnections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ static IEnumerable<NetworkAddress> Distinct(IEnumerable<NetworkAddress> aa, ISet
}

// get known members' addresses
var addresses = _clusterMembers.GetMembers().Select(x => x.ConnectAddress);
var addresses = _clusterMembers.GetMembersForConnection().Select(x => x.ConnectAddress);
foreach (var address in Distinct(addresses, distinct, shuffle))
yield return address;

Expand Down
34 changes: 20 additions & 14 deletions src/Hazelcast.Net/Clustering/ClusterMembers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace Hazelcast.Clustering
internal class ClusterMembers : IAsyncDisposable
{
private const int SqlConnectionRandomAttempts = 10;

private const int InvalidMemberTableVersion = -1;
private readonly object _mutex = new();
private readonly ClusterState _clusterState;
private readonly ILogger _logger;
Expand Down Expand Up @@ -80,6 +80,14 @@ public ClusterMembers(ClusterState clusterState, TerminateConnections terminateC
lock (_mutex) return _members.ContainsMember(x);
}, _clusterState.LoggerFactory);
}

_clusterState.Failover.ClusterChanged += cluster =>
{
// Invalidate the current member table. The members cannot be removed here because of
// the members updated event. It should occur in the AddConnection method.
lock (_mutex)
_members = new MemberTable(InvalidMemberTableVersion, _members.Members);
};
}

// NOTES
Expand Down Expand Up @@ -198,15 +206,6 @@ public void AddConnection(MemberConnection connection, bool isNewCluster)
// add the connection
_connections[connection.MemberId] = connection;

if (isNewCluster)
{
// reset members
// this is safe because... isNewCluster means that this is the very first connection and there are
// no other connections yet and therefore we should not receive events and therefore no one
// should invoke SetMembers.
_members = new MemberTable();
}

// if this is a true member connection
if (_members.TryGetMember(connection.MemberId, out var member) && IsMemberAddress(member, connection.Address))
{
Expand Down Expand Up @@ -370,6 +369,7 @@ private void LogDiffs(MemberTable table, Dictionary<MemberInfo, int> diff)
msg.Append(status);
msg.AppendLine();
}

msg.Append('}');

//Print only if there is a change
Expand Down Expand Up @@ -793,10 +793,11 @@ public MemberInfo GetMemberForSql()
/// <returns>The oldest active connection, or <c>null</c> if no connection is active.</returns>
public MemberConnection GetOldestConnection()
{
    lock (_mutex)
    {
        // keep only the active connections, then pick the one with the earliest connect time
        var activeConnections = _connections.Values.Where(connection => connection.Active);
        return activeConnections
            .OrderBy(connection => connection.ConnectTime)
            .FirstOrDefault();
    }
}

/// <summary>
Expand Down Expand Up @@ -835,6 +836,11 @@ public IEnumerable<MemberInfo> GetMembers(bool liteOnly = false)
return liteOnly ? members.Where(x => x.IsLiteMember).ToList() : members;
}

/// <summary>
/// Gets the members that can be used as connection candidates.
/// </summary>
/// <returns>
/// An empty sequence when the current member table carries the
/// <c>InvalidMemberTableVersion</c> marker; otherwise the result of <c>GetMembers()</c>.
/// </returns>
public IEnumerable<MemberInfo> GetMembersForConnection()
{
    // a table flagged with the invalid version must not yield connection candidates
    if (_members.Version == InvalidMemberTableVersion)
        return Enumerable.Empty<MemberInfo>();

    return GetMembers();
}

/// <summary>
/// Gets information about a member.
/// </summary>
Expand Down

0 comments on commit 9e3151e

Please sign in to comment.