Skip to content

Commit

Permalink
Revert "Compress async search responses before storing (elastic#74766)"
Browse files Browse the repository at this point in the history
This reverts commit 55175de.
  • Loading branch information
dnhatn committed Jun 30, 2021
1 parent 55175de commit 005f209
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -60,7 +59,6 @@
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -584,28 +582,18 @@ boolean ensureAuthenticatedUserIsSame(Map<String, String> originHeaders, Authent
}

private void writeResponse(R response, OutputStream os) throws IOException {
os = new FilterOutputStream(os) {
@Override
public void close() {
// do not close the output
}
};
final Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion();
Version.writeVersion(minNodeVersion, new OutputStreamStreamOutput(os));
if (minNodeVersion.onOrAfter(Version.V_8_0_0)) {
os = CompressorFactory.COMPRESSOR.threadLocalOutputStream(os);
}
try (OutputStreamStreamOutput out = new OutputStreamStreamOutput(os)) {
out.setVersion(minNodeVersion);
response.writeTo(out);
}
final OutputStreamStreamOutput out = new OutputStreamStreamOutput(os);
out.setVersion(minNodeVersion);
Version.writeVersion(minNodeVersion, out);
response.writeTo(out);
}

/**
* Decode the provided base-64 bytes into a {@link AsyncSearchResponse}.
*/
private R decodeResponse(CharBuffer encodedBuffer) throws IOException {
InputStream encodedIn = Base64.getDecoder().wrap(new InputStream() {
final InputStream encodedIn = Base64.getDecoder().wrap(new InputStream() {
@Override
public int read() {
if (encodedBuffer.hasRemaining()) {
Expand All @@ -615,12 +603,9 @@ public int read() {
}
}
});
final Version version = Version.readVersion(new InputStreamStreamInput(encodedIn));
assert version.onOrBefore(Version.CURRENT) : version + " >= " + Version.CURRENT;
if (version.onOrAfter(Version.V_8_0_0)) {
encodedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(encodedIn);
}
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(encodedIn), registry)) {
final Version version = Version.readVersion(in);
assert version.onOrBefore(Version.CURRENT) : version + " >= " + Version.CURRENT;
in.setVersion(version);
return reader.read(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -41,7 +39,6 @@
import org.hamcrest.Description;
import org.junit.After;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
Expand Down Expand Up @@ -289,10 +286,8 @@ public StoredAsyncResponse<EqlSearchResponse> getStoredRecord(String id) throws
if (doc.isExists()) {
String value = doc.getSource().get("result").toString();
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
final Version version = Version.readVersion(buf);
final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf);
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) {
in.setVersion(version);
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
in.setVersion(Version.readVersion(in));
return new StoredAsyncResponse<>(EqlSearchResponse::new, in);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -39,7 +37,6 @@
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.junit.After;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
Expand Down Expand Up @@ -288,10 +285,8 @@ public StoredAsyncResponse<SqlQueryResponse> getStoredRecord(String id) throws E
if (doc.isExists()) {
String value = doc.getSource().get("result").toString();
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
final Version version = Version.readVersion(buf);
final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf);
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) {
in.setVersion(version);
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
in.setVersion(Version.readVersion(in));
return new StoredAsyncResponse<>(SqlQueryResponse::new, in);
}
}
Expand Down

0 comments on commit 005f209

Please sign in to comment.