Skip to content

Commit

Permalink
Fixed Proxy leaking oubound connections (#11848)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Aug 31, 2021
1 parent 00f99bb commit 6486a55
Showing 1 changed file with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
// ConnectionPool is used by the proxy to issue lookup requests
private PulsarClientImpl client;
private ConnectionPool connectionPool;
private ProxyService service;
private Authentication clientAuthentication;
AuthenticationDataSource authenticationData;
Expand Down Expand Up @@ -159,6 +160,14 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
service.getClientCnxs().remove(this);
LOG.info("[{}] Connection closed", remoteAddress);

if (connectionPool != null) {
try {
connectionPool.close();
} catch (Exception e) {
LOG.error("Failed to close connection pool {}", e.getMessage(), e);
}
}
}

@Override
Expand Down Expand Up @@ -297,9 +306,10 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),

// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(),
new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)), service.getTimer());
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
this.client =
new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());

completeConnect();
return;
Expand Down Expand Up @@ -434,9 +444,10 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica

private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer());
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
clientAuthMethod, protocolVersion));
return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
}

private static int getProtocolVersionToAdvertise(CommandConnect connect) {
Expand Down

0 comments on commit 6486a55

Please sign in to comment.