Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[UNDERTOW-2361] handle inflater wrapping properly in deflate encoding #1578

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,10 @@ protected void dataDeflated(byte[] data, int off, int len) {
totalOut += len;
}

@Override
protected boolean isZlibHeaderPresent(ByteBuffer buf) {
//this will default to no wrapping object pool
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,18 @@ public StreamSourceConduit wrap(ConduitFactory<StreamSourceConduit> factory, Htt
};

private volatile Inflater inflater;
private volatile PooledObject<Inflater> activePooledObject;

private final PooledObject<Inflater> pooledObject;
private final ObjectPool<Inflater> objectPoolNonWrapping;
private final ObjectPool<Inflater> objectPoolWrapping;
private final HttpServerExchange exchange;
private PooledByteBuffer compressed;
private PooledByteBuffer uncompressed;
private boolean nextDone = false;
private boolean headerDone = false;

public InflatingStreamSourceConduit(HttpServerExchange exchange, StreamSourceConduit next) {
this(exchange, next, newInstanceInflaterPool());
this(exchange, next, newInstanceInflaterPool(), newInstanceWrappingInflaterPool());
}

public InflatingStreamSourceConduit(
Expand All @@ -72,18 +74,52 @@ public InflatingStreamSourceConduit(
ObjectPool<Inflater> inflaterPool) {
super(next);
this.exchange = exchange;
this.pooledObject = inflaterPool.allocate();
this.inflater = pooledObject.getObject();
this.objectPoolNonWrapping = inflaterPool;
this.objectPoolWrapping = null;
}

public InflatingStreamSourceConduit(
HttpServerExchange exchange,
StreamSourceConduit next,
ObjectPool<Inflater> inflaterPool,
ObjectPool<Inflater> inflaterWrappingPool) {
super(next);
this.exchange = exchange;
this.objectPoolNonWrapping = inflaterPool;
this.objectPoolWrapping = inflaterWrappingPool;
}
/**
* Create non-wrapping(gzip/zlib without headers) inflater pool
* @return
*/
public static ObjectPool<Inflater> newInstanceInflaterPool() {
return new NewInstanceObjectPool<>(() -> new Inflater(true), Inflater::end);
}

/**
* Create non-wrapping(gzip/zlib without headers) inflater pool
* @return
*/
public static ObjectPool<Inflater> simpleInflaterPool(int poolSize) {
return new SimpleObjectPool<>(poolSize, () -> new Inflater(true), Inflater::reset, Inflater::end);
}

/**
* Create wrapping inflater pool, one that expects headers.
* @return
*/
public static ObjectPool<Inflater> newInstanceWrappingInflaterPool(){
return new NewInstanceObjectPool<>(() -> new Inflater(false), Inflater::end);
}

/**
* Create wrapping inflater pool, one that expects headers.
* @return
*/
public static ObjectPool<Inflater> simpleWrappingInflaterPool(int poolSize) {
return new SimpleObjectPool<>(poolSize, () -> new Inflater(false), Inflater::reset, Inflater::end);
}

@Override
public int read(ByteBuffer dst) throws IOException {
if (isReadShutdown()) {
Expand Down Expand Up @@ -115,6 +151,8 @@ public int read(ByteBuffer dst) throws IOException {
if (!headerDone) {
headerDone = readHeader(buf);
}

initializeInflater(buf);
inflater.setInput(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
}
}
Expand Down Expand Up @@ -171,6 +209,35 @@ public int read(ByteBuffer dst) throws IOException {
}
}

protected void initializeInflater(ByteBuffer buf) {
if(isZlibHeaderPresent(buf)) {
this.activePooledObject = this.objectPoolWrapping.allocate();
} else {
this.activePooledObject = this.objectPoolNonWrapping.allocate();
}
this.inflater = this.activePooledObject.getObject();
}

protected boolean isZlibHeaderPresent(final ByteBuffer buf) {
// https://www.ietf.org/rfc/rfc1950.txt - 2.2. - Data format, two bytes. Below is sort of a cheat, we have so much power
//to quickly compress to best cap.
// FLEVEL: 0 1 2 3
// CINFO:
// 0 08 1D 08 5B 08 99 08 D7
// 1 18 19 18 57 18 95 18 D3
// 2 28 15 28 53 28 91 28 CF
// 3 38 11 38 4F 38 8D 38 CB
// 4 48 0D 48 4B 48 89 48 C7
// 5 58 09 58 47 58 85 58 C3
// 6 68 05 68 43 68 81 68 DE
// 7 78 01 78 5E 78 9C 78 DA
buf.mark();
final char cmf = (char)(buf.get() & 0xFF);
final char flg = (char)(buf.get() & 0xFF);
buf.reset();
return (cmf == 0x78 && (flg == 0x01 || flg == 0x5E || flg == 0x9c || flg == 0xDA));
}

protected void readFooter(ByteBuffer buf) throws IOException {

}
Expand All @@ -191,7 +258,8 @@ private void done() {
uncompressed.close();
}
if (inflater != null) {
pooledObject.close();
activePooledObject.close();
activePooledObject = null;
inflater = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,25 @@ public void testGzipEncoding() throws IOException {
runTest(sb.toString(), "gzip");
}

private static final String MESSAGE = "COMPRESSED I'AM";
private static final byte[] COMPRESSED_MESSAGE = { 0x78, (byte) (0x9C & 0xFF), 0x73, (byte) (0xF6 & 0xFF),
(byte) (0xF7 & 0xFF), 0x0D, 0x08, 0x72, 0x0D, 0x0E, 0x76, 0x75, 0x51, (byte) (0xF0 & 0xFF), 0x54, 0x77,
(byte) (0xF4 & 0xFF), 0x05, 0x00, 0x22, 0x35, 0x04, 0x14 };

@Test
public void testDeflateWithNoWrapping() throws IOException {
HttpPost post = new HttpPost(DefaultServer.getDefaultServerURL() + "/decode");
post.setEntity(new ByteArrayEntity(COMPRESSED_MESSAGE));
post.addHeader(Headers.CONTENT_ENCODING_STRING, "deflate");

try (CloseableHttpClient client = HttpClientBuilder.create().disableContentCompression().build()) {
HttpResponse result = client.execute(post);
Assert.assertEquals(StatusCodes.OK, result.getStatusLine().getStatusCode());
String sb = HttpClientUtils.readResponse(result);
Assert.assertEquals(MESSAGE.length(), sb.length());
Assert.assertEquals(MESSAGE, sb);
}
}

public void runTest(final String theMessage, String encoding) throws IOException {
try (CloseableHttpClient client = HttpClientBuilder.create().disableContentCompression().build()){
Expand Down