From 8b191e41668f681e06fc86b6e5495675f8a08015 Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Mon, 2 Jan 2023 15:19:57 +0100
Subject: [PATCH] s2: Add example for indexing and existing stream (#723)

Also allow seek after EOF.
---
 s2/decode.go     |   6 +-
 s2/index_test.go | 178 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 183 insertions(+), 1 deletion(-)

diff --git a/s2/decode.go b/s2/decode.go
index 27c0f3c2c4..00c5cc72c2 100644
--- a/s2/decode.go
+++ b/s2/decode.go
@@ -952,7 +952,11 @@ func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
 // Seek allows seeking in compressed data.
 func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
 	if r.err != nil {
-		return 0, r.err
+		if !errors.Is(r.err, io.EOF) {
+			return 0, r.err
+		}
+		// Reset on EOF
+		r.err = nil
 	}
 	if offset == 0 && whence == io.SeekCurrent {
 		return r.blockStart + int64(r.i), nil
diff --git a/s2/index_test.go b/s2/index_test.go
index cd6cde1105..4fc2197fe3 100644
--- a/s2/index_test.go
+++ b/s2/index_test.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"math/rand"
+	"os"
 	"sync"
 	"testing"
 
@@ -234,3 +235,180 @@ func TestSeeking(t *testing.T) {
 		})
 	}
 }
+
+// ExampleIndexStream shows an example of indexing an existing stream
+// after it has been written.
+// The index can either be appended to the stream or saved to a separate file.
+func ExampleIndexStream() {
+	fatalErr := func(err error) {
+		if err != nil {
+			panic(err)
+		}
+	}
+
+	// Create a test stream without index
+	var streamName = ""
+	tmp := make([]byte, 5<<20)
+	{
+		rng := rand.New(rand.NewSource(0xbeefcafe))
+		rng.Read(tmp)
+		// Make it compressible...
+		for i, v := range tmp {
+			tmp[i] = '0' + v&3
+		}
+		// Compress it...
+		output, err := os.CreateTemp("", "IndexStream")
+		fatalErr(err)
+		streamName = output.Name()
+		// Best-effort cleanup of the temporary stream when the example returns.
+		defer os.Remove(streamName)
+
+		// We use smaller blocks just for the example...
+		enc := s2.NewWriter(output, s2.WriterSnappyCompat())
+		err = enc.EncodeBuffer(tmp)
+		fatalErr(err)
+
+		// Close the encoder and the output file...
+		err = enc.Close()
+		fatalErr(err)
+		err = output.Close()
+		fatalErr(err)
+	}
+
+	// Open our compressed stream without an index...
+	stream, err := os.Open(streamName)
+	fatalErr(err)
+	defer stream.Close()
+
+	var indexInput = io.Reader(stream)
+	var indexOutput io.Writer
+	var indexedName string
+
+	// Should index be combined with stream by appending?
+	// This could also be done by appending to an os.File.
+	// If not it will be written to a separate file.
+	const combineOutput = false
+
+	// Wrapped in a function so the deferred close runs before the file is read back.
+	func() {
+		if combineOutput {
+			output, err := os.CreateTemp("", "IndexStream-Combined")
+			fatalErr(err)
+			defer func() {
+				fatalErr(output.Close())
+				if false {
+					fi, err := os.Stat(output.Name())
+					fatalErr(err)
+					fmt.Println("Combined:", fi.Size(), "bytes")
+				} else {
+					fmt.Println("Index saved")
+				}
+			}()
+
+			// Everything read from stream will also be written to output.
+			indexedName = output.Name()
+			indexInput = io.TeeReader(stream, output)
+			indexOutput = output
+		} else {
+			output, err := os.CreateTemp("", "IndexStream-Index")
+			fatalErr(err)
+			defer func() {
+				fatalErr(output.Close())
+				fi, err := os.Stat(output.Name())
+				fatalErr(err)
+				if false {
+					fmt.Println("Index:", fi.Size(), "bytes")
+				} else {
+					fmt.Println("Index saved")
+				}
+			}()
+			indexedName = output.Name()
+			indexOutput = output
+		}
+
+		// Index the input
+		idx, err := s2.IndexStream(indexInput)
+		fatalErr(err)
+
+		// Write the index
+		_, err = indexOutput.Write(idx)
+		fatalErr(err)
+	}()
+	// Best-effort cleanup of the temporary index (or combined) file.
+	defer os.Remove(indexedName)
+
+	if combineOutput {
+		// Read from combined stream only.
+		stream, err := os.Open(indexedName)
+		fatalErr(err)
+		defer stream.Close()
+		// Create a reader with the input.
+		// We pass the stream as an io.ReadSeeker.
+		r := s2.NewReader(io.ReadSeeker(stream))
+
+		// Request a ReadSeeker with random access.
+		// This will load the index from the stream.
+		rs, err := r.ReadSeeker(true, nil)
+		fatalErr(err)
+
+		_, err = rs.Seek(-10, io.SeekEnd)
+		fatalErr(err)
+
+		b, err := io.ReadAll(rs)
+		fatalErr(err)
+		if want := tmp[len(tmp)-10:]; !bytes.Equal(b, want) {
+			fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
+		}
+		fmt.Println("last 10 bytes read")
+
+		_, err = rs.Seek(10, io.SeekStart)
+		fatalErr(err)
+		_, err = io.ReadFull(rs, b)
+		fatalErr(err)
+		if want := tmp[10:20]; !bytes.Equal(b, want) {
+			fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
+		}
+		fmt.Println("10 bytes at offset 10 read")
+	} else {
+		// Read from separate stream and index.
+		stream, err := os.Open(streamName)
+		fatalErr(err)
+		defer stream.Close()
+		// Create a reader with the input.
+		// We pass the stream as an io.ReadSeeker.
+		r := s2.NewReader(io.ReadSeeker(stream))
+
+		// Read the separate index.
+		index, err := os.ReadFile(indexedName)
+		fatalErr(err)
+
+		// Request a ReadSeeker with random access.
+		// The provided index will be used.
+		rs, err := r.ReadSeeker(true, index)
+		fatalErr(err)
+
+		_, err = rs.Seek(-10, io.SeekEnd)
+		fatalErr(err)
+
+		b, err := io.ReadAll(rs)
+		fatalErr(err)
+		if want := tmp[len(tmp)-10:]; !bytes.Equal(b, want) {
+			fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
+		}
+		fmt.Println("last 10 bytes read")
+
+		_, err = rs.Seek(10, io.SeekStart)
+		fatalErr(err)
+		_, err = io.ReadFull(rs, b)
+		fatalErr(err)
+		if want := tmp[10:20]; !bytes.Equal(b, want) {
+			fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
+		}
+		fmt.Println("10 bytes at offset 10 read")
+	}
+
+	// OUTPUT:
+	// Index saved
+	// last 10 bytes read
+	// 10 bytes at offset 10 read
+}