diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 54f778ea454e9..5dfe317cb7753 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -59,6 +60,7 @@ import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -582,18 +584,28 @@ boolean ensureAuthenticatedUserIsSame(Map originHeaders, Authent } private void writeResponse(R response, OutputStream os) throws IOException { + os = new FilterOutputStream(os) { + @Override + public void close() { + // do not close the output + } + }; final Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion(); - final OutputStreamStreamOutput out = new OutputStreamStreamOutput(os); - out.setVersion(minNodeVersion); - Version.writeVersion(minNodeVersion, out); - response.writeTo(out); + Version.writeVersion(minNodeVersion, new OutputStreamStreamOutput(os)); + if (minNodeVersion.onOrAfter(Version.V_8_0_0)) { + os = CompressorFactory.COMPRESSOR.threadLocalOutputStream(os); + } + try (OutputStreamStreamOutput out = new OutputStreamStreamOutput(os)) { + out.setVersion(minNodeVersion); + response.writeTo(out); + } } /** * Decode the provided base-64 bytes into a {@link AsyncSearchResponse}. */ private R decodeResponse(CharBuffer encodedBuffer) throws IOException { - final InputStream encodedIn = Base64.getDecoder().wrap(new InputStream() { + InputStream encodedIn = Base64.getDecoder().wrap(new InputStream() { @Override public int read() { if (encodedBuffer.hasRemaining()) { @@ -603,9 +615,12 @@ public int read() { } } }); + final Version version = Version.readVersion(new InputStreamStreamInput(encodedIn)); + assert version.onOrBefore(Version.CURRENT) : version + " >= " + Version.CURRENT; + if (version.onOrAfter(Version.V_8_0_0)) { + encodedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(encodedIn); + } try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(encodedIn), registry)) { - final Version version = Version.readVersion(in); - assert version.onOrBefore(Version.CURRENT) : version + " >= " + Version.CURRENT; in.setVersion(version); return reader.read(in); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java index 73ff1a4754e9b..34e4ca1589c4d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java @@ -305,47 +305,48 @@ public CircuitBreakerStats stats(String name) { } public void testMaxAsyncSearchResponseSize() throws Exception { - // successfully create an initial response - AsyncExecutionId executionId1 = new AsyncExecutionId(Long.toString(randomNonNegativeLong()), - new TaskId(randomAlphaOfLength(10), randomNonNegativeLong())); - TestAsyncResponse initialResponse = new TestAsyncResponse(randomAlphaOfLength(130), randomLong()); - PlainActionFuture createFuture1 = new PlainActionFuture<>(); - indexService.createResponse(executionId1.getDocId(), Map.of(), initialResponse, createFuture1); - createFuture1.actionGet(); - - // setting very small limit for the max size of async search response - int limit = randomIntBetween(1, 125); - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", limit + "b")); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - String expectedErrMsg = "Can't store an async search response larger than ["+ limit + "] bytes. " + - "This limit can be set by changing the [" + MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING.getKey() + "] setting."; - - // test that an update operation of the initial response fails - PlainActionFuture updateFuture = new PlainActionFuture<>(); - TestAsyncResponse updateResponse = new TestAsyncResponse(randomAlphaOfLength(130), randomLong()); - indexService.updateResponse(executionId1.getDocId(), Map.of(), updateResponse, updateFuture); - IllegalArgumentException e = expectThrows(IllegalArgumentException.class, updateFuture::actionGet); - assertEquals(expectedErrMsg, e.getMessage()); - assertEquals(0, e.getSuppressed().length); // no other suppressed exceptions - // test that the inital response is overwritten with a failure - PlainActionFuture getFuture = new PlainActionFuture<>(); - indexService.getResponse(executionId1, randomBoolean(), getFuture); - assertEquals(expectedErrMsg, getFuture.actionGet().failure); - - // test that a create operation fails - AsyncExecutionId executionId2 = new AsyncExecutionId(Long.toString(randomNonNegativeLong()), - new TaskId(randomAlphaOfLength(10), randomNonNegativeLong())); - PlainActionFuture createFuture = new PlainActionFuture<>(); - TestAsyncResponse initialResponse2 = new TestAsyncResponse(randomAlphaOfLength(130), randomLong()); - indexService.createResponse(executionId2.getDocId(), Map.of(), initialResponse2, createFuture); - IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, createFuture::actionGet); - assertEquals(expectedErrMsg, e2.getMessage()); - assertEquals(0, e2.getSuppressed().length); // no other suppressed exceptions - - // restoring limit - updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", (String) null)); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + try { + // successfully create an initial response + AsyncExecutionId executionId1 = new AsyncExecutionId(Long.toString(randomNonNegativeLong()), + new TaskId(randomAlphaOfLength(10), randomNonNegativeLong())); + TestAsyncResponse initialResponse = new TestAsyncResponse(randomAlphaOfLength(130), randomLong()); + PlainActionFuture createFuture1 = new PlainActionFuture<>(); + indexService.createResponse(executionId1.getDocId(), Map.of(), initialResponse, createFuture1); + createFuture1.actionGet(); + + // setting very small limit for the max size of async search response + int limit = randomIntBetween(1, 125); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", limit + "b")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + String expectedErrMsg = "Can't store an async search response larger than [" + limit + "] bytes. " + + "This limit can be set by changing the [" + MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING.getKey() + "] setting."; + + // test that an update operation of the initial response fails + PlainActionFuture updateFuture = new PlainActionFuture<>(); + TestAsyncResponse updateResponse = new TestAsyncResponse(randomAlphaOfLength(130), randomLong()); + indexService.updateResponse(executionId1.getDocId(), Map.of(), updateResponse, updateFuture); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, updateFuture::actionGet); + assertEquals(expectedErrMsg, e.getMessage()); + // test that the inital response is overwritten with a failure + PlainActionFuture getFuture = new PlainActionFuture<>(); + indexService.getResponse(executionId1, randomBoolean(), getFuture); + assertEquals(expectedErrMsg, getFuture.actionGet().failure); + + // test that a create operation fails + AsyncExecutionId executionId2 = new AsyncExecutionId(Long.toString(randomNonNegativeLong()), + new TaskId(randomAlphaOfLength(10), randomNonNegativeLong())); + PlainActionFuture createFuture = new PlainActionFuture<>(); + TestAsyncResponse initialResponse2 = new TestAsyncResponse(randomAlphaOfLength(130), randomLong()); + indexService.createResponse(executionId2.getDocId(), Map.of(), initialResponse2, createFuture); + IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, createFuture::actionGet); + assertEquals(expectedErrMsg, e2.getMessage()); + } finally { + // restoring limit + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", (String) null)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } } + } diff --git a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java index 8f5e3ebeb03b1..94ce58dce9cd1 100644 --- a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java +++ b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java @@ -15,7 +15,9 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -39,6 +41,7 @@ import org.hamcrest.Description; import org.junit.After; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Base64; @@ -286,8 +289,10 @@ public StoredAsyncResponse getStoredRecord(String id) throws if (doc.isExists()) { String value = doc.getSource().get("result").toString(); try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) { - try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) { - in.setVersion(Version.readVersion(in)); + final Version version = Version.readVersion(buf); + final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf); + try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) { + in.setVersion(version); return new StoredAsyncResponse<>(EqlSearchResponse::new, in); } } diff --git a/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AsyncSqlSearchActionIT.java b/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AsyncSqlSearchActionIT.java index 7b4c851d1e628..0352646c55a64 100644 --- a/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AsyncSqlSearchActionIT.java +++ b/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AsyncSqlSearchActionIT.java @@ -14,7 +14,9 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -37,6 +39,7 @@ import org.elasticsearch.xpack.sql.proto.Protocol; import org.junit.After; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Base64; @@ -285,8 +288,10 @@ public StoredAsyncResponse getStoredRecord(String id) throws E if (doc.isExists()) { String value = doc.getSource().get("result").toString(); try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) { - try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) { - in.setVersion(Version.readVersion(in)); + final Version version = Version.readVersion(buf); + final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf); + try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) { + in.setVersion(version); return new StoredAsyncResponse<>(SqlQueryResponse::new, in); } }