Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
tkountis committed Jun 30, 2020
1 parent e086b9c commit f99993f
Showing 1 changed file with 50 additions and 41 deletions.
Expand Up @@ -115,10 +115,12 @@ public class MetricsCompressor {

// output streams for the blob containing the dictionary
private DataOutputStream dictionaryDos;
private Deflater dictionaryCompressor;
private MorePublicByteArrayOutputStream dictionaryBaos = new MorePublicByteArrayOutputStream(INITIAL_BUFFER_SIZE_DICTIONARY);

// output streams for the blob containing the metrics
private DataOutputStream metricDos;
private Deflater metricsCompressor;
private MorePublicByteArrayOutputStream metricBaos = new MorePublicByteArrayOutputStream(INITIAL_BUFFER_SIZE_METRICS);

// temporary buffer to avoid DeflaterOutputStream's extra byte[] allocations
Expand Down Expand Up @@ -287,8 +289,9 @@ public int count() {
return count;
}


private void reset(int estimatedBytesDictionary, int estimatedBytesMetrics) {
Deflater dictionaryCompressor = new Deflater();
dictionaryCompressor = new Deflater();
dictionaryCompressor.setLevel(Deflater.BEST_SPEED);

// shrink the `dictionaryBaos` if capacity is more than 50% larger than the estimated size
Expand All @@ -298,7 +301,7 @@ private void reset(int estimatedBytesDictionary, int estimatedBytesMetrics) {
dictionaryBaos.reset();
dictionaryDos = new DataOutputStream(new DeflaterOutputStream(dictionaryBaos, dictionaryCompressor));

Deflater metricsCompressor = new Deflater();
metricsCompressor = new Deflater();
metricsCompressor.setLevel(Deflater.BEST_SPEED);
// shrink the `metricsBaos` if capacity is more than 50% larger than the estimated size
if (metricBaos.capacity() > multiplyExact(estimatedBytesMetrics, 3) / 2) {
Expand All @@ -316,7 +319,9 @@ private byte[] getRenderedBlob() {
try {
writeDictionary();
dictionaryDos.close();
dictionaryCompressor.end();
metricDos.close();
metricsCompressor.end();
} catch (IOException e) {
// should never be thrown
throw new RuntimeException(e);
Expand Down Expand Up @@ -381,31 +386,31 @@ public static void extractMetrics(byte[] blob, MetricConsumer consumer, Supplier
}

private static String[] readDictionary(byte[] dictionaryBlob) throws IOException {
DataInputStream dis = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(dictionaryBlob)));
int dictionarySize = dis.readInt();
String[] dictionary = new String[dictionarySize];
String lastWord = "";
StringBuilder sb = new StringBuilder();

for (int i = 0; i < dictionarySize; i++) {
int dictionaryId = dis.readInt();
byte commonLen = dis.readByte();
byte diffLen = dis.readByte();

for (int j = 0; j < commonLen; j++) {
sb.append(lastWord.charAt(j));
}
try (DataInputStream dis = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(dictionaryBlob)))) {
int dictionarySize = dis.readInt();
String[] dictionary = new String[dictionarySize];
String lastWord = "";
StringBuilder sb = new StringBuilder();

for (int i = 0; i < dictionarySize; i++) {
int dictionaryId = dis.readInt();
byte commonLen = dis.readByte();
byte diffLen = dis.readByte();

for (int j = 0; j < commonLen; j++) {
sb.append(lastWord.charAt(j));
}

for (int j = 0; j < diffLen; j++) {
sb.append(dis.readChar());
for (int j = 0; j < diffLen; j++) {
sb.append(dis.readChar());
}
String readWord = sb.toString();
lastWord = readWord;
dictionary[dictionaryId] = readWord;
sb.delete(0, commonLen + diffLen);
}
String readWord = sb.toString();
lastWord = readWord;
dictionary[dictionaryId] = readWord;
sb.delete(0, commonLen + diffLen);
return dictionary;
}

return dictionary;
}

private static class MorePublicByteArrayOutputStream extends ByteArrayOutputStream {
Expand Down Expand Up @@ -464,24 +469,28 @@ private MetricsDecompressor(byte[] metricsBlob, int countMetrics, String[] dicti
}

private void extractMetrics() throws IOException {
for (int i = 0; i < countMetrics; i++) {
MetricDescriptor descriptor = readMetricDescriptor();
lastDescriptor.copy(descriptor);

int typeOrdinal = dis.readUnsignedByte();
ValueType type = ValueType.valueOf(typeOrdinal);

switch (type) {
case LONG:
consumer.consumeLong(descriptor, dis.readLong());
break;
case DOUBLE:
consumer.consumeDouble(descriptor, dis.readDouble());
break;
default:
throw new IllegalStateException("Unexpected metric value type: " + type + " with ordinal " + typeOrdinal);
try {
for (int i = 0; i < countMetrics; i++) {
MetricDescriptor descriptor = readMetricDescriptor();
lastDescriptor.copy(descriptor);

int typeOrdinal = dis.readUnsignedByte();
ValueType type = ValueType.valueOf(typeOrdinal);

switch (type) {
case LONG:
consumer.consumeLong(descriptor, dis.readLong());
break;
case DOUBLE:
consumer.consumeDouble(descriptor, dis.readDouble());
break;
default:
throw new IllegalStateException("Unexpected metric value type: "
+ type + " with ordinal " + typeOrdinal);
}
}

} finally {
dis.close();
}
}

Expand Down

0 comments on commit f99993f

Please sign in to comment.