Skip to content

Commit

Permalink
Compress async search responses before storing (elastic#74766)
Browse files Browse the repository at this point in the history
Related to elastic#67594
  • Loading branch information
dnhatn committed Jul 1, 2021
1 parent 005f209 commit 59d4508
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 53 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -582,18 +584,28 @@ boolean ensureAuthenticatedUserIsSame(Map<String, String> originHeaders, Authent
}

private void writeResponse(R response, OutputStream os) throws IOException {
os = new FilterOutputStream(os) {
@Override
public void close() {
// do not close the output
}
};
final Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion();
final OutputStreamStreamOutput out = new OutputStreamStreamOutput(os);
out.setVersion(minNodeVersion);
Version.writeVersion(minNodeVersion, out);
response.writeTo(out);
Version.writeVersion(minNodeVersion, new OutputStreamStreamOutput(os));
if (minNodeVersion.onOrAfter(Version.V_8_0_0)) {
os = CompressorFactory.COMPRESSOR.threadLocalOutputStream(os);
}
try (OutputStreamStreamOutput out = new OutputStreamStreamOutput(os)) {
out.setVersion(minNodeVersion);
response.writeTo(out);
}
}

/**
* Decode the provided base-64 bytes into a {@link AsyncSearchResponse}.
*/
private R decodeResponse(CharBuffer encodedBuffer) throws IOException {
final InputStream encodedIn = Base64.getDecoder().wrap(new InputStream() {
InputStream encodedIn = Base64.getDecoder().wrap(new InputStream() {
@Override
public int read() {
if (encodedBuffer.hasRemaining()) {
Expand All @@ -603,9 +615,12 @@ public int read() {
}
}
});
final Version version = Version.readVersion(new InputStreamStreamInput(encodedIn));
assert version.onOrBefore(Version.CURRENT) : version + " >= " + Version.CURRENT;
if (version.onOrAfter(Version.V_8_0_0)) {
encodedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(encodedIn);
}
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(encodedIn), registry)) {
final Version version = Version.readVersion(in);
assert version.onOrBefore(Version.CURRENT) : version + " >= " + Version.CURRENT;
in.setVersion(version);
return reader.read(in);
}
Expand Down
Expand Up @@ -305,47 +305,48 @@ public CircuitBreakerStats stats(String name) {
}

public void testMaxAsyncSearchResponseSize() throws Exception {
// successfully create an initial response
AsyncExecutionId executionId1 = new AsyncExecutionId(Long.toString(randomNonNegativeLong()),
new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()));
TestAsyncResponse initialResponse = new TestAsyncResponse(randomAlphaOfLength(130), randomLong());
PlainActionFuture<IndexResponse> createFuture1 = new PlainActionFuture<>();
indexService.createResponse(executionId1.getDocId(), Map.of(), initialResponse, createFuture1);
createFuture1.actionGet();

// setting very small limit for the max size of async search response
int limit = randomIntBetween(1, 125);
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", limit + "b"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
String expectedErrMsg = "Can't store an async search response larger than ["+ limit + "] bytes. " +
"This limit can be set by changing the [" + MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING.getKey() + "] setting.";

// test that an update operation of the initial response fails
PlainActionFuture<UpdateResponse> updateFuture = new PlainActionFuture<>();
TestAsyncResponse updateResponse = new TestAsyncResponse(randomAlphaOfLength(130), randomLong());
indexService.updateResponse(executionId1.getDocId(), Map.of(), updateResponse, updateFuture);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, updateFuture::actionGet);
assertEquals(expectedErrMsg, e.getMessage());
assertEquals(0, e.getSuppressed().length); // no other suppressed exceptions
// test that the inital response is overwritten with a failure
PlainActionFuture<TestAsyncResponse> getFuture = new PlainActionFuture<>();
indexService.getResponse(executionId1, randomBoolean(), getFuture);
assertEquals(expectedErrMsg, getFuture.actionGet().failure);

// test that a create operation fails
AsyncExecutionId executionId2 = new AsyncExecutionId(Long.toString(randomNonNegativeLong()),
new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()));
PlainActionFuture<IndexResponse> createFuture = new PlainActionFuture<>();
TestAsyncResponse initialResponse2 = new TestAsyncResponse(randomAlphaOfLength(130), randomLong());
indexService.createResponse(executionId2.getDocId(), Map.of(), initialResponse2, createFuture);
IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, createFuture::actionGet);
assertEquals(expectedErrMsg, e2.getMessage());
assertEquals(0, e2.getSuppressed().length); // no other suppressed exceptions

// restoring limit
updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", (String) null));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
try {
// successfully create an initial response
AsyncExecutionId executionId1 = new AsyncExecutionId(Long.toString(randomNonNegativeLong()),
new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()));
TestAsyncResponse initialResponse = new TestAsyncResponse(randomAlphaOfLength(130), randomLong());
PlainActionFuture<IndexResponse> createFuture1 = new PlainActionFuture<>();
indexService.createResponse(executionId1.getDocId(), Map.of(), initialResponse, createFuture1);
createFuture1.actionGet();

// setting very small limit for the max size of async search response
int limit = randomIntBetween(1, 125);
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", limit + "b"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
String expectedErrMsg = "Can't store an async search response larger than [" + limit + "] bytes. " +
"This limit can be set by changing the [" + MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING.getKey() + "] setting.";

// test that an update operation of the initial response fails
PlainActionFuture<UpdateResponse> updateFuture = new PlainActionFuture<>();
TestAsyncResponse updateResponse = new TestAsyncResponse(randomAlphaOfLength(130), randomLong());
indexService.updateResponse(executionId1.getDocId(), Map.of(), updateResponse, updateFuture);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, updateFuture::actionGet);
assertEquals(expectedErrMsg, e.getMessage());
// test that the inital response is overwritten with a failure
PlainActionFuture<TestAsyncResponse> getFuture = new PlainActionFuture<>();
indexService.getResponse(executionId1, randomBoolean(), getFuture);
assertEquals(expectedErrMsg, getFuture.actionGet().failure);

// test that a create operation fails
AsyncExecutionId executionId2 = new AsyncExecutionId(Long.toString(randomNonNegativeLong()),
new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()));
PlainActionFuture<IndexResponse> createFuture = new PlainActionFuture<>();
TestAsyncResponse initialResponse2 = new TestAsyncResponse(randomAlphaOfLength(130), randomLong());
indexService.createResponse(executionId2.getDocId(), Map.of(), initialResponse2, createFuture);
IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, createFuture::actionGet);
assertEquals(expectedErrMsg, e2.getMessage());
} finally {
// restoring limit
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", (String) null));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}
}

}
Expand Up @@ -15,7 +15,9 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -39,6 +41,7 @@
import org.hamcrest.Description;
import org.junit.After;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
Expand Down Expand Up @@ -286,8 +289,10 @@ public StoredAsyncResponse<EqlSearchResponse> getStoredRecord(String id) throws
if (doc.isExists()) {
String value = doc.getSource().get("result").toString();
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
in.setVersion(Version.readVersion(in));
final Version version = Version.readVersion(buf);
final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf);
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) {
in.setVersion(version);
return new StoredAsyncResponse<>(EqlSearchResponse::new, in);
}
}
Expand Down
Expand Up @@ -14,7 +14,9 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -37,6 +39,7 @@
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.junit.After;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
Expand Down Expand Up @@ -285,8 +288,10 @@ public StoredAsyncResponse<SqlQueryResponse> getStoredRecord(String id) throws E
if (doc.isExists()) {
String value = doc.getSource().get("result").toString();
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
in.setVersion(Version.readVersion(in));
final Version version = Version.readVersion(buf);
final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf);
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) {
in.setVersion(version);
return new StoredAsyncResponse<>(SqlQueryResponse::new, in);
}
}
Expand Down

0 comments on commit 59d4508

Please sign in to comment.