Skip to content

Commit

Permalink
[Broker] Fix async response filter (#11052)
Browse files Browse the repository at this point in the history
### Motivation

Currently, the response filter couldn't process async responses correctlly, the response interceptor may be called before the async response returning.

### Modifications

Add listener when using async request.

### Verifying this change

Add a new test to verify the response interceptor is called after async response returning.

(cherry picked from commit 3c8d210)
  • Loading branch information
gaoran10 authored and codelipenghui committed Jul 7, 2021
1 parent ab00b99 commit 5fa23c7
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.web;

import java.io.IOException;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
Expand Down Expand Up @@ -71,10 +73,45 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
/* connection is already invalidated */
}
}

if (request.isAsyncSupported() && request.isAsyncStarted()) {
request.getAsyncContext().addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
handleInterceptor(request, response);
}

@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
LOG.warn("Http request {} async context timeout.", request);
handleInterceptor(request, response);
}

@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
LOG.warn("Http request {} async context error.", request, asyncEvent.getThrowable());
handleInterceptor(request, response);
}

@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
// nothing to do
}
});
} else {
handleInterceptor(request, response);
}
}

private void handleInterceptor(ServletRequest request, ServletResponse response) {
if (interceptorEnabled
&& !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.MULTIPART_FORM_DATA)
&& !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.APPLICATION_OCTET_STREAM)) {
interceptor.onWebserviceResponse(request, response);
try {
interceptor.onWebserviceResponse(request, response);
} catch (Exception e) {
LOG.error("Failed to handle interceptor on web service response.", e);
}
}
}

Expand All @@ -87,4 +124,5 @@ public void init(FilterConfig arg) throws ServletException {
public void destroy() {
// No state to clean up.
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.apache.pulsar.broker.admin.v3;


/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.web.RestException;

/**
* Async response test.
*/
@Slf4j
@Path("/test")
public class AsyncResponseTest {

@GET
@Path("/asyncGet/{topicName}/{delayMilliseconds}")
public void asyncGet(@Suspended AsyncResponse response,
@PathParam("topicName") String topicName,
@PathParam("delayMilliseconds") long delayMilliseconds) {
new Thread(() -> {
if (delayMilliseconds > 0) {
try {
Thread.sleep(delayMilliseconds);
} catch (InterruptedException e) {
log.error("Failed to handle test method asyncGet.", e);
response.resume(new RestException(e));
}
}
response.resume(Response.noContent().build());
}).start();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;

Expand Down Expand Up @@ -55,7 +58,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
Expand Down Expand Up @@ -96,6 +98,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
private OrderedExecutor bkExecutor;

protected boolean enableBrokerInterceptor = false;

public MockedPulsarServiceBaseTest() {
resetConfig();
}
Expand Down Expand Up @@ -300,6 +304,17 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
doReturn(new CounterBrokerInterceptor()).when(pulsar).getBrokerInterceptor();

doAnswer((invocation) -> spy(invocation.callRealMethod())).when(pulsar).newCompactor();
if (enableBrokerInterceptor) {
mockConfigBrokerInterceptors(pulsar);
}
}

private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
ServiceConfiguration configuration = spy(conf);
Set<String> mockBrokerInterceptors = mock(Set.class);
when(mockBrokerInterceptors.isEmpty()).thenReturn(false);
when(configuration.getBrokerInterceptors()).thenReturn(mockBrokerInterceptors);
when(pulsarService.getConfig()).thenReturn(configuration);
}

protected void waitForZooKeeperWatchers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
package org.apache.pulsar.broker.intercept;

import lombok.Cleanup;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand All @@ -32,8 +37,10 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -71,6 +78,7 @@ public void setup() throws Exception {
listenerName2,
new BrokerInterceptorWithClassLoader(listener2, ncl2));
this.listeners = new BrokerInterceptors(this.listenerMap);
this.enableBrokerInterceptor = true;
super.internalSetup();
super.producerBaseSetup();
}
Expand Down Expand Up @@ -138,4 +146,37 @@ public void testBeforeSendMessage() throws PulsarClientException {

assertEquals(((CounterBrokerInterceptor) listener).getBeforeSendCount(), 1);
}

@Test
public void asyncResponseFilterTest() throws Exception {
Assert.assertTrue(pulsar.getBrokerInterceptor() instanceof CounterBrokerInterceptor);
CounterBrokerInterceptor interceptor = (CounterBrokerInterceptor) pulsar.getBrokerInterceptor();
interceptor.clearResponseList();

OkHttpClient client = new OkHttpClient();
String url = "http://127.0.0.1:" + conf.getWebServicePort().get() + "/admin/v3/test/asyncGet/my-topic/1000";
final Request request = new Request.Builder()
.url(url)
.get()
.build();
Call call = client.newCall(request);
CompletableFuture<Response> future = new CompletableFuture<>();
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
future.completeExceptionally(e);
}

@Override
public void onResponse(Call call, Response response) throws IOException {
future.complete(response);
}
});
future.get();
CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0);
Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000");
Assert.assertEquals(responseEvent.getResponseStatus(),
javax.ws.rs.core.Response.noContent().build().getStatus());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,38 @@
*/
package org.apache.pulsar.broker.intercept;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import java.io.IOException;
import org.eclipse.jetty.server.Response;

@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {

int beforeSendCount = 0;
int count = 0;
private List<ResponseEvent> responseList = new ArrayList<>();

@Data
@AllArgsConstructor
public class ResponseEvent {
private String requestUri;
private int responseStatus;
}

@Override
public void beforeSendMessage(Subscription subscription,
Expand Down Expand Up @@ -68,7 +81,11 @@ public void onWebserviceRequest(ServletRequest request) {
@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
count ++;
log.info("[{}] On [{}] Webservice response", count, ((HttpServletRequest)request).getRequestURL().toString());
log.info("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest)request).getRequestURL().toString(), response);
if (response instanceof Response) {
Response res = (Response) response;
responseList.add(new ResponseEvent(res.getHttpChannel().getRequest().getRequestURI(), res.getStatus()));
}
}

@Override
Expand All @@ -95,4 +112,12 @@ public int getCount() {
public int getBeforeSendCount() {
return beforeSendCount;
}

public void clearResponseList() {
responseList.clear();
}

public List<ResponseEvent> getResponseList() {
return responseList;
}
}

0 comments on commit 5fa23c7

Please sign in to comment.