Skip to content

Commit

Permalink
[fix] [admin] PIP-259: Admin API can not work if uri too large (#19514)
Browse files Browse the repository at this point in the history
Motivation: If automatic topic creation is enabled, producers and consumers can create topics with long names, but once created, such topics cannot be managed by the Admin API, it will respond "414 URI is too large".

Modifications: add a config `pulsar.conf.httpMaxRequestHeaderSize` to make the jetty can accept the request with a longer Uri
  • Loading branch information
poorbarcode committed Apr 11, 2023
1 parent 25dea3d commit b7f7e04
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 5 deletions.
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,11 @@ saslJaasServerRoleTokenSignerSecretPath=
# If >0, it will reject all HTTP requests with bodies larged than the configured limit
httpMaxRequestSize=-1

# The maximum size in bytes of the request header. Larger headers will allow for more and/or larger cookies plus larger
# form content encoded in a URL.However, larger headers consume more memory and can make a server more vulnerable to
# denial of service attacks.
httpMaxRequestHeaderSize = 8192

# If true, the broker will reject all HTTP requests using the TRACE and TRACK verbs.
# This setting may be necessary if the broker is deployed into an environment that uses http port
# scanning and flags web servers allowing the TRACE method as insecure.
Expand Down
5 changes: 5 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ maxHttpServerConnections=2048
# Max concurrent web requests
maxConcurrentHttpRequests=1024

# The maximum size in bytes of the request header. Larger headers will allow for more and/or larger cookies plus larger
# form content encoded in a URL.However, larger headers consume more memory and can make a server more vulnerable to
# denial of service attacks.
httpMaxRequestHeaderSize = 8192

## Configure the datasource of basic authenticate, supports the file and Base64 format.
# file:
# basicAuthConf=/path/my/.htpasswd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,17 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private long httpMaxRequestSize = -1;

@FieldContext(
category = CATEGORY_HTTP,
doc = """
The maximum size in bytes of the request header.
Larger headers will allow for more and/or larger cookies plus larger form content encoded in a URL.
However, larger headers consume more memory and can make a server more vulnerable to denial of service
attacks.
"""
)
private int httpMaxRequestHeaderSize = 8 * 1024;

@FieldContext(
category = CATEGORY_HTTP,
doc = "If true, the broker will reject all HTTP requests using the TRACE and TRACK verbs.\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
Expand Down Expand Up @@ -100,8 +102,10 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
List<ServerConnector> connectors = new ArrayList<>();

Optional<Integer> port = config.getWebServicePort();
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setRequestHeaderSize(pulsar.getConfig().getHttpMaxRequestHeaderSize());
if (port.isPresent()) {
httpConnector = new ServerConnector(server);
httpConnector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
httpConnector.setPort(port.get());
httpConnector.setHost(pulsar.getBindAddress());
connectors.add(httpConnector);
Expand Down Expand Up @@ -140,7 +144,7 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
config.getWebServiceTlsProtocols(),
config.getTlsCertRefreshCheckDurationSec());
}
httpsConnector = new ServerConnector(server, sslCtxFactory);
httpsConnector = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
httpsConnector.setPort(tlsPort.get());
httpsConnector.setHost(pulsar.getBindAddress());
connectors.add(httpsConnector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public void testInit() throws Exception {
assertEquals(config.getMaxMessagePublishBufferSizeInMB(), -1);
assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first");
assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
webServicePortTls=4443
httpMaxRequestHeaderSize=1234
bindAddress=0.0.0.0
advertisedAddress=
clusterName="test_cluster"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class AdminProxyHandler extends ProxyServlet {

private static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal";

public static final String INIT_PARAM_REQUEST_BUFFER_SIZE = "requestBufferSize";

private static final Set<String> functionRoutes = new HashSet<>(Arrays.asList(
"/admin/v3/function",
"/admin/v2/function",
Expand Down Expand Up @@ -140,7 +142,7 @@ protected HttpClient createHttpClient() throws ServletException {
}
client.setIdleTimeout(Long.parseLong(value));

value = config.getInitParameter("requestBufferSize");
value = config.getInitParameter(INIT_PARAM_REQUEST_BUFFER_SIZE);
if (value != null) {
client.setRequestBufferSize(Integer.parseInt(value));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,18 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private int httpOutputBufferSize = 32 * 1024;

@FieldContext(
minValue = 1,
category = CATEGORY_HTTP,
doc = """
The maximum size in bytes of the request header.
Larger headers will allow for more and/or larger cookies plus larger form content encoded in a URL.
However, larger headers consume more memory and can make a server more vulnerable to denial of service
attacks.
"""
)
private int httpMaxRequestHeaderSize = 8 * 1024;

@FieldContext(
minValue = 1,
category = CATEGORY_HTTP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.proxy.server;

import static org.apache.pulsar.proxy.server.AdminProxyHandler.INIT_PARAM_REQUEST_BUFFER_SIZE;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -93,6 +94,7 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication

HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setOutputBufferSize(config.getHttpOutputBufferSize());
httpConfig.setRequestHeaderSize(config.getHttpMaxRequestHeaderSize());

if (config.getWebServicePort().isPresent()) {
this.externalServicePort = config.getWebServicePort().get();
Expand Down Expand Up @@ -131,7 +133,7 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication
config.getWebServiceTlsProtocols(),
config.getTlsCertRefreshCheckDurationSec());
}
connectorTls = new ServerConnector(server, sslCtxFactory);
connectorTls = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
connectorTls.setPort(config.getWebServicePortTls().get());
connectorTls.setHost(config.getBindAddress());
connectors.add(connectorTls);
Expand Down Expand Up @@ -195,6 +197,8 @@ public void addServlet(String basePath, ServletHolder servletHolder, List<Pair<S

public void addServlet(String basePath, ServletHolder servletHolder,
List<Pair<String, Object>> attributes, boolean requireAuthentication) {
popularServletParams(servletHolder, config);

Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst();
if (existingPath.isPresent()) {
throw new IllegalArgumentException(
Expand All @@ -214,6 +218,19 @@ public void addServlet(String basePath, ServletHolder servletHolder,
handlers.add(context);
}

private static void popularServletParams(ServletHolder servletHolder, ProxyConfiguration config) {
int requestBufferSize = -1;
try {
requestBufferSize = Integer.parseInt(servletHolder.getInitParameter(INIT_PARAM_REQUEST_BUFFER_SIZE));
} catch (NumberFormatException nfe){
log.warn("The init-param {} is invalidated, because it is not a number", INIT_PARAM_REQUEST_BUFFER_SIZE);
}
if (requestBufferSize > 0 || config.getHttpMaxRequestHeaderSize() > 0) {
int v = Math.max(requestBufferSize, config.getHttpMaxRequestHeaderSize());
servletHolder.setInitParameter(INIT_PARAM_REQUEST_BUFFER_SIZE, String.valueOf(v));
}
}

public void addRestResource(String basePath, String attribute, Object attributeValue, Class<?> resourceClass) {
ResourceConfig config = new ResourceConfig();
config.register(resourceClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ protected void setup() throws Exception {
conf.setProxyRoles(ImmutableSet.of("proxy"));
conf.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
conf.setNumExecutorThreadPoolSize(5);
conf.setHttpMaxRequestHeaderSize(20000);

super.internalSetup();

Expand All @@ -87,6 +88,7 @@ protected void setup() throws Exception {
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
proxyConfig.setTlsEnabledWithBroker(true);
proxyConfig.setHttpMaxRequestHeaderSize(20000);

// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(getTlsFile("broker.cert"));
Expand Down Expand Up @@ -185,4 +187,30 @@ public void testAuthenticatedProxyAsNonAdmin() throws Exception {
Assert.assertEquals(ImmutableSet.of("tenant1/ns1"), user1Admin.namespaces().getNamespaces("tenant1"));
}
}

@Test
public void testAuthenticatedRequestWithLongUri() throws Exception {
PulsarAdmin user1Admin = getAdminClient("user1");
PulsarAdmin brokerAdmin = getDirectToBrokerAdminClient("admin");
StringBuilder longTenant = new StringBuilder("tenant");
for (int i = 10 * 1024; i > 0; i = i - 4){
longTenant.append("_abc");
}
try {
brokerAdmin.namespaces().getNamespaces(longTenant.toString());
Assert.fail("expect error: Tenant not found");
} catch (Exception ex){
Assert.assertTrue(ex instanceof PulsarAdminException);
PulsarAdminException pulsarAdminException = (PulsarAdminException) ex;
Assert.assertEquals(pulsarAdminException.getStatusCode(), 404);
}
try {
user1Admin.namespaces().getNamespaces(longTenant.toString());
Assert.fail("expect error: Tenant not found");
} catch (Exception ex){
Assert.assertTrue(ex instanceof PulsarAdminException);
PulsarAdminException pulsarAdminException = (PulsarAdminException) ex;
Assert.assertEquals(pulsarAdminException.getStatusCode(), 404);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ public void testBackwardCompatibility() throws IOException {
try (PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile)))) {
printWriter.println("zookeeperSessionTimeoutMs=60");
printWriter.println("zooKeeperCacheExpirySeconds=500");
printWriter.println("httpMaxRequestHeaderSize=1234");
}
testConfigFile.deleteOnExit();
InputStream stream = new FileInputStream(testConfigFile);
ProxyConfiguration serviceConfig = PulsarConfigurationLoader.create(stream, ProxyConfiguration.class);
stream.close();
assertEquals(serviceConfig.getMetadataStoreSessionTimeoutMillis(), 60);
assertEquals(serviceConfig.getMetadataStoreCacheExpirySeconds(), 500);
assertEquals(serviceConfig.getHttpMaxRequestHeaderSize(), 1234);

testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");
if (testConfigFile.exists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
Expand All @@ -66,6 +70,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {

private Server backingServer1;
private Server backingServer2;
private Server backingServer3;
private PulsarResources resource;
private Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));

Expand All @@ -85,6 +90,15 @@ protected void setup() throws Exception {
backingServer2 = new Server(0);
backingServer2.setHandler(newHandler("server2"));
backingServer2.start();

backingServer3 = new Server();
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setRequestHeaderSize(20000);
ServerConnector connector = new ServerConnector(backingServer3, new HttpConnectionFactory(httpConfig));
connector.setPort(0);
backingServer3.setConnectors(new Connector[]{connector});
backingServer3.setHandler(newHandler("server3"));
backingServer3.start();
}

private static AbstractHandler newHandler(String text) {
Expand All @@ -96,7 +110,9 @@ public void handle(String target, Request baseRequest,
response.setContentType("text/plain;charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
response.getWriter().println(String.format("%s,%s", text, request.getRequestURI()));
String uri = request.getRequestURI();
response.getWriter().println(String.format("%s,%s", text,
uri.substring(0, uri.length() > 1024 ? 1024 : uri.length())));
}
};
}
Expand Down Expand Up @@ -331,6 +347,47 @@ public void testLongPath() throws Exception {
}
}

@Test
public void testLongUri() throws Exception {
Properties props = new Properties();
props.setProperty("httpReverseProxy.3.path", "/service3");
props.setProperty("httpReverseProxy.3.proxyTo", backingServer3.getURI().toString());
props.setProperty("servicePort", "0");
props.setProperty("webServicePort", "0");

ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));

StringBuilder longUri = new StringBuilder("/service3/tp");
for (int i = 10 * 1024; i > 0; i = i - 11){
longUri.append("_sub1_RETRY");
}

WebServer webServerMaxUriLen8k = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen8k, proxyConfig, null,
new BrokerDiscoveryProvider(proxyConfig, resource));
webServerMaxUriLen8k.start();
try {
Response r = client.target(webServerMaxUriLen8k.getServiceUri()).path(longUri.toString()).request().get();
Assert.assertEquals(r.getStatus(), Response.Status.REQUEST_URI_TOO_LONG.getStatusCode());
} finally {
webServerMaxUriLen8k.stop();
}

proxyConfig.setHttpMaxRequestHeaderSize(12 * 1024);
WebServer webServerMaxUriLen12k = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen12k, proxyConfig, null,
new BrokerDiscoveryProvider(proxyConfig, resource));
webServerMaxUriLen12k.start();
try {
Response r = client.target(webServerMaxUriLen12k.getServiceUri()).path(longUri.toString()).request().get();
Assert.assertEquals(r.getStatus(), Response.Status.OK.getStatusCode());
} finally {
webServerMaxUriLen12k.stop();
}
}

@Test
public void testPathEndsInSlash() throws Exception {
Properties props = new Properties();
Expand Down

0 comments on commit b7f7e04

Please sign in to comment.