Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
tkountis committed Jul 2, 2020
1 parent 857640d commit e79a69c
Showing 1 changed file with 48 additions and 38 deletions.
Expand Up @@ -117,10 +117,12 @@ public class MetricsCompressor {

// output streams for the blob containing the dictionary
private DataOutputStream dictionaryDos;
private Deflater dictionaryCompressor;
private MorePublicByteArrayOutputStream dictionaryBaos = new MorePublicByteArrayOutputStream(INITIAL_BUFFER_SIZE_DICTIONARY);

// output streams for the blob containing the metrics
private DataOutputStream metricDos;
private Deflater metricsCompressor;
private MorePublicByteArrayOutputStream metricBaos = new MorePublicByteArrayOutputStream(INITIAL_BUFFER_SIZE_METRICS);

// temporary buffer to avoid DeflaterOutputStream's extra byte[] allocations
Expand Down Expand Up @@ -289,8 +291,9 @@ public int count() {
return count;
}


private void reset(int estimatedBytesDictionary, int estimatedBytesMetrics) {
Deflater dictionaryCompressor = new Deflater();
dictionaryCompressor = new Deflater();
dictionaryCompressor.setLevel(Deflater.BEST_SPEED);

// shrink the `dictionaryBaos` if capacity is more than 50% larger than the estimated size
Expand All @@ -300,7 +303,7 @@ private void reset(int estimatedBytesDictionary, int estimatedBytesMetrics) {
dictionaryBaos.reset();
dictionaryDos = new DataOutputStream(new DeflaterOutputStream(dictionaryBaos, dictionaryCompressor));

Deflater metricsCompressor = new Deflater();
metricsCompressor = new Deflater();
metricsCompressor.setLevel(Deflater.BEST_SPEED);
// shrink the `metricsBaos` if capacity is more than 50% larger than the estimated size
if (metricBaos.capacity() > multiplyExact(estimatedBytesMetrics, 3) / 2) {
Expand All @@ -318,7 +321,9 @@ private byte[] getRenderedBlob() {
try {
writeDictionary();
dictionaryDos.close();
dictionaryCompressor.end();
metricDos.close();
metricsCompressor.end();
} catch (IOException e) {
// should never be thrown
throw new RuntimeException(e);
Expand Down Expand Up @@ -383,27 +388,28 @@ public static void extractMetrics(byte[] blob, MetricConsumer consumer, Supplier
}

private static String[] readDictionary(byte[] dictionaryBlob) throws IOException {
DataInputStream dis = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(dictionaryBlob)));
int dictionarySize = dis.readInt();
String[] dictionary = new String[dictionarySize];
String lastWord = "";
StringBuilder sb = new StringBuilder();

for (int i = 0; i < dictionarySize; i++) {
int dictionaryId = dis.readInt();
int commonLen = dis.readUnsignedByte();
int diffLen = dis.readUnsignedByte();
sb.append(lastWord, 0, commonLen);
for (int j = 0; j < diffLen; j++) {
sb.append(dis.readChar());
try (DataInputStream dis = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(dictionaryBlob)))) {
int dictionarySize = dis.readInt();
String[] dictionary = new String[dictionarySize];
String lastWord = "";
StringBuilder sb = new StringBuilder();

for (int i = 0; i < dictionarySize; i++) {
int dictionaryId = dis.readInt();
int commonLen = dis.readUnsignedByte();
int diffLen = dis.readUnsignedByte();
sb.append(lastWord, 0, commonLen);
for (int j = 0; j < diffLen; j++) {
sb.append(dis.readChar());
}
String readWord = sb.toString();
lastWord = readWord;
dictionary[dictionaryId] = readWord;
sb.setLength(0);
}
String readWord = sb.toString();
lastWord = readWord;
dictionary[dictionaryId] = readWord;
sb.setLength(0);
}

return dictionary;
return dictionary;
}
}

private static class MorePublicByteArrayOutputStream extends ByteArrayOutputStream {
Expand Down Expand Up @@ -462,24 +468,28 @@ private MetricsDecompressor(byte[] metricsBlob, int countMetrics, String[] dicti
}

private void extractMetrics() throws IOException {
for (int i = 0; i < countMetrics; i++) {
MetricDescriptor descriptor = readMetricDescriptor();
lastDescriptor.copy(descriptor);

int typeOrdinal = dis.readUnsignedByte();
ValueType type = ValueType.valueOf(typeOrdinal);

switch (type) {
case LONG:
consumer.consumeLong(descriptor, dis.readLong());
break;
case DOUBLE:
consumer.consumeDouble(descriptor, dis.readDouble());
break;
default:
throw new IllegalStateException("Unexpected metric value type: " + type + " with ordinal " + typeOrdinal);
try {
for (int i = 0; i < countMetrics; i++) {
MetricDescriptor descriptor = readMetricDescriptor();
lastDescriptor.copy(descriptor);

int typeOrdinal = dis.readUnsignedByte();
ValueType type = ValueType.valueOf(typeOrdinal);

switch (type) {
case LONG:
consumer.consumeLong(descriptor, dis.readLong());
break;
case DOUBLE:
consumer.consumeDouble(descriptor, dis.readDouble());
break;
default:
throw new IllegalStateException("Unexpected metric value type: "
+ type + " with ordinal " + typeOrdinal);
}
}

} finally {
dis.close();
}
}

Expand Down

0 comments on commit e79a69c

Please sign in to comment.