From 2eb2674179181ff7b581f72906d62559517ee4fb Mon Sep 17 00:00:00 2001 From: BO8979 Date: Wed, 2 Dec 2020 07:12:22 -0600 Subject: [PATCH] Pool table instances to make PureJavaSnappy thread safe https://github.com/xerial/snappy-java/issues/270 --- .../xerial/snappy/pure/PureJavaSnappy.java | 97 ++++++++++++++++--- 1 file changed, 81 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/xerial/snappy/pure/PureJavaSnappy.java b/src/main/java/org/xerial/snappy/pure/PureJavaSnappy.java index faf54ef3..a863ff13 100644 --- a/src/main/java/org/xerial/snappy/pure/PureJavaSnappy.java +++ b/src/main/java/org/xerial/snappy/pure/PureJavaSnappy.java @@ -1,26 +1,41 @@ package org.xerial.snappy.pure; -import org.xerial.snappy.SnappyApi; +import static org.xerial.snappy.pure.UnsafeUtil.getAddress; +import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET; import java.io.IOException; +import java.lang.ref.SoftReference; import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentLinkedDeque; -import static org.xerial.snappy.pure.UnsafeUtil.getAddress; -import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET; +import org.xerial.snappy.SnappyApi; /** * A pure-java Snappy implementation using https://github.com/airlift/aircompressor */ public class PureJavaSnappy implements SnappyApi { - private final short[] table = new short[SnappyRawCompressor.MAX_HASH_TABLE_SIZE]; + /** + * Using a {@link ConcurrentLinkedDeque}, with values constantly popped and pushed from the head, leads to the fewest + * {@code short[]} instances remaining live over time. + */ + private final static ConcurrentLinkedDeque> CACHED_TABLES = new ConcurrentLinkedDeque<>(); + private final static int MAX_OUTPUT_LENGTH = Integer.MAX_VALUE; @Override public long rawCompress(long inputAddr, long inputSize, long destAddr) throws IOException { - return SnappyRawCompressor.compress(null, inputAddr, inputSize, null, destAddr, MAX_OUTPUT_LENGTH, table); + final short[] table = getTable(); + try + { + return SnappyRawCompressor.compress(null, inputAddr, inputSize, null, destAddr, MAX_OUTPUT_LENGTH, table); + } + finally + { + returnTable(table); + } } @Override @@ -76,16 +91,24 @@ else if (compressed.hasArray()) { // collected in a block, and technically, the JVM is allowed to eliminate these locks. synchronized (input) { synchronized (compressed) { - int written = SnappyRawCompressor.compress( - inputBase, - inputAddress, - inputLimit, - outputBase, - outputAddress, - outputLimit, - table); - compressed.position(compressed.position() + written); - return written; + final short[] table = getTable(); + try + { + int written = SnappyRawCompressor.compress( + inputBase, + inputAddress, + inputLimit, + outputBase, + outputAddress, + outputLimit, + table); + compressed.position(compressed.position() + written); + return written; + } + finally + { + returnTable(table); + } } } } @@ -99,7 +122,15 @@ public int rawCompress(Object input, int inputOffset, int inputByteLength, Objec long outputAddress = ARRAY_BYTE_BASE_OFFSET + outputOffset; long outputLimit = outputAddress + MAX_OUTPUT_LENGTH; - return SnappyRawCompressor.compress(input, inputAddress, inputLimit, output, outputAddress, outputLimit, table); + final short[] table = getTable(); + try + { + return SnappyRawCompressor.compress(input, inputAddress, inputLimit, output, outputAddress, outputLimit, table); + } + finally + { + returnTable(table); + } } @Override @@ -241,4 +272,38 @@ public void arrayCopy(Object src, int offset, int byteLength, Object dest, int d { System.arraycopy(src, offset, dest, dOffset, byteLength); } + + private static short[] getTable() + { + SoftReference existingRef; + while((existingRef = CACHED_TABLES.poll()) != null) + { + short[] table = existingRef.get(); + if (table != null) + { + //purge oldest entries have lost references + SoftReference entry; + boolean lastEmpty = true; + while (lastEmpty && (entry = CACHED_TABLES.peekLast()) != null) + { + if (entry.get() == null) + { + CACHED_TABLES.removeLastOccurrence(entry); + } + else + { + lastEmpty = false; + } + } + + return table; + } + } + return new short[SnappyRawCompressor.MAX_HASH_TABLE_SIZE]; + } + + private static void returnTable(short[] table) + { + CACHED_TABLES.addFirst(new SoftReference(table)); + } }