Skip to content

Commit

Permalink
Pool table instances to make PureJavaSnappy thread safe (#271)
Browse files Browse the repository at this point in the history
#270

Co-authored-by: BO8979 <BO8979@W1971362.northamerica.cerner.net>
  • Loading branch information
bokken and BO8979 committed Jan 20, 2021
1 parent 7f47cf7 commit 110727e
Showing 1 changed file with 81 additions and 16 deletions.
97 changes: 81 additions & 16 deletions src/main/java/org/xerial/snappy/pure/PureJavaSnappy.java
@@ -1,26 +1,41 @@
package org.xerial.snappy.pure;

import org.xerial.snappy.SnappyApi;
import static org.xerial.snappy.pure.UnsafeUtil.getAddress;
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;

import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedDeque;

import static org.xerial.snappy.pure.UnsafeUtil.getAddress;
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;
import org.xerial.snappy.SnappyApi;

/**
* A pure-java Snappy implementation using https://github.com/airlift/aircompressor
*/
public class PureJavaSnappy implements SnappyApi
{
private final short[] table = new short[SnappyRawCompressor.MAX_HASH_TABLE_SIZE];
/**
* Using a {@link ConcurrentLinkedDeque}, with values constantly popped and pushed from the head, leads to the fewest
* {@code short[]} instances remaining live over time.
*/
private final static ConcurrentLinkedDeque<SoftReference<short[]>> CACHED_TABLES = new ConcurrentLinkedDeque<>();

private final static int MAX_OUTPUT_LENGTH = Integer.MAX_VALUE;

@Override
public long rawCompress(long inputAddr, long inputSize, long destAddr)
throws IOException
{
return SnappyRawCompressor.compress(null, inputAddr, inputSize, null, destAddr, MAX_OUTPUT_LENGTH, table);
final short[] table = getTable();
try
{
return SnappyRawCompressor.compress(null, inputAddr, inputSize, null, destAddr, MAX_OUTPUT_LENGTH, table);
}
finally
{
returnTable(table);
}
}

@Override
Expand Down Expand Up @@ -76,16 +91,24 @@ else if (compressed.hasArray()) {
// collected in a block, and technically, the JVM is allowed to eliminate these locks.
synchronized (input) {
synchronized (compressed) {
int written = SnappyRawCompressor.compress(
inputBase,
inputAddress,
inputLimit,
outputBase,
outputAddress,
outputLimit,
table);
compressed.position(compressed.position() + written);
return written;
final short[] table = getTable();
try
{
int written = SnappyRawCompressor.compress(
inputBase,
inputAddress,
inputLimit,
outputBase,
outputAddress,
outputLimit,
table);
compressed.position(compressed.position() + written);
return written;
}
finally
{
returnTable(table);
}
}
}
}
Expand All @@ -99,7 +122,15 @@ public int rawCompress(Object input, int inputOffset, int inputByteLength, Objec
long outputAddress = ARRAY_BYTE_BASE_OFFSET + outputOffset;
long outputLimit = outputAddress + MAX_OUTPUT_LENGTH;

return SnappyRawCompressor.compress(input, inputAddress, inputLimit, output, outputAddress, outputLimit, table);
final short[] table = getTable();
try
{
return SnappyRawCompressor.compress(input, inputAddress, inputLimit, output, outputAddress, outputLimit, table);
}
finally
{
returnTable(table);
}
}

@Override
Expand Down Expand Up @@ -241,4 +272,38 @@ public void arrayCopy(Object src, int offset, int byteLength, Object dest, int d
{
System.arraycopy(src, offset, dest, dOffset, byteLength);
}

private static short[] getTable()
{
SoftReference<short[]> existingRef;
while((existingRef = CACHED_TABLES.poll()) != null)
{
short[] table = existingRef.get();
if (table != null)
{
//purge oldest entries have lost references
SoftReference<short[]> entry;
boolean lastEmpty = true;
while (lastEmpty && (entry = CACHED_TABLES.peekLast()) != null)
{
if (entry.get() == null)
{
CACHED_TABLES.removeLastOccurrence(entry);
}
else
{
lastEmpty = false;
}
}

return table;
}
}
return new short[SnappyRawCompressor.MAX_HASH_TABLE_SIZE];
}

private static void returnTable(short[] table)
{
CACHED_TABLES.addFirst(new SoftReference<short[]>(table));
}
}

0 comments on commit 110727e

Please sign in to comment.