Merge pull request #211 from oakad/issue_210
CompressingReader: compressed data reader stream (#210)
pierrec committed Dec 8, 2023
2 parents a0f6c5e + 7613989 commit e974631
Showing 3 changed files with 328 additions and 0 deletions.
222 changes: 222 additions & 0 deletions compressing_reader.go
@@ -0,0 +1,222 @@
package lz4

import (
	"errors"
	"io"

	"github.com/pierrec/lz4/v4/internal/lz4block"
	"github.com/pierrec/lz4/v4/internal/lz4errors"
	"github.com/pierrec/lz4/v4/internal/lz4stream"
)

type crState int

const (
	crStateInitial crState = iota
	crStateReading
	crStateFlushing
	crStateDone
)

type CompressingReader struct {
	state   crState
	src     io.ReadCloser             // source reader
	level   lz4block.CompressionLevel // how hard to try
	frame   *lz4stream.Frame          // frame being built
	in      []byte
	out     ovWriter
	handler func(int)
}

// NewCompressingReader creates a reader which reads compressed data from
// a raw stream. This makes it the logical opposite of a normal lz4.Reader.
// We require an io.ReadCloser as an underlying source for compatibility
// with Go's http.Request.
func NewCompressingReader(src io.ReadCloser) *CompressingReader {
	zrd := &CompressingReader{
		frame: lz4stream.NewFrame(),
	}

	_ = zrd.Apply(DefaultBlockSizeOption, DefaultChecksumOption, defaultOnBlockDone)
	zrd.Reset(src)

	return zrd
}

// Source exposes the underlying source stream for introspection and control.
func (zrd *CompressingReader) Source() io.ReadCloser {
	return zrd.src
}

// Close simply invokes the underlying stream's Close method. This method is
// provided for the benefit of the Go http client and server, which rely on
// Close for goroutine termination.
func (zrd *CompressingReader) Close() error {
	return zrd.src.Close()
}
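
// A minimal caller-side usage sketch (illustrative only, not part of this
// package): since CompressingReader is an io.ReadCloser, it can be passed
// straight to the Go http client as a request body, so the upload is
// compressed on the fly. The file name and URL below are placeholders.
//
//	f, _ := os.Open("payload.bin") // any io.ReadCloser can act as the source
//	req, _ := http.NewRequest("POST", "https://example.com/upload", lz4.NewCompressingReader(f))
//	resp, err := http.DefaultClient.Do(req) // the transport closes the body when the request is done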

// Apply applies useful options to the lz4 encoder.
func (zrd *CompressingReader) Apply(options ...Option) (err error) {
	if zrd.state != crStateInitial {
		return lz4errors.ErrOptionClosedOrError
	}

	zrd.Reset(zrd.src)

	for _, o := range options {
		if err = o(zrd); err != nil {
			return
		}
	}
	return
}

func (*CompressingReader) private() {}

func (zrd *CompressingReader) init() error {
	zrd.frame.InitW(&zrd.out, 1, false)
	size := zrd.frame.Descriptor.Flags.BlockSizeIndex()
	zrd.in = size.Get()
	return zrd.frame.Descriptor.Write(zrd.frame, &zrd.out)
}

// Read allows reading of lz4 compressed data produced on the fly from the
// underlying raw stream.
func (zrd *CompressingReader) Read(p []byte) (n int, err error) {
	defer func() {
		if err != nil {
			zrd.state = crStateDone
		}
	}()

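	// Point the internal writer at p and drain any compressed bytes left over
	// from the previous call; if p is filled entirely from that overflow there
	// is nothing new to compress yet.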
	if !zrd.out.reset(p) {
		return len(p), nil
	}

	switch zrd.state {
	case crStateInitial:
		err = zrd.init()
		if err != nil {
			return
		}
		zrd.state = crStateReading
	case crStateDone:
		return 0, errors.New("This reader is done")
	case crStateFlushing:
		if zrd.out.dataPos > 0 {
			n = zrd.out.dataPos
			zrd.out.data = nil
			zrd.out.dataPos = 0
			return
		} else {
			zrd.state = crStateDone
			return 0, io.EOF
		}
	}

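	// Compress whole blocks from the source until the caller's buffer is full
	// or the source is exhausted; bytes that do not fit in p are kept in the
	// overflow buffer for the next call.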
	for zrd.state == crStateReading {
		block := zrd.frame.Blocks.Block

		var rCount int
		rCount, err = io.ReadFull(zrd.src, zrd.in)
		switch err {
		case nil:
			err = block.Compress(
				zrd.frame, zrd.in[:rCount], zrd.level,
			).Write(zrd.frame, &zrd.out)
			zrd.handler(len(block.Data))
			if err != nil {
				return
			}

			if zrd.out.dataPos == len(zrd.out.data) {
				n = zrd.out.dataPos
				zrd.out.dataPos = 0
				zrd.out.data = nil
				return
			}
		case io.EOF, io.ErrUnexpectedEOF: // read may be partial
			if rCount > 0 {
				err = block.Compress(
					zrd.frame, zrd.in[:rCount], zrd.level,
				).Write(zrd.frame, &zrd.out)
				zrd.handler(len(block.Data))
				if err != nil {
					return
				}
			}

			err = zrd.frame.CloseW(&zrd.out, 1)
			if err != nil {
				return
			}
			zrd.state = crStateFlushing

			n = zrd.out.dataPos
			zrd.out.dataPos = 0
			zrd.out.data = nil
			return
		default:
			return
		}
	}

	err = lz4errors.ErrInternalUnhandledState
	return
}

// Reset makes the stream usable again; mostly handy to reuse lz4 encoder
// instances.
func (zrd *CompressingReader) Reset(src io.ReadCloser) {
	zrd.frame.Reset(1)
	zrd.state = crStateInitial
	zrd.src = src
	zrd.out.clear()
}
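
// An illustrative reuse sketch, assuming hypothetical readers r1, r2 and a
// destination writer dst:
//
//	zrd := lz4.NewCompressingReader(r1)
//	_, _ = io.Copy(dst, zrd)
//	zrd.Reset(r2) // the same encoder instance now compresses the next stream
//	_, _ = io.Copy(dst, zrd)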

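// ovWriter collects compressed output into the caller supplied buffer and
// spills whatever does not fit into an internal overflow slice, which is
// drained on the next Read call.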
type ovWriter struct {
	data    []byte
	ov      []byte
	dataPos int
	ovPos   int
}

func (wr *ovWriter) Write(p []byte) (n int, err error) {
	count := copy(wr.data[wr.dataPos:], p)
	wr.dataPos += count

	if count < len(p) {
		wr.ov = append(wr.ov, p[count:]...)
	}

	return len(p), nil
}

func (wr *ovWriter) reset(out []byte) bool {
	ovRem := len(wr.ov) - wr.ovPos

	if ovRem >= len(out) {
		wr.ovPos += copy(out, wr.ov[wr.ovPos:])
		return false
	}

	if ovRem > 0 {
		copy(out, wr.ov[wr.ovPos:])
		wr.ov = wr.ov[:0]
		wr.ovPos = 0
		wr.dataPos = ovRem
	} else if wr.ovPos > 0 {
		wr.ov = wr.ov[:0]
		wr.ovPos = 0
		wr.dataPos = 0
	}

	wr.data = out
	return true
}

func (wr *ovWriter) clear() {
	wr.data = nil
	wr.dataPos = 0
	wr.ov = wr.ov[:0]
	wr.ovPos = 0
}
78 changes: 78 additions & 0 deletions compressing_reader_test.go
@@ -0,0 +1,78 @@
package lz4_test

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"strings"
	"testing"

	"github.com/pierrec/lz4/v4"
)

func TestCompressingReader(t *testing.T) {
	goldenFiles := []string{
		"testdata/e.txt",
		"testdata/gettysburg.txt",
		"testdata/Mark.Twain-Tom.Sawyer.txt",
		"testdata/Mark.Twain-Tom.Sawyer_long.txt",
		"testdata/pg1661.txt",
		"testdata/pi.txt",
		"testdata/random.data",
		"testdata/repeat.txt",
		"testdata/issue102.data",
	}

	for _, fname := range goldenFiles {
		for _, option := range []lz4.Option{
			lz4.BlockChecksumOption(true),
			lz4.SizeOption(123),
		} {
			label := fmt.Sprintf("%s/%s", fname, option)
			t.Run(label, func(t *testing.T) {
				fname := fname
				option := option
				t.Parallel()

				raw, err := ioutil.ReadFile(fname)
				if err != nil {
					t.Fatal(err)
				}
				r := ioutil.NopCloser(bytes.NewReader(raw))

				// Compress.
				zcomp := lz4.NewCompressingReader(r)
				if err := zcomp.Apply(option, lz4.CompressionLevelOption(lz4.Level1)); err != nil {
					t.Fatal(err)
				}

				zout, err := ioutil.ReadAll(zcomp)
				if err != nil {
					t.Fatal(err)
				}

				// Uncompress.
				zr := lz4.NewReader(bytes.NewReader(zout))
				out, err := ioutil.ReadAll(zr)
				if err != nil {
					t.Fatal(err)
				}

				// The uncompressed data must be the same as the initial input.
				if got, want := len(out), len(raw); got != want {
					t.Errorf("invalid sizes: got %d; want %d", got, want)
				}

				if !bytes.Equal(out, raw) {
					t.Fatal("uncompressed data does not match original")
				}

				if strings.Contains(option.String(), "SizeOption") {
					if got, want := zr.Size(), 123; got != want {
						t.Errorf("invalid sizes: got %d; want %d", got, want)
					}
				}
			})
		}
	}
}
28 changes: 28 additions & 0 deletions options.go
@@ -57,6 +57,13 @@ func BlockSizeOption(size BlockSize) Option {
			}
			w.frame.Descriptor.Flags.BlockSizeIndexSet(lz4block.Index(size))
			return nil
		case *CompressingReader:
			size := uint32(size)
			if !lz4block.IsValid(size) {
				return fmt.Errorf("%w: %d", lz4errors.ErrOptionInvalidBlockSize, size)
			}
			w.frame.Descriptor.Flags.BlockSizeIndexSet(lz4block.Index(size))
			return nil
		}
		return lz4errors.ErrOptionNotApplicable
	}
@@ -72,6 +79,9 @@ func BlockChecksumOption(flag bool) Option {
		case *Writer:
			w.frame.Descriptor.Flags.BlockChecksumSet(flag)
			return nil
		case *CompressingReader:
			w.frame.Descriptor.Flags.BlockChecksumSet(flag)
			return nil
		}
		return lz4errors.ErrOptionNotApplicable
	}
@@ -87,6 +97,9 @@ func ChecksumOption(flag bool) Option {
		case *Writer:
			w.frame.Descriptor.Flags.ContentChecksumSet(flag)
			return nil
		case *CompressingReader:
			w.frame.Descriptor.Flags.ContentChecksumSet(flag)
			return nil
		}
		return lz4errors.ErrOptionNotApplicable
	}
@@ -104,6 +117,10 @@ func SizeOption(size uint64) Option {
			w.frame.Descriptor.Flags.SizeSet(size > 0)
			w.frame.Descriptor.ContentSize = size
			return nil
		case *CompressingReader:
			w.frame.Descriptor.Flags.SizeSet(size > 0)
			w.frame.Descriptor.ContentSize = size
			return nil
		}
		return lz4errors.ErrOptionNotApplicable
	}
@@ -162,6 +179,14 @@ func CompressionLevelOption(level CompressionLevel) Option {
			}
			w.level = lz4block.CompressionLevel(level)
			return nil
		case *CompressingReader:
			switch level {
			case Fast, Level1, Level2, Level3, Level4, Level5, Level6, Level7, Level8, Level9:
			default:
				return fmt.Errorf("%w: %d", lz4errors.ErrOptionInvalidCompressionLevel, level)
			}
			w.level = lz4block.CompressionLevel(level)
			return nil
		}
		return lz4errors.ErrOptionNotApplicable
	}
@@ -186,6 +211,9 @@ func OnBlockDoneOption(handler func(size int)) Option {
		case *Reader:
			rw.handler = handler
			return nil
		case *CompressingReader:
			rw.handler = handler
			return nil
		}
		return lz4errors.ErrOptionNotApplicable
	}
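
The option hooks added above give CompressingReader the same tuning knobs as Writer. A minimal sketch of applying them (illustrative only; src is assumed to be an io.ReadCloser and rawSize a uint64 already in scope):

	zrd := lz4.NewCompressingReader(src)
	err := zrd.Apply(
		lz4.BlockSizeOption(lz4.Block4Mb),
		lz4.BlockChecksumOption(true),
		lz4.ChecksumOption(true),
		lz4.SizeOption(rawSize), // known uncompressed size, when available
		lz4.CompressionLevelOption(lz4.Level5),
		lz4.OnBlockDoneOption(func(size int) { /* per-block progress hook */ }),
	)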
