Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
gavingaozhangmin committed Aug 29, 2022
1 parent 3d9e6a5 commit 3603f94
Showing 1 changed file with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.pulsar.broker.service.plugin;

import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import com.google.common.collect.ImmutableList;
Expand All @@ -35,6 +37,7 @@
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
Expand Down Expand Up @@ -71,6 +74,51 @@ protected void cleanup() throws Exception {
internalCleanup();
}

@Test
public void testOverride() throws Exception {
String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
String subName = "sub";
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
.isAckReceiptEnabled(true)
.subscriptionName(subName).subscribe();

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
.getTopicReference(topic).get();

// set topic level entry filters
EntryFilterWithClassLoader mockFilter = mock(EntryFilterWithClassLoader.class);
when(mockFilter.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn(
EntryFilter.FilterResult.REJECT);
ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = ImmutableMap.of("key", mockFilter);
when(topicRef.getEntryFilters()).thenReturn(entryFilters);


EntryFilterWithClassLoader mockFilter1 = mock(EntryFilterWithClassLoader.class);
when(mockFilter1.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn(
EntryFilter.FilterResult.ACCEPT);
ImmutableMap<String, EntryFilterWithClassLoader> entryFilters1 = ImmutableMap.of("key", mockFilter1);
when(pulsar.getBrokerService().getEntryFilters()).thenReturn(entryFilters1);

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topic).create();
for (int i = 0; i < 10; i++) {
producer.send("test");
}

int counter = 0;
while (true) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
if (message != null) {
counter++;
consumer.acknowledge(message);
} else {
break;
}
}
// All normal messages can be received
assertEquals(10, counter);
}

@Test
public void testFilter() throws Exception {
Map<String, String> map = new HashMap<>();
Expand Down

0 comments on commit 3603f94

Please sign in to comment.