Skip to content

Commit

Permalink
[fix][flaky-test] Fix ClassCastException: BrokerService cannot be cas…
Browse files Browse the repository at this point in the history
…t to class PulsarResources (#16821)

(cherry picked from commit 4ee3466)
  • Loading branch information
poorbarcode authored and congbobo184 committed Nov 10, 2022
1 parent 0c6f899 commit b4ce1b7
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

import static org.mockito.Mockito.mock;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@Slf4j
public class PulsarServiceMockSupport {

/**
* see: https://github.com/apache/pulsar/pull/16821.
* While executing "doReturn(pulsarResources).when(pulsar).getPulsarResources()", Meta Store Thread also accesses
* variable PulsarService.getPulsarResources() asynchronously in logic: "notification by zk-watcher".
* So execute mock-cmd in meta-thread (The executor of MetaStore is a single thread pool executor, so all things
* will be thread safety).
* Note: If the MetaStore's executor is no longer single-threaded, should mock as single-threaded if you need to
* execute this method.
*/
public static void mockPulsarServiceProps(final PulsarService pulsarService, Runnable mockTask)
throws ExecutionException, InterruptedException, TimeoutException {
final CompletableFuture<Void> mockGetPulsarResourceFuture = new CompletableFuture<>();
MetadataStoreExtended metadataStoreExtended = pulsarService.getLocalMetadataStore();
if (metadataStoreExtended instanceof AbstractMetadataStore){
AbstractMetadataStore abstractMetadataStore = (AbstractMetadataStore) metadataStoreExtended;
abstractMetadataStore.execute(() -> {
mockTask.run();
mockGetPulsarResourceFuture.complete(null);
}, mock(CompletableFuture.class));
try {
mockGetPulsarResourceFuture.get(1, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException){
mockTask.run();
}
} else {
mockTask.run();
}
}

@Test
public void testMockMetaStore() throws Exception{
AtomicInteger integer = new AtomicInteger();
PulsarService pulsarService = Mockito.mock(PulsarService.class);
Mockito.when(pulsarService.getLocalMetadataStore()).thenReturn(Mockito.mock(ZKMetadataStore.class));
mockPulsarServiceProps(pulsarService, () -> integer.incrementAndGet());
Assert.assertEquals(integer.get(), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand Down Expand Up @@ -92,7 +93,9 @@ public void setup() throws Exception {
doReturn(store).when(pulsar).getConfigurationMetadataStore();

PulsarResources pulsarResources = new PulsarResources(store, store);
doReturn(pulsarResources).when(pulsar).getPulsarResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
});

serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
doReturn(true).when(serverCnx).isActive();
Expand All @@ -105,7 +108,9 @@ public void setup() throws Exception {

eventLoopGroup = new NioEventLoopGroup();
brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
doReturn(brokerService).when(pulsar).getBrokerService();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(brokerService).when(pulsar).getBrokerService();
});

String topicName = TopicName.get("MessageCumulativeAckTest").toString();
PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), brokerService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
Expand All @@ -47,7 +45,6 @@
import java.util.function.Supplier;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -69,6 +66,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.PulsarResources;
Expand All @@ -85,13 +83,9 @@
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -128,24 +122,32 @@ public void setup() throws Exception {
ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
svcConfig.setBrokerShutdownTimeoutMs(0L);
pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
doReturn(svcConfig).when(pulsar).getConfiguration();
store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
doReturn(store).when(pulsar).getLocalMetadataStore();
doReturn(store).when(pulsar).getConfigurationMetadataStore();

PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(svcConfig).when(pulsar).getConfiguration();
});

mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
});

doReturn(TransactionTestBase.createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();
eventLoopGroup = new NioEventLoopGroup();

store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
doReturn(store).when(pulsar).getLocalMetadataStore();
doReturn(store).when(pulsar).getConfigurationMetadataStore();

PulsarResources pulsarResources = new PulsarResources(store, store);
doReturn(pulsarResources).when(pulsar).getPulsarResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
});

brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
doReturn(brokerService).when(pulsar).getBrokerService();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(brokerService).when(pulsar).getBrokerService();
});

consumerChanges = new LinkedBlockingQueue<>();
this.channelCtx = mock(ChannelHandlerContext.class);
Expand Down Expand Up @@ -191,7 +193,9 @@ public void setup() throws Exception {
.when(serverCnxWithOldVersion).getCommandSender();

NamespaceService nsSvc = mock(NamespaceService.class);
doReturn(nsSvc).when(pulsar).getNamespaceService();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(nsSvc).when(pulsar).getNamespaceService();
});
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
Expand Down Expand Up @@ -193,27 +194,29 @@ public void setup() throws Exception {
deleteLedgerCallback.deleteLedgerComplete(null);
return null;
}).when(mlFactoryMock).asyncDelete(any(), any(), any());

// Mock metaStore.
ZooKeeper mockZk = createMockZooKeeper();
doReturn(createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();

doReturn(executor).when(pulsar).getOrderedExecutor();

store = new ZKMetadataStore(mockZk);
doReturn(store).when(pulsar).getLocalMetadataStore();
doReturn(store).when(pulsar).getConfigurationMetadataStore();
// Mock pulsarResources.
PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
TopicResources tsr = spyWithClassAndConstructorArgs(TopicResources.class, store);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
doReturn(tsr).when(pulsarResources).getTopicResources();
doReturn(pulsarResources).when(pulsar).getPulsarResources();

doReturn(store).when(pulsar).getLocalMetadataStore();
doReturn(store).when(pulsar).getConfigurationMetadataStore();

PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
});
// Mock brokerService.
brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
doReturn(brokerService).when(pulsar).getBrokerService();

PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(brokerService).when(pulsar).getBrokerService();
});
// Mock serviceCnx.
serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
Expand All @@ -229,7 +232,9 @@ public void setup() throws Exception {
NamespaceService nsSvc = mock(NamespaceService.class);
NamespaceBundle bundle = mock(NamespaceBundle.class);
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
doReturn(nsSvc).when(pulsar).getNamespaceService();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(nsSvc).when(pulsar).getNamespaceService();
});
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
doReturn(true).when(nsSvc).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
Expand Down Expand Up @@ -116,7 +117,10 @@ public void beforeMethod() throws Exception {
doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();

doReturn(svcConfig).when(pulsar).getConfiguration();
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
});


ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
Expand All @@ -132,7 +136,9 @@ public void beforeMethod() throws Exception {
doReturn(store).when(pulsar).getConfigurationMetadataStore();

pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
doReturn(pulsarResources).when(pulsar).getPulsarResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
});
NamespaceResources namespaceResources =
spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
Expand All @@ -146,8 +152,10 @@ public void beforeMethod() throws Exception {
brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
doReturn(interceptor).when(brokerService).getInterceptor();
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(executor).when(pulsar).getOrderedExecutor();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(executor).when(pulsar).getOrderedExecutor();
});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ public void invalidateAll() {
/**
* Run the task in the executor thread and fail the future if the executor is shutting down
*/
protected void execute(Runnable task, CompletableFuture<?> future) {
@VisibleForTesting
public void execute(Runnable task, CompletableFuture<?> future) {
try {
executor.execute(task);
} catch (Throwable t) {
Expand Down

0 comments on commit b4ce1b7

Please sign in to comment.