From f99993f14031809fa453389cdaa4e4421f5789e9 Mon Sep 17 00:00:00 2001 From: tkountis Date: Tue, 30 Jun 2020 16:56:01 +0100 Subject: [PATCH] WIP --- .../metrics/impl/MetricsCompressor.java | 91 ++++++++++--------- 1 file changed, 50 insertions(+), 41 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/internal/metrics/impl/MetricsCompressor.java b/hazelcast/src/main/java/com/hazelcast/internal/metrics/impl/MetricsCompressor.java index fa56e269d4f4f..075be77732750 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/metrics/impl/MetricsCompressor.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/metrics/impl/MetricsCompressor.java @@ -115,10 +115,12 @@ public class MetricsCompressor { // output streams for the blob containing the dictionary private DataOutputStream dictionaryDos; + private Deflater dictionaryCompressor; private MorePublicByteArrayOutputStream dictionaryBaos = new MorePublicByteArrayOutputStream(INITIAL_BUFFER_SIZE_DICTIONARY); // output streams for the blob containing the metrics private DataOutputStream metricDos; + private Deflater metricsCompressor; private MorePublicByteArrayOutputStream metricBaos = new MorePublicByteArrayOutputStream(INITIAL_BUFFER_SIZE_METRICS); // temporary buffer to avoid DeflaterOutputStream's extra byte[] allocations @@ -287,8 +289,9 @@ public int count() { return count; } + private void reset(int estimatedBytesDictionary, int estimatedBytesMetrics) { - Deflater dictionaryCompressor = new Deflater(); + dictionaryCompressor = new Deflater(); dictionaryCompressor.setLevel(Deflater.BEST_SPEED); // shrink the `dictionaryBaos` if capacity is more than 50% larger than the estimated size @@ -298,7 +301,7 @@ private void reset(int estimatedBytesDictionary, int estimatedBytesMetrics) { dictionaryBaos.reset(); dictionaryDos = new DataOutputStream(new DeflaterOutputStream(dictionaryBaos, dictionaryCompressor)); - Deflater metricsCompressor = new Deflater(); + metricsCompressor = new Deflater(); metricsCompressor.setLevel(Deflater.BEST_SPEED); // shrink the `metricsBaos` if capacity is more than 50% larger than the estimated size if (metricBaos.capacity() > multiplyExact(estimatedBytesMetrics, 3) / 2) { @@ -316,7 +319,9 @@ private byte[] getRenderedBlob() { try { writeDictionary(); dictionaryDos.close(); + dictionaryCompressor.end(); metricDos.close(); + metricsCompressor.end(); } catch (IOException e) { // should never be thrown throw new RuntimeException(e); @@ -381,31 +386,31 @@ public static void extractMetrics(byte[] blob, MetricConsumer consumer, Supplier } private static String[] readDictionary(byte[] dictionaryBlob) throws IOException { - DataInputStream dis = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(dictionaryBlob))); - int dictionarySize = dis.readInt(); - String[] dictionary = new String[dictionarySize]; - String lastWord = ""; - StringBuilder sb = new StringBuilder(); - - for (int i = 0; i < dictionarySize; i++) { - int dictionaryId = dis.readInt(); - byte commonLen = dis.readByte(); - byte diffLen = dis.readByte(); - - for (int j = 0; j < commonLen; j++) { - sb.append(lastWord.charAt(j)); - } + try (DataInputStream dis = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(dictionaryBlob)))) { + int dictionarySize = dis.readInt(); + String[] dictionary = new String[dictionarySize]; + String lastWord = ""; + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < dictionarySize; i++) { + int dictionaryId = dis.readInt(); + byte commonLen = dis.readByte(); + byte diffLen = dis.readByte(); + + for (int j = 0; j < commonLen; j++) { + sb.append(lastWord.charAt(j)); + } - for (int j = 0; j < diffLen; j++) { - sb.append(dis.readChar()); + for (int j = 0; j < diffLen; j++) { + sb.append(dis.readChar()); + } + String readWord = sb.toString(); + lastWord = readWord; + dictionary[dictionaryId] = readWord; + sb.delete(0, commonLen + diffLen); } - String readWord = sb.toString(); - lastWord = readWord; - dictionary[dictionaryId] = readWord; - sb.delete(0, commonLen + diffLen); + return dictionary; } - - return dictionary; } private static class MorePublicByteArrayOutputStream extends ByteArrayOutputStream { @@ -464,24 +469,28 @@ private MetricsDecompressor(byte[] metricsBlob, int countMetrics, String[] dicti } private void extractMetrics() throws IOException { - for (int i = 0; i < countMetrics; i++) { - MetricDescriptor descriptor = readMetricDescriptor(); - lastDescriptor.copy(descriptor); - - int typeOrdinal = dis.readUnsignedByte(); - ValueType type = ValueType.valueOf(typeOrdinal); - - switch (type) { - case LONG: - consumer.consumeLong(descriptor, dis.readLong()); - break; - case DOUBLE: - consumer.consumeDouble(descriptor, dis.readDouble()); - break; - default: - throw new IllegalStateException("Unexpected metric value type: " + type + " with ordinal " + typeOrdinal); + try { + for (int i = 0; i < countMetrics; i++) { + MetricDescriptor descriptor = readMetricDescriptor(); + lastDescriptor.copy(descriptor); + + int typeOrdinal = dis.readUnsignedByte(); + ValueType type = ValueType.valueOf(typeOrdinal); + + switch (type) { + case LONG: + consumer.consumeLong(descriptor, dis.readLong()); + break; + case DOUBLE: + consumer.consumeDouble(descriptor, dis.readDouble()); + break; + default: + throw new IllegalStateException("Unexpected metric value type: " + + type + " with ordinal " + typeOrdinal); + } } - + } finally { + dis.close(); } }