diff --git a/src/main/java/org/xerial/snappy/PureJavaCrc32C.java b/src/main/java/org/xerial/snappy/PureJavaCrc32C.java index bed1f767..5991101a 100644 --- a/src/main/java/org/xerial/snappy/PureJavaCrc32C.java +++ b/src/main/java/org/xerial/snappy/PureJavaCrc32C.java @@ -48,8 +48,7 @@ public int getIntegerValue() /** {@inheritDoc} */ public long getValue() { - long ret = crc; - return (~ret) & 0xffffffffL; + return (~crc) & 0xffffffffL; } /** {@inheritDoc} */ diff --git a/src/main/java/org/xerial/snappy/SnappyFramed.java b/src/main/java/org/xerial/snappy/SnappyFramed.java index df8a82d5..a1de1479 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramed.java +++ b/src/main/java/org/xerial/snappy/SnappyFramed.java @@ -1,125 +1,165 @@ -/* - * Created: Apr 12, 2013 - */ -package org.xerial.snappy; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; - -/** - * Constants and utilities for implementing x-snappy-framed. - * - * @author Brett Okken - * @since 1.1.0 - */ -final class SnappyFramed -{ - public static final int COMPRESSED_DATA_FLAG = 0x00; - - public static final int UNCOMPRESSED_DATA_FLAG = 0x01; - - public static final int STREAM_IDENTIFIER_FLAG = 0xff; - - private static final int MASK_DELTA = 0xa282ead8; - - /** - * The header consists of the stream identifier flag, 3 bytes indicating a - * length of 6, and "sNaPpY" in ASCII. 
- */ - public static final byte[] HEADER_BYTES = new byte[] { - (byte) STREAM_IDENTIFIER_FLAG, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, - 0x50, 0x70, 0x59}; - - public static int maskedCrc32c(byte[] data) - { - return maskedCrc32c(data, 0, data.length); - } - - public static int maskedCrc32c(byte[] data, int offset, int length) - { - final PureJavaCrc32C crc32c = new PureJavaCrc32C(); - crc32c.update(data, offset, length); - return mask(crc32c.getIntegerValue()); - } - - /** - * Checksums are not stored directly, but masked, as checksumming data and - * then its own checksum can be problematic. The masking is the same as used - * in Apache Hadoop: Rotate the checksum by 15 bits, then add the constant - * 0xa282ead8 (using wraparound as normal for unsigned integers). This is - * equivalent to the following C code: - *

-     * <pre>
-     * uint32_t mask_checksum(uint32_t x) {
-     *     return ((x >> 15) | (x << 17)) + 0xa282ead8;
-     * }
-     * </pre>
- */ - public static int mask(int crc) - { - // Rotate right by 15 bits and add a constant. - return ((crc >>> 15) | (crc << 17)) + MASK_DELTA; - } - - static final int readBytes(ReadableByteChannel source, ByteBuffer dest) - throws IOException - { - // tells how many bytes to read. - final int expectedLength = dest.remaining(); - - int totalRead = 0; - - // how many bytes were read. - int lastRead = source.read(dest); - - totalRead = lastRead; - - // if we did not read as many bytes as we had hoped, try reading again. - if (lastRead < expectedLength) { - // as long the buffer is not full (remaining() == 0) and we have not reached EOF (lastRead == -1) keep reading. - while (dest.remaining() != 0 && lastRead != -1) { - lastRead = source.read(dest); - - // if we got EOF, do not add to total read. - if (lastRead != -1) { - totalRead += lastRead; - } - } - } - - if (totalRead > 0) { - dest.limit(dest.position()); - } - else { - dest.position(dest.limit()); - } - - return totalRead; - } - - static int skip(final ReadableByteChannel source, final int skip, final ByteBuffer buffer) - throws IOException - { - if (skip <= 0) { - return 0; - } - - int toSkip = skip; - int skipped = 0; - while (toSkip > 0 && skipped != -1) { - buffer.clear(); - if (toSkip < buffer.capacity()) { - buffer.limit(toSkip); - } - - skipped = source.read(buffer); - if (skipped > 0) { - toSkip -= skipped; - } - } - - buffer.clear(); - return skip - toSkip; - } -} +/* + * Created: Apr 12, 2013 + */ +package org.xerial.snappy; + +import java.io.IOException; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.zip.Checksum; + +/** + * Constants and utilities for implementing x-snappy-framed. 
+ * + * @author Brett Okken + * @since 1.1.0 + */ +final class SnappyFramed +{ + public static final int COMPRESSED_DATA_FLAG = 0x00; + + public static final int UNCOMPRESSED_DATA_FLAG = 0x01; + + public static final int STREAM_IDENTIFIER_FLAG = 0xff; + + private static final int MASK_DELTA = 0xa282ead8; + + private static final Supplier CHECKSUM_SUPPLIER; + + static + { + Supplier supplier = null; + try + { + final Class crc32cClazz = Class.forName("java.util.zip.CRC32C"); + final MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + + final MethodHandle conHandle = lookup.findConstructor(crc32cClazz, MethodType.methodType(void.class)) + .asType(MethodType.methodType(Checksum.class)); + supplier = () -> { + try + { + return (Checksum) conHandle.invokeExact(); + } + catch (Throwable e) + { + throw new IllegalStateException(e); + } + }; + } + catch(Throwable t) + { + Logger.getLogger(SnappyFramed.class.getName()) + .log(Level.FINE, "java.util.zip.CRC32C not loaded, using PureJavaCrc32C", t); + supplier = null; + } + + CHECKSUM_SUPPLIER = supplier != null ? supplier : PureJavaCrc32C::new; + } + + /** + * The header consists of the stream identifier flag, 3 bytes indicating a + * length of 6, and "sNaPpY" in ASCII. + */ + public static final byte[] HEADER_BYTES = new byte[] { + (byte) STREAM_IDENTIFIER_FLAG, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, + 0x50, 0x70, 0x59}; + + public static Checksum getCRC32C() + { + return CHECKSUM_SUPPLIER.get(); + } + + public static int maskedCrc32c(Checksum crc32c, byte[] data, int offset, int length) + { + crc32c.reset(); + crc32c.update(data, offset, length); + return mask((int) crc32c.getValue()); + } + + /** + * Checksums are not stored directly, but masked, as checksumming data and + * then its own checksum can be problematic. The masking is the same as used + * in Apache Hadoop: Rotate the checksum by 15 bits, then add the constant + * 0xa282ead8 (using wraparound as normal for unsigned integers). 
This is + * equivalent to the following C code: + *

+     * <pre>
+     * uint32_t mask_checksum(uint32_t x) {
+     *     return ((x >> 15) | (x << 17)) + 0xa282ead8;
+     * }
+     * </pre>
+ */ + public static int mask(int crc) + { + // Rotate right by 15 bits and add a constant. + return ((crc >>> 15) | (crc << 17)) + MASK_DELTA; + } + + static final int readBytes(ReadableByteChannel source, ByteBuffer dest) + throws IOException + { + // tells how many bytes to read. + final int expectedLength = dest.remaining(); + + int totalRead = 0; + + // how many bytes were read. + int lastRead = source.read(dest); + + totalRead = lastRead; + + // if we did not read as many bytes as we had hoped, try reading again. + if (lastRead < expectedLength) { + // as long the buffer is not full (remaining() == 0) and we have not reached EOF (lastRead == -1) keep reading. + while (dest.remaining() != 0 && lastRead != -1) { + lastRead = source.read(dest); + + // if we got EOF, do not add to total read. + if (lastRead != -1) { + totalRead += lastRead; + } + } + } + + if (totalRead > 0) { + dest.limit(dest.position()); + } + else { + dest.position(dest.limit()); + } + + return totalRead; + } + + static int skip(final ReadableByteChannel source, final int skip, final ByteBuffer buffer) + throws IOException + { + if (skip <= 0) { + return 0; + } + + int toSkip = skip; + int skipped = 0; + while (toSkip > 0 && skipped != -1) { + buffer.clear(); + if (toSkip < buffer.capacity()) { + buffer.limit(toSkip); + } + + skipped = source.read(buffer); + if (skipped > 0) { + toSkip -= skipped; + } + } + + buffer.clear(); + return skip - toSkip; + } +} diff --git a/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java b/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java index d96b866d..83b4c3ca 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java @@ -1,683 +1,685 @@ -/* - * Created: Apr 15, 2013 - */ -package org.xerial.snappy; - -import static java.lang.Math.min; -import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG; -import static 
org.xerial.snappy.SnappyFramed.HEADER_BYTES; -import static org.xerial.snappy.SnappyFramed.STREAM_IDENTIFIER_FLAG; -import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; -import static org.xerial.snappy.SnappyFramed.readBytes; -import static org.xerial.snappy.SnappyFramedOutputStream.MAX_BLOCK_SIZE; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.util.Arrays; - -import org.xerial.snappy.pool.BufferPool; -import org.xerial.snappy.pool.DefaultPoolFactory; - -/** - * Implements the x-snappy-framed as an {@link InputStream} and - * {@link ReadableByteChannel}. - * - * @author Brett Okken - * @since 1.1.0 - */ -public final class SnappyFramedInputStream - extends InputStream - implements - ReadableByteChannel -{ - - private final ReadableByteChannel rbc; - private final ByteBuffer frameHeader; - private final boolean verifyChecksums; - private final BufferPool bufferPool; - - /** - * A single frame read from the underlying {@link InputStream}. - */ - private ByteBuffer input; - - /** - * The decompressed data from {@link #input}. - */ - private ByteBuffer uncompressedDirect; - - /** - * Indicates if this instance has been closed. - */ - private boolean closed; - - /** - * Indicates if we have reached the EOF on {@link #input}. - */ - private boolean eof; - - /** - * The position in {@link #input} buffer to read to. - */ - private int valid; - - /** - * The next position to read from {@link #buffer}. - */ - private int position; - - /** - * Buffer contains a copy of the uncompressed data for the block. - */ - private byte[] buffer; - - /** - * Creates a Snappy input stream to read data from the specified underlying - * input stream. - *

- * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. - *

- * - * @param in the underlying input stream. Must not be {@code null}. - * @throws IOException - */ - public SnappyFramedInputStream(InputStream in) - throws IOException - { - this(in, true, DefaultPoolFactory.getDefaultPool()); - } - - /** - * Creates a Snappy input stream to read data from the specified underlying - * input stream. - * - * @param in the underlying input stream. Must not be {@code null}. - * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. - * @throws IOException - */ - public SnappyFramedInputStream(InputStream in, BufferPool bufferPool) - throws IOException - { - this(in, true, bufferPool); - } - - /** - * Creates a Snappy input stream to read data from the specified underlying - * input stream. - *

- * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. - *

- * - * @param in the underlying input stream. Must not be {@code null}. - * @param verifyChecksums if true, checksums in input stream will be verified - * @throws IOException - */ - public SnappyFramedInputStream(InputStream in, boolean verifyChecksums) - throws IOException - { - this(in, verifyChecksums, DefaultPoolFactory.getDefaultPool()); - } - - /** - * Creates a Snappy input stream to read data from the specified underlying - * input stream. - * - * @param in the underlying input stream. Must not be {@code null}. - * @param verifyChecksums if true, checksums in input stream will be verified - * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. - * @throws IOException - */ - public SnappyFramedInputStream(InputStream in, boolean verifyChecksums, - BufferPool bufferPool) - throws IOException - { - this(Channels.newChannel(in), verifyChecksums, bufferPool); - } - - /** - * Creates a Snappy input stream to read data from the specified underlying - * channel. - * - * @param in the underlying readable channel. Must not be {@code null}. - * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. - * @throws IOException - */ - public SnappyFramedInputStream(ReadableByteChannel in, BufferPool bufferPool) - throws IOException - { - this(in, true, bufferPool); - } - - /** - * Creates a Snappy input stream to read data from the specified underlying - * channel. - *

- * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. - *

- * - * @param in the underlying readable channel. Must not be {@code null}. - * @throws IOException - */ - public SnappyFramedInputStream(ReadableByteChannel in) - throws IOException - { - this(in, true); - } - - /** - * Creates a Snappy input stream to read data from the specified underlying - * channel. - *

- * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. - *

- * - * @param in the underlying readable channel. Must not be {@code null}. - * @param verifyChecksums if true, checksums in input stream will be verified - * @throws IOException - */ - public SnappyFramedInputStream(ReadableByteChannel in, - boolean verifyChecksums) - throws IOException - { - this(in, verifyChecksums, DefaultPoolFactory.getDefaultPool()); - } - - /** - * Creates a Snappy input stream to read data from the specified underlying - * channel. - * - * @param in the underlying readable channel. Must not be {@code null}. - * @param verifyChecksums if true, checksums in input stream will be verified - * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. - * @throws IOException - */ - public SnappyFramedInputStream(ReadableByteChannel in, - boolean verifyChecksums, BufferPool bufferPool) - throws IOException - { - if (in == null) { - throw new NullPointerException("in is null"); - } - - if (bufferPool == null) { - throw new NullPointerException("bufferPool is null"); - } - - this.bufferPool = bufferPool; - this.rbc = in; - this.verifyChecksums = verifyChecksums; - - allocateBuffersBasedOnSize(MAX_BLOCK_SIZE + 5); - this.frameHeader = ByteBuffer.allocate(4); - - // stream must begin with stream header - final byte[] expectedHeader = HEADER_BYTES; - final byte[] actualHeader = new byte[expectedHeader.length]; - final ByteBuffer actualBuffer = ByteBuffer.wrap(actualHeader); - - final int read = SnappyFramed.readBytes(in, actualBuffer); - if (read < expectedHeader.length) { - throw new EOFException( - "encountered EOF while reading stream header"); - } - if (!Arrays.equals(expectedHeader, actualHeader)) { - throw new IOException("invalid stream header"); - } - } - - /** - * @param size - */ - private void allocateBuffersBasedOnSize(int size) - { - if (input != null) { - bufferPool.releaseDirect(input); - } - - if (uncompressedDirect != null) { - bufferPool.releaseDirect(uncompressedDirect); - } - - if (buffer != null) { - 
bufferPool.releaseArray(buffer); - } - - input = bufferPool.allocateDirect(size); - final int maxCompressedLength = Snappy.maxCompressedLength(size); - uncompressedDirect = bufferPool.allocateDirect(maxCompressedLength); - buffer = bufferPool.allocateArray(maxCompressedLength); - } - - @Override - public int read() - throws IOException - { - if (closed) { - return -1; - } - if (!ensureBuffer()) { - return -1; - } - return buffer[position++] & 0xFF; - } - - @Override - public int read(byte[] output, int offset, int length) - throws IOException - { - - if (output == null) { - throw new IllegalArgumentException("output is null"); - } - - if (offset < 0 || length < 0 || offset + length > output.length) { - throw new IllegalArgumentException("invalid offset [" + offset - + "] and length [" + length + ']'); - } - - if (closed) { - throw new ClosedChannelException(); - } - - if (length == 0) { - return 0; - } - if (!ensureBuffer()) { - return -1; - } - - final int size = min(length, available()); - System.arraycopy(buffer, position, output, offset, size); - position += size; - return size; - } - - @Override - public int available() - throws IOException - { - if (closed) { - return 0; - } - return valid - position; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isOpen() - { - return !closed; - } - - /** - * {@inheritDoc} - */ - @Override - public int read(ByteBuffer dst) - throws IOException - { - - if (dst == null) { - throw new IllegalArgumentException("dst is null"); - } - - if (closed) { - throw new ClosedChannelException(); - } - - if (dst.remaining() == 0) { - return 0; - } - if (!ensureBuffer()) { - return -1; - } - - final int size = min(dst.remaining(), available()); - dst.put(buffer, position, size); - position += size; - return size; - } - - /** - * Transfers the entire content of this {@link InputStream} to os. - * This potentially limits the amount of buffering required to decompress - * content. - *

- * Unlike {@link #read(byte[], int, int)}, this method does not need to be - * called multiple times. A single call will transfer all available content. - * Any calls after the source has been exhausted will result in a return - * value of {@code 0}. - *

- * - * @param os The destination to write decompressed content to. - * @return The number of bytes transferred. - * @throws IOException - * @since 1.1.1 - */ - public long transferTo(OutputStream os) - throws IOException - { - if (os == null) { - throw new IllegalArgumentException("os is null"); - } - - if (closed) { - throw new ClosedChannelException(); - } - - long totTransfered = 0; - - while (ensureBuffer()) { - final int available = available(); - os.write(buffer, position, available); - position += available; - totTransfered += available; - } - - return totTransfered; - } - - /** - * Transfers the entire content of this {@link ReadableByteChannel} to - * wbc. This potentially limits the amount of buffering required to - * decompress content. - *

- *

- * Unlike {@link #read(ByteBuffer)}, this method does not need to be called - * multiple times. A single call will transfer all available content. Any - * calls after the source has been exhausted will result in a return value - * of {@code 0}. - *

- * - * @param wbc The destination to write decompressed content to. - * @return The number of bytes transferred. - * @throws IOException - * @since 1.1.1 - */ - public long transferTo(WritableByteChannel wbc) - throws IOException - { - if (wbc == null) { - throw new IllegalArgumentException("wbc is null"); - } - - if (closed) { - throw new ClosedChannelException(); - } - - final ByteBuffer bb = ByteBuffer.wrap(buffer); - - long totTransfered = 0; - - while (ensureBuffer()) { - bb.clear(); - bb.position(position); - bb.limit(position + available()); - - wbc.write(bb); - - final int written = bb.position() - position; - position += written; - - totTransfered += written; - } - - return totTransfered; - } - - @Override - public void close() - throws IOException - { - try { - rbc.close(); - } - finally { - if (!closed) { - closed = true; - - if (input != null) { - bufferPool.releaseDirect(input); - input = null; - } - - if (uncompressedDirect != null) { - bufferPool.releaseDirect(uncompressedDirect); - uncompressedDirect = null; - } - - if (buffer != null) { - bufferPool.releaseArray(buffer); - buffer = null; - } - } - } - } - - static enum FrameAction - { - RAW, SKIP, UNCOMPRESS; - } - - public static final class FrameMetaData - { - final int length; - final FrameAction frameAction; - - /** - * @param frameAction - * @param length - */ - public FrameMetaData(FrameAction frameAction, int length) - { - super(); - this.frameAction = frameAction; - this.length = length; - } - } - - public static final class FrameData - { - final int checkSum; - final int offset; - - /** - * @param checkSum - * @param offset - */ - public FrameData(int checkSum, int offset) - { - super(); - this.checkSum = checkSum; - this.offset = offset; - } - } - - private boolean ensureBuffer() - throws IOException - { - if (available() > 0) { - return true; - } - if (eof) { - return false; - } - - if (!readBlockHeader()) { - eof = true; - return false; - } - - // get action based on header - final 
FrameMetaData frameMetaData = getFrameMetaData(frameHeader); - - if (FrameAction.SKIP == frameMetaData.frameAction) { - SnappyFramed.skip(rbc, frameMetaData.length, - ByteBuffer.wrap(buffer)); - return ensureBuffer(); - } - - if (frameMetaData.length > input.capacity()) { - allocateBuffersBasedOnSize(frameMetaData.length); - } - - input.clear(); - input.limit(frameMetaData.length); - - final int actualRead = readBytes(rbc, input); - if (actualRead != frameMetaData.length) { - throw new EOFException("unexpectd EOF when reading frame"); - } - input.flip(); - - final FrameData frameData = getFrameData(input); - - if (FrameAction.UNCOMPRESS == frameMetaData.frameAction) { - - input.position(frameData.offset); - - final int uncompressedLength = Snappy.uncompressedLength(input); - - if (uncompressedLength > uncompressedDirect.capacity()) { - bufferPool.releaseDirect(uncompressedDirect); - bufferPool.releaseArray(buffer); - uncompressedDirect = bufferPool.allocateDirect(uncompressedLength); - buffer = bufferPool.allocateArray(uncompressedLength); - } - - uncompressedDirect.clear(); - - this.valid = Snappy.uncompress(input, uncompressedDirect); - - uncompressedDirect.get(buffer, 0, valid); - this.position = 0; - } - else { - // we need to start reading at the offset - input.position(frameData.offset); - this.position = 0; - this.valid = input.remaining(); - this.input.get(buffer, 0, input.remaining()); - } - - if (verifyChecksums) { - final int actualCrc32c = SnappyFramed.maskedCrc32c(buffer, - position, valid - position); - if (frameData.checkSum != actualCrc32c) { - throw new IOException("Corrupt input: invalid checksum"); - } - } - - return true; - } - - private boolean readBlockHeader() - throws IOException - { - frameHeader.clear(); - int read = readBytes(rbc, frameHeader); - - if (read == -1) { - return false; - } - - if (read < frameHeader.capacity()) { - throw new EOFException("encountered EOF while reading block header"); - } - frameHeader.flip(); - - return true; 
- } - - /** - * @param frameHeader - * @return - * @throws IOException - */ - private FrameMetaData getFrameMetaData(ByteBuffer frameHeader) - throws IOException - { - - assert frameHeader.hasArray(); - - final byte[] frameHeaderArray = frameHeader.array(); - - int length = (frameHeaderArray[1] & 0xFF); - length |= (frameHeaderArray[2] & 0xFF) << 8; - length |= (frameHeaderArray[3] & 0xFF) << 16; - - int minLength = 0; - final FrameAction frameAction; - final int flag = frameHeaderArray[0] & 0xFF; - switch (flag) { - case COMPRESSED_DATA_FLAG: - frameAction = FrameAction.UNCOMPRESS; - minLength = 5; - break; - case UNCOMPRESSED_DATA_FLAG: - frameAction = FrameAction.RAW; - minLength = 5; - break; - case STREAM_IDENTIFIER_FLAG: - if (length != 6) { - throw new IOException( - "stream identifier chunk with invalid length: " - + length); - } - frameAction = FrameAction.SKIP; - minLength = 6; - break; - default: - // Reserved unskippable chunks (chunk types 0x02-0x7f) - if (flag <= 0x7f) { - throw new IOException("unsupported unskippable chunk: " - + Integer.toHexString(flag)); - } - - // all that is left is Reserved skippable chunks (chunk types - // 0x80-0xfe) - frameAction = FrameAction.SKIP; - minLength = 0; - } - - if (length < minLength) { - throw new IOException("invalid length: " + length - + " for chunk flag: " + Integer.toHexString(flag)); - } - - return new FrameMetaData(frameAction, length); - } - - /** - * @param content - * @return - * @throws IOException - */ - private FrameData getFrameData(ByteBuffer content) - throws IOException - { - return new FrameData(getCrc32c(content), 4); - } - - private int getCrc32c(ByteBuffer content) - { - - final int position = content.position(); - - return ((content.get(position + 3) & 0xFF) << 24) - | ((content.get(position + 2) & 0xFF) << 16) - | ((content.get(position + 1) & 0xFF) << 8) - | (content.get(position) & 0xFF); - } -} +/* + * Created: Apr 15, 2013 + */ +package org.xerial.snappy; + +import static 
java.lang.Math.min; +import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG; +import static org.xerial.snappy.SnappyFramed.HEADER_BYTES; +import static org.xerial.snappy.SnappyFramed.STREAM_IDENTIFIER_FLAG; +import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; +import static org.xerial.snappy.SnappyFramed.readBytes; +import static org.xerial.snappy.SnappyFramedOutputStream.MAX_BLOCK_SIZE; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Arrays; +import java.util.zip.Checksum; + +import org.xerial.snappy.pool.BufferPool; +import org.xerial.snappy.pool.DefaultPoolFactory; + +/** + * Implements the x-snappy-framed as an {@link InputStream} and + * {@link ReadableByteChannel}. + * + * @author Brett Okken + * @since 1.1.0 + */ +public final class SnappyFramedInputStream + extends InputStream + implements + ReadableByteChannel +{ + + private final Checksum crc32 = SnappyFramed.getCRC32C(); + private final ReadableByteChannel rbc; + private final ByteBuffer frameHeader; + private final boolean verifyChecksums; + private final BufferPool bufferPool; + + /** + * A single frame read from the underlying {@link InputStream}. + */ + private ByteBuffer input; + + /** + * The decompressed data from {@link #input}. + */ + private ByteBuffer uncompressedDirect; + + /** + * Indicates if this instance has been closed. + */ + private boolean closed; + + /** + * Indicates if we have reached the EOF on {@link #input}. + */ + private boolean eof; + + /** + * The position in {@link #input} buffer to read to. + */ + private int valid; + + /** + * The next position to read from {@link #buffer}. 
+ */ + private int position; + + /** + * Buffer contains a copy of the uncompressed data for the block. + */ + private byte[] buffer; + + /** + * Creates a Snappy input stream to read data from the specified underlying + * input stream. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param in the underlying input stream. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedInputStream(InputStream in) + throws IOException + { + this(in, true, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a Snappy input stream to read data from the specified underlying + * input stream. + * + * @param in the underlying input stream. Must not be {@code null}. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedInputStream(InputStream in, BufferPool bufferPool) + throws IOException + { + this(in, true, bufferPool); + } + + /** + * Creates a Snappy input stream to read data from the specified underlying + * input stream. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param in the underlying input stream. Must not be {@code null}. + * @param verifyChecksums if true, checksums in input stream will be verified + * @throws IOException + */ + public SnappyFramedInputStream(InputStream in, boolean verifyChecksums) + throws IOException + { + this(in, verifyChecksums, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a Snappy input stream to read data from the specified underlying + * input stream. + * + * @param in the underlying input stream. Must not be {@code null}. + * @param verifyChecksums if true, checksums in input stream will be verified + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedInputStream(InputStream in, boolean verifyChecksums, + BufferPool bufferPool) + throws IOException + { + this(Channels.newChannel(in), verifyChecksums, bufferPool); + } + + /** + * Creates a Snappy input stream to read data from the specified underlying + * channel. + * + * @param in the underlying readable channel. Must not be {@code null}. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedInputStream(ReadableByteChannel in, BufferPool bufferPool) + throws IOException + { + this(in, true, bufferPool); + } + + /** + * Creates a Snappy input stream to read data from the specified underlying + * channel. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param in the underlying readable channel. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedInputStream(ReadableByteChannel in) + throws IOException + { + this(in, true); + } + + /** + * Creates a Snappy input stream to read data from the specified underlying + * channel. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param in the underlying readable channel. Must not be {@code null}. + * @param verifyChecksums if true, checksums in input stream will be verified + * @throws IOException + */ + public SnappyFramedInputStream(ReadableByteChannel in, + boolean verifyChecksums) + throws IOException + { + this(in, verifyChecksums, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a Snappy input stream to read data from the specified underlying + * channel. + * + * @param in the underlying readable channel. Must not be {@code null}. + * @param verifyChecksums if true, checksums in input stream will be verified + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedInputStream(ReadableByteChannel in, + boolean verifyChecksums, BufferPool bufferPool) + throws IOException + { + if (in == null) { + throw new NullPointerException("in is null"); + } + + if (bufferPool == null) { + throw new NullPointerException("bufferPool is null"); + } + + this.bufferPool = bufferPool; + this.rbc = in; + this.verifyChecksums = verifyChecksums; + + allocateBuffersBasedOnSize(MAX_BLOCK_SIZE + 5); + this.frameHeader = ByteBuffer.allocate(4); + + // stream must begin with stream header + final byte[] expectedHeader = HEADER_BYTES; + final byte[] actualHeader = new byte[expectedHeader.length]; + final ByteBuffer actualBuffer = ByteBuffer.wrap(actualHeader); + + final int read = SnappyFramed.readBytes(in, actualBuffer); + if (read < expectedHeader.length) { + throw new EOFException( + "encountered EOF while reading stream header"); + } + if (!Arrays.equals(expectedHeader, actualHeader)) { + throw new IOException("invalid stream header"); + } + } + + /** + * @param size + */ + private void allocateBuffersBasedOnSize(int size) + { + if (input != null) { + bufferPool.releaseDirect(input); + } + + if (uncompressedDirect != null) { + bufferPool.releaseDirect(uncompressedDirect); + } + + if (buffer != null) { + 
bufferPool.releaseArray(buffer); + } + + input = bufferPool.allocateDirect(size); + final int maxCompressedLength = Snappy.maxCompressedLength(size); + uncompressedDirect = bufferPool.allocateDirect(maxCompressedLength); + buffer = bufferPool.allocateArray(maxCompressedLength); + } + + @Override + public int read() + throws IOException + { + if (closed) { + return -1; + } + if (!ensureBuffer()) { + return -1; + } + return buffer[position++] & 0xFF; + } + + @Override + public int read(byte[] output, int offset, int length) + throws IOException + { + + if (output == null) { + throw new IllegalArgumentException("output is null"); + } + + if (offset < 0 || length < 0 || offset + length > output.length) { + throw new IllegalArgumentException("invalid offset [" + offset + + "] and length [" + length + ']'); + } + + if (closed) { + throw new ClosedChannelException(); + } + + if (length == 0) { + return 0; + } + if (!ensureBuffer()) { + return -1; + } + + final int size = min(length, available()); + System.arraycopy(buffer, position, output, offset, size); + position += size; + return size; + } + + @Override + public int available() + throws IOException + { + if (closed) { + return 0; + } + return valid - position; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isOpen() + { + return !closed; + } + + /** + * {@inheritDoc} + */ + @Override + public int read(ByteBuffer dst) + throws IOException + { + + if (dst == null) { + throw new IllegalArgumentException("dst is null"); + } + + if (closed) { + throw new ClosedChannelException(); + } + + if (dst.remaining() == 0) { + return 0; + } + if (!ensureBuffer()) { + return -1; + } + + final int size = min(dst.remaining(), available()); + dst.put(buffer, position, size); + position += size; + return size; + } + + /** + * Transfers the entire content of this {@link InputStream} to os. + * This potentially limits the amount of buffering required to decompress + * content. + *

+ * Unlike {@link #read(byte[], int, int)}, this method does not need to be + * called multiple times. A single call will transfer all available content. + * Any calls after the source has been exhausted will result in a return + * value of {@code 0}. + *

+ * + * @param os The destination to write decompressed content to. + * @return The number of bytes transferred. + * @throws IOException + * @since 1.1.1 + */ + public long transferTo(OutputStream os) + throws IOException + { + if (os == null) { + throw new IllegalArgumentException("os is null"); + } + + if (closed) { + throw new ClosedChannelException(); + } + + long totTransfered = 0; + + while (ensureBuffer()) { + final int available = available(); + os.write(buffer, position, available); + position += available; + totTransfered += available; + } + + return totTransfered; + } + + /** + * Transfers the entire content of this {@link ReadableByteChannel} to + * wbc. This potentially limits the amount of buffering required to + * decompress content. + *

+ *

+ * Unlike {@link #read(ByteBuffer)}, this method does not need to be called + * multiple times. A single call will transfer all available content. Any + * calls after the source has been exhausted will result in a return value + * of {@code 0}. + *

+ * + * @param wbc The destination to write decompressed content to. + * @return The number of bytes transferred. + * @throws IOException + * @since 1.1.1 + */ + public long transferTo(WritableByteChannel wbc) + throws IOException + { + if (wbc == null) { + throw new IllegalArgumentException("wbc is null"); + } + + if (closed) { + throw new ClosedChannelException(); + } + + final ByteBuffer bb = ByteBuffer.wrap(buffer); + + long totTransfered = 0; + + while (ensureBuffer()) { + bb.clear(); + bb.position(position); + bb.limit(position + available()); + + wbc.write(bb); + + final int written = bb.position() - position; + position += written; + + totTransfered += written; + } + + return totTransfered; + } + + @Override + public void close() + throws IOException + { + try { + rbc.close(); + } + finally { + if (!closed) { + closed = true; + + if (input != null) { + bufferPool.releaseDirect(input); + input = null; + } + + if (uncompressedDirect != null) { + bufferPool.releaseDirect(uncompressedDirect); + uncompressedDirect = null; + } + + if (buffer != null) { + bufferPool.releaseArray(buffer); + buffer = null; + } + } + } + } + + static enum FrameAction + { + RAW, SKIP, UNCOMPRESS; + } + + public static final class FrameMetaData + { + final int length; + final FrameAction frameAction; + + /** + * @param frameAction + * @param length + */ + public FrameMetaData(FrameAction frameAction, int length) + { + super(); + this.frameAction = frameAction; + this.length = length; + } + } + + public static final class FrameData + { + final int checkSum; + final int offset; + + /** + * @param checkSum + * @param offset + */ + public FrameData(int checkSum, int offset) + { + super(); + this.checkSum = checkSum; + this.offset = offset; + } + } + + private boolean ensureBuffer() + throws IOException + { + if (available() > 0) { + return true; + } + if (eof) { + return false; + } + + if (!readBlockHeader()) { + eof = true; + return false; + } + + // get action based on header + final 
FrameMetaData frameMetaData = getFrameMetaData(frameHeader); + + if (FrameAction.SKIP == frameMetaData.frameAction) { + SnappyFramed.skip(rbc, frameMetaData.length, + ByteBuffer.wrap(buffer)); + return ensureBuffer(); + } + + if (frameMetaData.length > input.capacity()) { + allocateBuffersBasedOnSize(frameMetaData.length); + } + + input.clear(); + input.limit(frameMetaData.length); + + final int actualRead = readBytes(rbc, input); + if (actualRead != frameMetaData.length) { + throw new EOFException("unexpectd EOF when reading frame"); + } + input.flip(); + + final FrameData frameData = getFrameData(input); + + if (FrameAction.UNCOMPRESS == frameMetaData.frameAction) { + + input.position(frameData.offset); + + final int uncompressedLength = Snappy.uncompressedLength(input); + + if (uncompressedLength > uncompressedDirect.capacity()) { + bufferPool.releaseDirect(uncompressedDirect); + bufferPool.releaseArray(buffer); + uncompressedDirect = bufferPool.allocateDirect(uncompressedLength); + buffer = bufferPool.allocateArray(uncompressedLength); + } + + uncompressedDirect.clear(); + + this.valid = Snappy.uncompress(input, uncompressedDirect); + + uncompressedDirect.get(buffer, 0, valid); + this.position = 0; + } + else { + // we need to start reading at the offset + input.position(frameData.offset); + this.position = 0; + this.valid = input.remaining(); + this.input.get(buffer, 0, input.remaining()); + } + + if (verifyChecksums) { + final int actualCrc32c = SnappyFramed.maskedCrc32c(crc32, buffer, + position, valid - position); + if (frameData.checkSum != actualCrc32c) { + throw new IOException("Corrupt input: invalid checksum"); + } + } + + return true; + } + + private boolean readBlockHeader() + throws IOException + { + frameHeader.clear(); + int read = readBytes(rbc, frameHeader); + + if (read == -1) { + return false; + } + + if (read < frameHeader.capacity()) { + throw new EOFException("encountered EOF while reading block header"); + } + frameHeader.flip(); + + 
return true; + } + + /** + * @param frameHeader + * @return + * @throws IOException + */ + private FrameMetaData getFrameMetaData(ByteBuffer frameHeader) + throws IOException + { + + assert frameHeader.hasArray(); + + final byte[] frameHeaderArray = frameHeader.array(); + + int length = (frameHeaderArray[1] & 0xFF); + length |= (frameHeaderArray[2] & 0xFF) << 8; + length |= (frameHeaderArray[3] & 0xFF) << 16; + + int minLength = 0; + final FrameAction frameAction; + final int flag = frameHeaderArray[0] & 0xFF; + switch (flag) { + case COMPRESSED_DATA_FLAG: + frameAction = FrameAction.UNCOMPRESS; + minLength = 5; + break; + case UNCOMPRESSED_DATA_FLAG: + frameAction = FrameAction.RAW; + minLength = 5; + break; + case STREAM_IDENTIFIER_FLAG: + if (length != 6) { + throw new IOException( + "stream identifier chunk with invalid length: " + + length); + } + frameAction = FrameAction.SKIP; + minLength = 6; + break; + default: + // Reserved unskippable chunks (chunk types 0x02-0x7f) + if (flag <= 0x7f) { + throw new IOException("unsupported unskippable chunk: " + + Integer.toHexString(flag)); + } + + // all that is left is Reserved skippable chunks (chunk types + // 0x80-0xfe) + frameAction = FrameAction.SKIP; + minLength = 0; + } + + if (length < minLength) { + throw new IOException("invalid length: " + length + + " for chunk flag: " + Integer.toHexString(flag)); + } + + return new FrameMetaData(frameAction, length); + } + + /** + * @param content + * @return + * @throws IOException + */ + private FrameData getFrameData(ByteBuffer content) + throws IOException + { + return new FrameData(getCrc32c(content), 4); + } + + private int getCrc32c(ByteBuffer content) + { + + final int position = content.position(); + + return ((content.get(position + 3) & 0xFF) << 24) + | ((content.get(position + 2) & 0xFF) << 16) + | ((content.get(position + 1) & 0xFF) << 8) + | (content.get(position) & 0xFF); + } +} diff --git a/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java 
b/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java index 6797b6c4..f35c1afb 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java @@ -1,568 +1,570 @@ -/* - * Created: Apr 12, 2013 - */ -package org.xerial.snappy; - -import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG; -import static org.xerial.snappy.SnappyFramed.HEADER_BYTES; -import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; -import static org.xerial.snappy.SnappyFramed.maskedCrc32c; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.channels.Channels; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; - -import org.xerial.snappy.pool.BufferPool; -import org.xerial.snappy.pool.DefaultPoolFactory; - -/** - * Implements the x-snappy-framed as an {@link OutputStream} and - * {@link WritableByteChannel}. - * - * @author Brett Okken - * @since 1.1.0 - */ -public final class SnappyFramedOutputStream - extends OutputStream - implements - WritableByteChannel -{ - - /** - * The x-snappy-framed specification allows for a chunk size up to - * 16,777,211 bytes in length. However, it also goes on to state: - *

- * - * We place an additional restriction that the uncompressed data in a chunk - * must be no longer than 65536 bytes. This allows consumers to easily use - * small fixed-size buffers. - * - *

- */ - public static final int MAX_BLOCK_SIZE = 64 * 1024; - - /** - * The default block size to use. - */ - public static final int DEFAULT_BLOCK_SIZE = MAX_BLOCK_SIZE; - - /** - * The default min compression ratio to use. - */ - public static final double DEFAULT_MIN_COMPRESSION_RATIO = 0.85d; - - private final ByteBuffer headerBuffer = ByteBuffer.allocate(8).order( - ByteOrder.LITTLE_ENDIAN); - private final BufferPool bufferPool; - private final int blockSize; - private final ByteBuffer buffer; - private final ByteBuffer directInputBuffer; - private final ByteBuffer outputBuffer; - private final double minCompressionRatio; - - private final WritableByteChannel out; - - // private int position; - private boolean closed; - - /** - * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE} - * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. - *

- * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. - *

- * - * @param out The underlying {@link OutputStream} to write to. Must not be - * {@code null}. - * @throws IOException - */ - public SnappyFramedOutputStream(OutputStream out) - throws IOException - { - this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool()); - } - - /** - * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE} - * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. - * - * @param out The underlying {@link OutputStream} to write to. Must not be - * {@code null}. - * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. - * @throws IOException - */ - public SnappyFramedOutputStream(OutputStream out, BufferPool bufferPool) - throws IOException - { - this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool); - } - - /** - * Creates a new {@link SnappyFramedOutputStream} instance. - *

- * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. - *

- * - * @param out The underlying {@link OutputStream} to write to. Must not be - * {@code null}. - * @param blockSize The block size (of raw data) to compress before writing frames - * to out. Must be in (0, 65536]. - * @param minCompressionRatio Defines the minimum compression ratio ( - * {@code compressedLength / rawLength}) that must be achieved to - * write the compressed data. This must be in (0, 1.0]. - * @throws IOException - */ - public SnappyFramedOutputStream(OutputStream out, int blockSize, - double minCompressionRatio) - throws IOException - { - this(Channels.newChannel(out), blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool()); - } - - /** - * Creates a new {@link SnappyFramedOutputStream} instance. - * - * @param out The underlying {@link OutputStream} to write to. Must not be - * {@code null}. - * @param blockSize The block size (of raw data) to compress before writing frames - * to out. Must be in (0, 65536]. - * @param minCompressionRatio Defines the minimum compression ratio ( - * {@code compressedLength / rawLength}) that must be achieved to - * write the compressed data. This must be in (0, 1.0]. - * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. - * @throws IOException - */ - public SnappyFramedOutputStream(OutputStream out, int blockSize, - double minCompressionRatio, BufferPool bufferPool) - throws IOException - { - this(Channels.newChannel(out), blockSize, minCompressionRatio, bufferPool); - } - - /** - * Creates a new {@link SnappyFramedOutputStream} using the - * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. - *

- * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. - *

- * - * @param out The underlying {@link WritableByteChannel} to write to. Must - * not be {@code null}. - * @throws IOException - * @since 1.1.1 - */ - public SnappyFramedOutputStream(WritableByteChannel out) - throws IOException - { - this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool()); - } - - /** - * Creates a new {@link SnappyFramedOutputStream} using the - * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. - *

- * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. - *

- * - * @param out The underlying {@link WritableByteChannel} to write to. Must - * not be {@code null}. - * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. - * @throws IOException - */ - public SnappyFramedOutputStream(WritableByteChannel out, BufferPool bufferPool) - throws IOException - { - this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool); - } - - /** - * Creates a new {@link SnappyFramedOutputStream} instance. - * - * @param out The underlying {@link WritableByteChannel} to write to. Must - * not be {@code null}. - * @param blockSize The block size (of raw data) to compress before writing frames - * to out. Must be in (0, 65536]. - * @param minCompressionRatio Defines the minimum compression ratio ( - * {@code compressedLength / rawLength}) that must be achieved to - * write the compressed data. This must be in (0, 1.0]. - * @throws IOException - * @since 1.1.1 - */ - public SnappyFramedOutputStream(WritableByteChannel out, int blockSize, - double minCompressionRatio) - throws IOException - { - this(out, blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool()); - } - - /** - * Creates a new {@link SnappyFramedOutputStream} instance. - * - * @param out The underlying {@link WritableByteChannel} to write to. Must - * not be {@code null}. - * @param blockSize The block size (of raw data) to compress before writing frames - * to out. Must be in (0, 65536]. - * @param minCompressionRatio Defines the minimum compression ratio ( - * {@code compressedLength / rawLength}) that must be achieved to - * write the compressed data. This must be in (0, 1.0]. - * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. 
- * @throws IOException - * @since 1.1.1 - */ - public SnappyFramedOutputStream(WritableByteChannel out, int blockSize, - double minCompressionRatio, BufferPool bufferPool) - throws IOException - { - if (out == null) { - throw new NullPointerException("out is null"); - } - - if (bufferPool == null) { - throw new NullPointerException("buffer pool is null"); - } - - if (minCompressionRatio <= 0 || minCompressionRatio > 1.0) { - throw new IllegalArgumentException("minCompressionRatio " - + minCompressionRatio + " must be in (0,1.0]"); - } - - if (blockSize <= 0 || blockSize > MAX_BLOCK_SIZE) { - throw new IllegalArgumentException("block size " + blockSize - + " must be in (0, 65536]"); - } - this.blockSize = blockSize; - this.out = out; - this.minCompressionRatio = minCompressionRatio; - - this.bufferPool = bufferPool; - buffer = ByteBuffer.wrap(bufferPool.allocateArray(blockSize), 0, blockSize); - directInputBuffer = bufferPool.allocateDirect(blockSize); - outputBuffer = bufferPool.allocateDirect(Snappy - .maxCompressedLength(blockSize)); - - writeHeader(out); - } - - /** - * Writes the implementation specific header or "marker bytes" to - * out. - * - * @param out The underlying {@link OutputStream}. 
- * @throws IOException - */ - private void writeHeader(WritableByteChannel out) - throws IOException - { - out.write(ByteBuffer.wrap(HEADER_BYTES)); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isOpen() - { - return !closed; - } - - @Override - public void write(int b) - throws IOException - { - if (closed) { - throw new IOException("Stream is closed"); - } - if (buffer.remaining() <= 0) { - flushBuffer(); - } - buffer.put((byte) b); - } - - @Override - public void write(byte[] input, int offset, int length) - throws IOException - { - if (closed) { - throw new IOException("Stream is closed"); - } - - if (input == null) { - throw new NullPointerException(); - } - else if ((offset < 0) || (offset > input.length) || (length < 0) - || ((offset + length) > input.length) - || ((offset + length) < 0)) { - throw new IndexOutOfBoundsException(); - } - - while (length > 0) { - if (buffer.remaining() <= 0) { - flushBuffer(); - } - - final int toPut = Math.min(length, buffer.remaining()); - buffer.put(input, offset, toPut); - offset += toPut; - length -= toPut; - } - } - - /** - * {@inheritDoc} - */ - @Override - public int write(ByteBuffer src) - throws IOException - { - if (closed) { - throw new ClosedChannelException(); - } - - if (buffer.remaining() <= 0) { - flushBuffer(); - } - - final int srcLength = src.remaining(); - - // easy case: enough free space in buffer for entire input - if (buffer.remaining() >= src.remaining()) { - buffer.put(src); - return srcLength; - } - - // store current limit - final int srcEnd = src.position() + src.remaining(); - - while ((src.position() + buffer.remaining()) <= srcEnd) { - // fill partial buffer as much as possible and flush - src.limit(src.position() + buffer.remaining()); - buffer.put(src); - flushBuffer(); - } - - // reset original limit - src.limit(srcEnd); - - // copy remaining partial block into now-empty buffer - buffer.put(src); - - return srcLength; - } - - /** - * Transfers all the content from is to 
this {@link OutputStream}. - * This potentially limits the amount of buffering required to compress - * content. - * - * @param is The source of data to compress. - * @return The number of bytes read from is. - * @throws IOException - * @since 1.1.1 - */ - public long transferFrom(InputStream is) - throws IOException - { - if (closed) { - throw new ClosedChannelException(); - } - - if (is == null) { - throw new NullPointerException(); - } - - if (buffer.remaining() == 0) { - flushBuffer(); - } - - assert buffer.hasArray(); - final byte[] bytes = buffer.array(); - - final int arrayOffset = buffer.arrayOffset(); - long totTransfered = 0; - int read; - while ((read = is.read(bytes, arrayOffset + buffer.position(), - buffer.remaining())) != -1) { - buffer.position(buffer.position() + read); - - if (buffer.remaining() == 0) { - flushBuffer(); - } - - totTransfered += read; - } - - return totTransfered; - } - - /** - * Transfers all the content from rbc to this - * {@link WritableByteChannel}. This potentially limits the amount of - * buffering required to compress content. - * - * @param rbc The source of data to compress. - * @return The number of bytes read from rbc. 
- * @throws IOException - * @since 1.1.1 - */ - public long transferFrom(ReadableByteChannel rbc) - throws IOException - { - if (closed) { - throw new ClosedChannelException(); - } - - if (rbc == null) { - throw new NullPointerException(); - } - - if (buffer.remaining() == 0) { - flushBuffer(); - } - - long totTransfered = 0; - int read; - while ((read = rbc.read(buffer)) != -1) { - if (buffer.remaining() == 0) { - flushBuffer(); - } - - totTransfered += read; - } - - return totTransfered; - } - - @Override - public final void flush() - throws IOException - { - if (closed) { - throw new IOException("Stream is closed"); - } - flushBuffer(); - } - - @Override - public final void close() - throws IOException - { - if (closed) { - return; - } - try { - flush(); - out.close(); - } - finally { - closed = true; - bufferPool.releaseArray(buffer.array()); - bufferPool.releaseDirect(directInputBuffer); - bufferPool.releaseDirect(outputBuffer); - } - } - - /** - * Compresses and writes out any buffered data. This does nothing if there - * is no currently buffered data. - * - * @throws IOException - */ - private void flushBuffer() - throws IOException - { - if (buffer.position() > 0) { - buffer.flip(); - writeCompressed(buffer); - buffer.clear(); - buffer.limit(blockSize); - } - } - - /** - * {@link SnappyFramed#maskedCrc32c(byte[], int, int)} the crc, compresses - * the data, determines if the compression ratio is acceptable and calls - * {@link #writeBlock(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer, boolean, int)} to - * actually write the frame. 
- * - * @param buffer - * @throws IOException - */ - private void writeCompressed(ByteBuffer buffer) - throws IOException - { - - final byte[] input = buffer.array(); - final int length = buffer.remaining(); - - // crc is based on the user supplied input data - final int crc32c = maskedCrc32c(input, 0, length); - - directInputBuffer.clear(); - directInputBuffer.put(buffer); - directInputBuffer.flip(); - - outputBuffer.clear(); - Snappy.compress(directInputBuffer, outputBuffer); - - final int compressedLength = outputBuffer.remaining(); - - // only use the compressed data if compression ratio is <= the - // minCompressonRatio - if (((double) compressedLength / (double) length) <= minCompressionRatio) { - writeBlock(out, outputBuffer, true, crc32c); - } - else { - // otherwise use the uncompressed data. - buffer.flip(); - writeBlock(out, buffer, false, crc32c); - } - } - - /** - * Write a frame (block) to out. - * - * @param out The {@link OutputStream} to write to. - * @param data The data to write. - * @param compressed Indicates if data is the compressed or raw content. - * This is based on whether the compression ratio desired is - * reached. - * @param crc32c The calculated checksum. - * @throws IOException - */ - private void writeBlock(final WritableByteChannel out, ByteBuffer data, - boolean compressed, int crc32c) - throws IOException - { - - headerBuffer.clear(); - headerBuffer.put((byte) (compressed ? 
COMPRESSED_DATA_FLAG - : UNCOMPRESSED_DATA_FLAG)); - - // the length written out to the header is both the checksum and the - // frame - final int headerLength = data.remaining() + 4; - - // write length - headerBuffer.put((byte) headerLength); - headerBuffer.put((byte) (headerLength >>> 8)); - headerBuffer.put((byte) (headerLength >>> 16)); - - // write crc32c of user input data - headerBuffer.putInt(crc32c); - - headerBuffer.flip(); - - // write the header - out.write(headerBuffer); - // write the raw data - out.write(data); - } -} +/* + * Created: Apr 12, 2013 + */ +package org.xerial.snappy; + +import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG; +import static org.xerial.snappy.SnappyFramed.HEADER_BYTES; +import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; +import static org.xerial.snappy.SnappyFramed.maskedCrc32c; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.Channels; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.zip.Checksum; + +import org.xerial.snappy.pool.BufferPool; +import org.xerial.snappy.pool.DefaultPoolFactory; + +/** + * Implements the x-snappy-framed as an {@link OutputStream} and + * {@link WritableByteChannel}. + * + * @author Brett Okken + * @since 1.1.0 + */ +public final class SnappyFramedOutputStream + extends OutputStream + implements + WritableByteChannel +{ + + /** + * The x-snappy-framed specification allows for a chunk size up to + * 16,777,211 bytes in length. However, it also goes on to state: + *

+ * + * We place an additional restriction that the uncompressed data in a chunk + * must be no longer than 65536 bytes. This allows consumers to easily use + * small fixed-size buffers. + * + *

+ */ + public static final int MAX_BLOCK_SIZE = 64 * 1024; + + /** + * The default block size to use. + */ + public static final int DEFAULT_BLOCK_SIZE = MAX_BLOCK_SIZE; + + /** + * The default min compression ratio to use. + */ + public static final double DEFAULT_MIN_COMPRESSION_RATIO = 0.85d; + + private final Checksum crc32 = SnappyFramed.getCRC32C(); + private final ByteBuffer headerBuffer = ByteBuffer.allocate(8).order( + ByteOrder.LITTLE_ENDIAN); + private final BufferPool bufferPool; + private final int blockSize; + private final ByteBuffer buffer; + private final ByteBuffer directInputBuffer; + private final ByteBuffer outputBuffer; + private final double minCompressionRatio; + + private final WritableByteChannel out; + + // private int position; + private boolean closed; + + /** + * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE} + * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param out The underlying {@link OutputStream} to write to. Must not be + * {@code null}. + * @throws IOException + */ + public SnappyFramedOutputStream(OutputStream out) + throws IOException + { + this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE} + * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. + * + * @param out The underlying {@link OutputStream} to write to. Must not be + * {@code null}. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedOutputStream(OutputStream out, BufferPool bufferPool) + throws IOException + { + this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} instance. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param out The underlying {@link OutputStream} to write to. Must not be + * {@code null}. + * @param blockSize The block size (of raw data) to compress before writing frames + * to out. Must be in (0, 65536]. + * @param minCompressionRatio Defines the minimum compression ratio ( + * {@code compressedLength / rawLength}) that must be achieved to + * write the compressed data. This must be in (0, 1.0]. + * @throws IOException + */ + public SnappyFramedOutputStream(OutputStream out, int blockSize, + double minCompressionRatio) + throws IOException + { + this(Channels.newChannel(out), blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} instance. + * + * @param out The underlying {@link OutputStream} to write to. Must not be + * {@code null}. + * @param blockSize The block size (of raw data) to compress before writing frames + * to out. Must be in (0, 65536]. + * @param minCompressionRatio Defines the minimum compression ratio ( + * {@code compressedLength / rawLength}) that must be achieved to + * write the compressed data. This must be in (0, 1.0]. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedOutputStream(OutputStream out, int blockSize, + double minCompressionRatio, BufferPool bufferPool) + throws IOException + { + this(Channels.newChannel(out), blockSize, minCompressionRatio, bufferPool); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} using the + * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param out The underlying {@link WritableByteChannel} to write to. Must + * not be {@code null}. + * @throws IOException + * @since 1.1.1 + */ + public SnappyFramedOutputStream(WritableByteChannel out) + throws IOException + { + this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} using the + * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param out The underlying {@link WritableByteChannel} to write to. Must + * not be {@code null}. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedOutputStream(WritableByteChannel out, BufferPool bufferPool) + throws IOException + { + this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} instance. + * + * @param out The underlying {@link WritableByteChannel} to write to. Must + * not be {@code null}. + * @param blockSize The block size (of raw data) to compress before writing frames + * to out. Must be in (0, 65536]. + * @param minCompressionRatio Defines the minimum compression ratio ( + * {@code compressedLength / rawLength}) that must be achieved to + * write the compressed data. This must be in (0, 1.0]. + * @throws IOException + * @since 1.1.1 + */ + public SnappyFramedOutputStream(WritableByteChannel out, int blockSize, + double minCompressionRatio) + throws IOException + { + this(out, blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} instance. + * + * @param out The underlying {@link WritableByteChannel} to write to. Must + * not be {@code null}. + * @param blockSize The block size (of raw data) to compress before writing frames + * to out. Must be in (0, 65536]. + * @param minCompressionRatio Defines the minimum compression ratio ( + * {@code compressedLength / rawLength}) that must be achieved to + * write the compressed data. This must be in (0, 1.0]. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. 
+ * @throws IOException + * @since 1.1.1 + */ + public SnappyFramedOutputStream(WritableByteChannel out, int blockSize, + double minCompressionRatio, BufferPool bufferPool) + throws IOException + { + if (out == null) { + throw new NullPointerException("out is null"); + } + + if (bufferPool == null) { + throw new NullPointerException("buffer pool is null"); + } + + if (minCompressionRatio <= 0 || minCompressionRatio > 1.0) { + throw new IllegalArgumentException("minCompressionRatio " + + minCompressionRatio + " must be in (0,1.0]"); + } + + if (blockSize <= 0 || blockSize > MAX_BLOCK_SIZE) { + throw new IllegalArgumentException("block size " + blockSize + + " must be in (0, 65536]"); + } + this.blockSize = blockSize; + this.out = out; + this.minCompressionRatio = minCompressionRatio; + + this.bufferPool = bufferPool; + buffer = ByteBuffer.wrap(bufferPool.allocateArray(blockSize), 0, blockSize); + directInputBuffer = bufferPool.allocateDirect(blockSize); + outputBuffer = bufferPool.allocateDirect(Snappy + .maxCompressedLength(blockSize)); + + writeHeader(out); + } + + /** + * Writes the implementation specific header or "marker bytes" to + * out. + * + * @param out The underlying {@link OutputStream}. 
+ * @throws IOException + */ + private void writeHeader(WritableByteChannel out) + throws IOException + { + out.write(ByteBuffer.wrap(HEADER_BYTES)); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isOpen() + { + return !closed; + } + + @Override + public void write(int b) + throws IOException + { + if (closed) { + throw new IOException("Stream is closed"); + } + if (buffer.remaining() <= 0) { + flushBuffer(); + } + buffer.put((byte) b); + } + + @Override + public void write(byte[] input, int offset, int length) + throws IOException + { + if (closed) { + throw new IOException("Stream is closed"); + } + + if (input == null) { + throw new NullPointerException(); + } + else if ((offset < 0) || (offset > input.length) || (length < 0) + || ((offset + length) > input.length) + || ((offset + length) < 0)) { + throw new IndexOutOfBoundsException(); + } + + while (length > 0) { + if (buffer.remaining() <= 0) { + flushBuffer(); + } + + final int toPut = Math.min(length, buffer.remaining()); + buffer.put(input, offset, toPut); + offset += toPut; + length -= toPut; + } + } + + /** + * {@inheritDoc} + */ + @Override + public int write(ByteBuffer src) + throws IOException + { + if (closed) { + throw new ClosedChannelException(); + } + + if (buffer.remaining() <= 0) { + flushBuffer(); + } + + final int srcLength = src.remaining(); + + // easy case: enough free space in buffer for entire input + if (buffer.remaining() >= src.remaining()) { + buffer.put(src); + return srcLength; + } + + // store current limit + final int srcEnd = src.position() + src.remaining(); + + while ((src.position() + buffer.remaining()) <= srcEnd) { + // fill partial buffer as much as possible and flush + src.limit(src.position() + buffer.remaining()); + buffer.put(src); + flushBuffer(); + } + + // reset original limit + src.limit(srcEnd); + + // copy remaining partial block into now-empty buffer + buffer.put(src); + + return srcLength; + } + + /** + * Transfers all the content from is to 
this {@link OutputStream}.
+     * Reading straight into the internal block buffer potentially limits the
+     * amount of buffering required to compress content.
+     *
+     * @param is The source of data to compress.
+     * @return The number of bytes read from {@code is}.
+     * @throws IOException If this stream is closed, or reading or flushing
+     * fails.
+     * @since 1.1.1
+     */
+    public long transferFrom(InputStream is)
+            throws IOException
+    {
+        if (closed) {
+            throw new ClosedChannelException();
+        }
+        if (is == null) {
+            throw new NullPointerException();
+        }
+
+        // make sure there is room to read into
+        if (!buffer.hasRemaining()) {
+            flushBuffer();
+        }
+
+        assert buffer.hasArray();
+        final byte[] backing = buffer.array();
+        final int base = buffer.arrayOffset();
+
+        long transferred = 0;
+        for (;;) {
+            // read directly into the free tail of the block buffer
+            final int count = is.read(backing, base + buffer.position(),
+                    buffer.remaining());
+            if (count == -1) {
+                break;
+            }
+
+            buffer.position(buffer.position() + count);
+
+            // block complete: compress and write it out
+            if (!buffer.hasRemaining()) {
+                flushBuffer();
+            }
+
+            transferred += count;
+        }
+
+        return transferred;
+    }
+
+    /**
+     * Transfers all the content from rbc to this
+     * {@link WritableByteChannel}. This potentially limits the amount of
+     * buffering required to compress content.
+     *
+     * @param rbc The source of data to compress.
+     * @return The number of bytes read from rbc.
+     * @throws IOException If this stream is closed, or reading or flushing
+     * fails.
+     * @since 1.1.1
+     */
+    public long transferFrom(ReadableByteChannel rbc)
+            throws IOException
+    {
+        if (closed) {
+            throw new ClosedChannelException();
+        }
+
+        if (rbc == null) {
+            throw new NullPointerException();
+        }
+
+        if (buffer.remaining() == 0) {
+            flushBuffer();
+        }
+
+        long totTransfered = 0;
+        int read;
+        // the channel reads straight into the block buffer; -1 marks end of input
+        while ((read = rbc.read(buffer)) != -1) {
+            if (buffer.remaining() == 0) {
+                flushBuffer();
+            }
+
+            totTransfered += read;
+        }
+
+        return totTransfered;
+    }
+
+    /**
+     * Compresses and writes out any currently buffered data.
+     *
+     * @throws IOException If this stream is closed or writing fails.
+     */
+    @Override
+    public final void flush()
+            throws IOException
+    {
+        if (closed) {
+            throw new IOException("Stream is closed");
+        }
+        flushBuffer();
+    }
+
+    /**
+     * Flushes buffered data, closes the underlying channel and returns the
+     * buffers to the pool. Calling this on an already closed stream does
+     * nothing.
+     *
+     * @throws IOException If flushing or closing the underlying channel
+     * fails. If both fail, the exception from closing is the one thrown.
+     */
+    @Override
+    public final void close()
+            throws IOException
+    {
+        if (closed) {
+            return;
+        }
+        try {
+            try {
+                flush();
+            }
+            finally {
+                // close the channel even when the final flush fails, so the
+                // underlying resource is never leaked
+                out.close();
+            }
+        }
+        finally {
+            closed = true;
+            bufferPool.releaseArray(buffer.array());
+            bufferPool.releaseDirect(directInputBuffer);
+            bufferPool.releaseDirect(outputBuffer);
+        }
+    }
+
+    /**
+     * Compresses and writes out any buffered data. This does nothing if there
+     * is no currently buffered data.
+     *
+     * @throws IOException
+     */
+    private void flushBuffer()
+            throws IOException
+    {
+        if (buffer.position() > 0) {
+            // expose the buffered bytes [0, position) for reading
+            buffer.flip();
+            writeCompressed(buffer);
+            // reset for the next block; the backing array may be larger than
+            // blockSize, so cap the limit again
+            buffer.clear();
+            buffer.limit(blockSize);
+        }
+    }
+
+    /**
+     * Computes the masked CRC-32C of the raw block, compresses
+     * the data, determines if the compression ratio is acceptable and calls
+     * {@link #writeBlock(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer, boolean, int)} to
+     * actually write the frame.
+     *
+     * @param buffer The flipped block buffer holding the raw data to write.
+     * @throws IOException If compressing or writing the frame fails.
+     */
+    private void writeCompressed(ByteBuffer buffer)
+            throws IOException
+    {
+
+        final byte[] input = buffer.array();
+        final int length = buffer.remaining();
+
+        // crc is based on the user supplied input data
+        final int crc32c = maskedCrc32c(crc32, input, 0, length);
+
+        // Snappy.compress is given direct buffers, so copy the block over
+        directInputBuffer.clear();
+        directInputBuffer.put(buffer);
+        directInputBuffer.flip();
+
+        outputBuffer.clear();
+        Snappy.compress(directInputBuffer, outputBuffer);
+
+        final int compressedLength = outputBuffer.remaining();
+
+        // only use the compressed data if compression ratio is <= the
+        // minCompressionRatio
+        if (((double) compressedLength / (double) length) <= minCompressionRatio) {
+            writeBlock(out, outputBuffer, true, crc32c);
+        }
+        else {
+            // otherwise use the uncompressed data; the put above consumed
+            // the buffer, so flip it back to [0, length) first.
+            buffer.flip();
+            writeBlock(out, buffer, false, crc32c);
+        }
+    }
+
+    /**
+     * Write a frame (block) to {@code out}.
+     *
+     * @param out The {@link WritableByteChannel} to write to.
+     * @param data The data to write.
+     * @param compressed Indicates if {@code data} is the compressed or raw
+     * content. This is based on whether the compression ratio desired is
+     * reached.
+     * @param crc32c The calculated checksum.
+     * @throws IOException If writing to {@code out} fails.
+     */
+    private void writeBlock(final WritableByteChannel out, ByteBuffer data,
+            boolean compressed, int crc32c)
+            throws IOException
+    {
+
+        headerBuffer.clear();
+        headerBuffer.put((byte) (compressed ? COMPRESSED_DATA_FLAG
+                : UNCOMPRESSED_DATA_FLAG));
+
+        // the length written out to the header is both the checksum and the
+        // frame
+        final int headerLength = data.remaining() + 4;
+
+        // write length (3 bytes, least significant byte first)
+        headerBuffer.put((byte) headerLength);
+        headerBuffer.put((byte) (headerLength >>> 8));
+        headerBuffer.put((byte) (headerLength >>> 16));
+
+        // write crc32c of user input data
+        headerBuffer.putInt(crc32c);
+
+        headerBuffer.flip();
+
+        // A WritableByteChannel may accept only part of a buffer per call
+        // (e.g. a non-blocking socket); loop so that neither the frame header
+        // nor the payload is ever silently truncated.
+
+        // write the header
+        while (headerBuffer.hasRemaining()) {
+            out.write(headerBuffer);
+        }
+        // write the raw data
+        while (data.hasRemaining()) {
+            out.write(data);
+        }
+    }
+}
diff --git a/src/test/java/org/xerial/snappy/SnappyFramedStreamTest.java b/src/test/java/org/xerial/snappy/SnappyFramedStreamTest.java
index b03ec870..e428bf98 100644
--- a/src/test/java/org/xerial/snappy/SnappyFramedStreamTest.java
+++ b/src/test/java/org/xerial/snappy/SnappyFramedStreamTest.java
@@ -9,7 +9,6 @@
 import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG;
 import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
 import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
-import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -418,4 +417,9 @@ private byte[] getRandom(double compressionRatio, int length)
         assertEquals(random.length, length);
         return random;
     }
+
+    public static int maskedCrc32c(byte[] data)
+    {
+        return SnappyFramed.maskedCrc32c(new PureJavaCrc32C(), data, 0, data.length);
+    }
 }