Skip to content

Commit

Permalink
Change ContextClassLoader to NarClassLoader in BrokerIntercep…
Browse files Browse the repository at this point in the history
…tor (#13589)

It's ``BrokerInterceptor`` side change, like #13501

Change context class loader through Thread.currentThread().setContextClassLoader(classLoader) before every interceptor handler callback, and change it back to original class loader afterwards.

(cherry picked from commit afc241f)
  • Loading branch information
mattisonchao authored and codelipenghui committed Jan 18, 2022
1 parent d22798a commit ebec861
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ClassLoaderSwitcher;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
Expand All @@ -50,39 +51,53 @@ public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
this.interceptor.beforeSendMessage(
subscription, entry, ackSet, msgMetadata);
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.beforeSendMessage(
subscription, entry, ackSet, msgMetadata);
}
}

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
this.interceptor.onPulsarCommand(command, cnx);
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.onPulsarCommand(command, cnx);
}
}

@Override
public void onConnectionClosed(ServerCnx cnx) {
this.interceptor.onConnectionClosed(cnx);
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.onConnectionClosed(cnx);
}
}

@Override
public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException {
this.interceptor.onWebserviceRequest(request);
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.onWebserviceRequest(request);
}
}

@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response)
throws IOException, ServletException {
this.interceptor.onWebserviceResponse(request, response);
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.onWebserviceResponse(request, response);
}
}

@Override
public void initialize(PulsarService pulsarService) throws Exception {
this.interceptor.initialize(pulsarService);
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.initialize(pulsarService);
}
}

@Override
public void close() {
interceptor.close();
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
interceptor.close();
}
try {
classLoader.close();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,30 @@
*/
package org.apache.pulsar.broker.intercept;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.testng.annotations.Test;
import javax.servlet.FilterChain;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import java.util.Map;

/**
* Unit test {@link BrokerInterceptorWithClassLoader}.
Expand All @@ -44,4 +60,127 @@ public void testWrapper() throws Exception {
verify(h, times(1)).initialize(same(pulsarService));
}


@Test
public void testClassLoaderSwitcher() throws Exception {
NarClassLoader narLoader = mock(NarClassLoader.class);
BrokerInterceptor interceptor = new BrokerInterceptor() {
@Override
public void beforeSendMessage(Subscription subscription, Entry entry, long[] ackSet, MessageMetadata msgMetadata) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void onConnectionCreated(ServerCnx cnx) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String, String> metadata) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs,
long ledgerId, long entryId, Topic.PublishContext publishContext) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
long entryId, ByteBuf headersAndPayload) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void onConnectionClosed(ServerCnx cnx) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void onWebserviceRequest(ServletRequest request) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain) {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void initialize(PulsarService pulsarService) throws Exception {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
@Override
public void close() {
assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
}
};

BrokerInterceptorWithClassLoader brokerInterceptorWithClassLoader =
new BrokerInterceptorWithClassLoader(interceptor, narLoader);
ClassLoader curClassLoader = Thread.currentThread().getContextClassLoader();
// test class loader
assertEquals(brokerInterceptorWithClassLoader.getClassLoader(), narLoader);
// test initialize
brokerInterceptorWithClassLoader.initialize(mock(PulsarService.class));
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test onFilter
brokerInterceptorWithClassLoader.onFilter(mock(ServletRequest.class)
, mock(ServletResponse.class), mock(FilterChain.class));
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test onWebserviceResponse
brokerInterceptorWithClassLoader.onWebserviceResponse(mock(ServletRequest.class)
, mock(ServletResponse.class));
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test onWebserviceRequest
brokerInterceptorWithClassLoader.onWebserviceRequest(mock(ServletRequest.class));
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test onConnectionClosed
brokerInterceptorWithClassLoader.onConnectionClosed(mock(ServerCnx.class));
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test onPulsarCommand
brokerInterceptorWithClassLoader.onPulsarCommand(null, mock(ServerCnx.class));
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test messageAcked
brokerInterceptorWithClassLoader
.messageAcked(mock(ServerCnx.class), mock(Consumer.class), null);
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test messageDispatched
brokerInterceptorWithClassLoader
.messageDispatched(mock(ServerCnx.class), mock(Consumer.class), 1, 1, null);
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test messageProduced
brokerInterceptorWithClassLoader
.messageProduced(mock(ServerCnx.class), mock(Producer.class), 1, 1, 1, null);
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test consumerCreated
brokerInterceptorWithClassLoader
.consumerCreated(mock(ServerCnx.class), mock(Consumer.class), Maps.newHashMap());
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test producerCreated
brokerInterceptorWithClassLoader
.producerCreated(mock(ServerCnx.class), mock(Producer.class), Maps.newHashMap());
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test onConnectionCreated
brokerInterceptorWithClassLoader
.onConnectionCreated(mock(ServerCnx.class));
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test beforeSendMessage
brokerInterceptorWithClassLoader
.beforeSendMessage(mock(Subscription.class), mock(Entry.class), null, null);
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
// test close
brokerInterceptorWithClassLoader.close();
assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);

}
}

0 comments on commit ebec861

Please sign in to comment.