Skip to content

Commit

Permalink
[CURATOR-653] Proposed changes based on PR #398 (#436)
Browse files Browse the repository at this point in the history
Co-authored-by: shixiaoxiao <shixiaoxiao@shixiaoxiao1deMacBook-Pro.local>
Co-authored-by: tison <wander4096@gmail.com>
  • Loading branch information
3 people committed Oct 18, 2022
1 parent 4b96f18 commit 9b3a145
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,17 @@ public String getLastPathIsLeader()
@VisibleForTesting
volatile CountDownLatch debugResetWaitLatch = null;

@VisibleForTesting
volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null;

@VisibleForTesting
void reset() throws Exception
{
setLeadership(false);
if ( debugResetWaitBeforeNodeDeleteLatch != null )
{
debugResetWaitBeforeNodeDeleteLatch.await();
}
setNode(null);

BackgroundCallback callback = new BackgroundCallback()
Expand Down Expand Up @@ -623,6 +630,7 @@ else if ( ourIndex == 0 )
}
else
{
setLeadership(false);
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher()
{
Expand Down Expand Up @@ -726,7 +734,6 @@ protected void handleStateChange(ConnectionState newState)
private synchronized void setLeadership(boolean newValue)
{
boolean oldValue = hasLeadership.getAndSet(newValue);

if ( oldValue && !newValue )
{ // Lost leadership, was true, now false
listeners.forEach(LeaderLatchListener::notLeader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.TestCleanState;
Expand Down Expand Up @@ -220,6 +224,153 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
}
}

@Test
public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception
{
final String latchPath = "/test";
final Timing2 timing = new Timing2();
final BlockingQueue<TestEvent> events = Queues.newLinkedBlockingQueue();

final List<Closeable> closeableResources = new ArrayList<>();
try
{
final String id0 = "id0";
final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, events);
closeableResources.add(client0);
final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events);
closeableResources.add(latch0);

assertEquals(new TestEvent(id0, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));

final String id1 = "id1";
final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, events);
closeableResources.add(client1);
final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events);
closeableResources.add(latch1);

assertEquals(new TestEvent(id1, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));

// wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation
// this call is time-consuming but necessary because we don't have a handle to detect the end of the reset call
timing.forWaiting().sleepABit();

assertTrue(latch0.hasLeadership());
assertFalse(latch1.hasLeadership());

latch1.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1);
latch1.debugResetWaitLatch = new CountDownLatch(1);
latch0.debugResetWaitLatch = new CountDownLatch(1);

// force latch0 and latch1 reset to trigger the actual test
latch0.reset();
// latch1 needs to be called within a separate thread since it's going to be blocked by the CountDownLatch outside an async call
ForkJoinPool.commonPool().submit(() -> {
latch1.reset();
return null;
});

// latch0.reset() will result in it losing its leadership, deleting its old child node and creating a new child node before being blocked by its debugResetWaitLatch
assertEquals(new TestEvent(id0, TestEventType.LOST_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
// latch1.reset() is blocked but latch1 will gain leadership due its node watching latch0's node to be deleted
assertEquals(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));

assertFalse(latch0.hasLeadership());
assertTrue(latch1.hasLeadership());

// latch0.reset() continues with the getChildren call, finds itself not being the leader and starts listening to the node created by latch1
latch0.debugResetWaitLatch.countDown();
timing.sleepABit();

// latch1.reset() continues, deletes its old child node and creates a new child node before being blocked by its debugResetWaitLatch
latch1.debugResetWaitBeforeNodeDeleteLatch.countDown();

// latch0 receives NodeDeleteEvent and then finds itself to be the leader
assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
assertTrue(latch0.hasLeadership());

// latch1.reset() continues and finds itself not being the leader
latch1.debugResetWaitLatch.countDown();
// this call is time-consuming but necessary because we don't have a handle to detect the end of the reset call
timing.forWaiting().sleepABit();

assertTrue(latch0.hasLeadership());
assertFalse(latch1.hasLeadership());
}
finally
{
// reverse is necessary for closing the LeaderLatch instances before closing the corresponding client
Collections.reverse(closeableResources);
closeableResources.forEach(CloseableUtils::closeQuietly);
}
}

private static CuratorFramework createAndStartClient(String zkConnectString, Timing2 timing, String id, Collection<TestEvent> events) {
final CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkConnectString)
.connectionTimeoutMs(timing.connection())
.sessionTimeoutMs(timing.session())
.retryPolicy(new RetryOneTime(1))
.connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
.build();

client.getConnectionStateListenable().addListener((client1, newState) -> {
if ( newState == ConnectionState.CONNECTED )
{
events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION));
}
});

client.start();

return client;
}

private static LeaderLatch createAndStartLeaderLatch(CuratorFramework client, String latchPath, String id, Collection<TestEvent> events) throws Exception
{
final LeaderLatch latch = new LeaderLatch(client, latchPath, id);
latch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
events.add(new TestEvent(latch.getId(), TestEventType.GAINED_LEADERSHIP));
}

@Override
public void notLeader() {
events.add(new TestEvent(latch.getId(), TestEventType.LOST_LEADERSHIP));
}
});
latch.start();

return latch;
}

private enum TestEventType
{
GAINED_LEADERSHIP,
LOST_LEADERSHIP,
GAINED_CONNECTION;
}

private static class TestEvent {
private final String id;
private final TestEventType eventType;

public TestEvent(String id, TestEventType eventType)
{
this.id = id;
this.eventType = eventType;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestEvent testEvent = (TestEvent) o;
return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType;
}
}

@Test
public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception
{
Expand Down

1 comment on commit 9b3a145

@tisonkun
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well..I'd say I should write a better commit title.

Please sign in to comment.