Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

[O11Y-224] add support for merging parquet files. #45

Merged
merged 146 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
146 commits
Select commit Hold shift + click to select a range
6297550
refactor row traversal
Dec 16, 2021
63faa7c
reorganize code
Dec 16, 2021
fe2dad8
micro optimization
Dec 16, 2021
fa039c4
optimize struct field lookup
Dec 16, 2021
4096e31
remove unnecessary check
Dec 16, 2021
eb07e76
export parquet.NewSchema
Dec 16, 2021
fa0c576
refactor traversal into deconstruct
Dec 17, 2021
6b205ed
don't use a static array as row buffer is row group writer
Dec 17, 2021
e6d6860
working row deconstruction + reconstruction
Dec 19, 2021
92f06e0
optimize decoding of repeated values
Dec 19, 2021
872fe74
remove unused code
Dec 19, 2021
bc57ee9
fix bugs in delta binary packed encoding + add more tests for parquet…
Dec 20, 2021
e86f6a5
fix issue when reading empty sequences of repeated columns
Dec 20, 2021
efe4bf1
validate column index in deconstruction/reconstruction tests
Dec 20, 2021
a24ce66
add benchmarks for parquet.Reader
Dec 20, 2021
028f7d5
introduce new batching APIs + optimizations + bug fixes
Dec 21, 2021
500414a
add more tests and documentation
Dec 21, 2021
f0e0754
remove more unused code
Dec 21, 2021
53d2ea2
refactor reader internals to remove intermediary buffers
Dec 21, 2021
60441d6
optimize row reader
Dec 21, 2021
140a2fe
more tests and more fixes
Dec 22, 2021
c77ad3a
simplify column read func code
Dec 22, 2021
c3319bf
simplify column read func code (2)
Dec 22, 2021
89490fc
simplify column read func code (3)
Dec 22, 2021
39bf0ab
simplify column read func code (4)
Dec 22, 2021
a924da2
optimize reading column groups with a single element
Dec 22, 2021
bbe7d99
allow the reader to reuse ColumnPages objects
Dec 22, 2021
d17209e
fix bugs in RLE encoding and data page reader
Dec 22, 2021
40433b0
fix multiple bug in writer causing rows to be split between pages
Dec 23, 2021
cba040b
fix counting of null values when writing repeated columns
Dec 23, 2021
2c1624a
add test with nested lists
Dec 23, 2021
a9099b2
fix RLE/Bit-Packed tests + document that the decoders may return io.E…
Dec 23, 2021
c10efd4
inline function calls to improve reader throughput
Dec 23, 2021
7cc724b
fix test representation of fixed-length byte array
Dec 23, 2021
f216a28
split ReadRow/WriteRow into Read/ReadRow and Write/WriteRow
Dec 26, 2021
50bc2a5
add parquet.RowGroup APIs
Dec 26, 2021
f5dd389
introduce Page type and refactor PageWriter
Dec 27, 2021
2c31f5a
refactor package to use encoding.ByteArrayList
Dec 27, 2021
8beedc8
use encoding.ByteArrayList in parquet.RowGroupColumn + add parquet.Pa…
Dec 27, 2021
209936c
express parquet.RowGroupColumn in terms of parquet.Page + add tests
Dec 27, 2021
975243d
remove traverse.go
Dec 27, 2021
186d822
simplify creation of optional and repeated pages from row group columns
Dec 27, 2021
10ba1df
simplify value reader/writer interfaces
Dec 27, 2021
fd4532e
replace parquet.PageWriter with parquet.RowGroupColumn
Dec 27, 2021
9953df9
delegate generation of repetition and definition levels to parquet.Ro…
Dec 27, 2021
8e25392
refactor dictionary types + replace parquet.PageReader with parquet.V…
Dec 28, 2021
558ce36
expose the content of pages via a value reader
Dec 28, 2021
478b224
lazily configure schema on writer and row group + write row groups to…
Dec 28, 2021
b232d45
fix memory management when decoding into parquet.Value
Dec 28, 2021
cf82611
hold the number of rows in pages
Dec 28, 2021
15e0906
lazily allocate row group columns in writer
Dec 28, 2021
4063d1a
refactor writer internals to prepare for writeRowGroup implementation
Dec 29, 2021
da52a81
get rid of the boolean managing state of the writer
Dec 29, 2021
d03cac5
write footer in rowGroupWriter
Dec 29, 2021
8b6bf13
handling flush called after close
Dec 29, 2021
3d6b732
split row group column pages to respect target page size
Dec 29, 2021
d041a2f
document parquet.SortingColumn
Dec 29, 2021
b698ad7
remove unused NodeAt function
Dec 29, 2021
140149d
[O11Y-219] initial support for parquet MAP type
Dec 29, 2021
58e5900
Update row_unsafe.go
Dec 29, 2021
a249a9a
add more tests and fix bugs
Dec 29, 2021
1871d3e
fix tests
Dec 29, 2021
7129198
revisit row reconstruction logic
Dec 29, 2021
8c0a277
fix tests
Dec 29, 2021
f99088f
refactor map deconstruction/reconstruction to remove intermediary buffer
Dec 29, 2021
2d38f12
remove parquet.node type, the abstraction was not worth it
Dec 30, 2021
eedb358
[O11Y-220] add schema conversions
Dec 30, 2021
1e30ad6
remove unused max function
Dec 30, 2021
3cc049a
Merge remote-tracking branch 'origin/main' into O11Y-194
Dec 30, 2021
5281285
Merge branch 'O11Y-194' of ssh://github.com/segmentio/parquet into O1…
Dec 30, 2021
7cbeca3
Merge remote-tracking branch 'origin/O11Y-194' into O11Y-221
Dec 30, 2021
babea64
Merge remote-tracking branch 'origin/O11Y-221' into O11Y-219
Dec 30, 2021
04c6734
Merge remote-tracking branch 'origin/O11Y-219' into O11Y-220
Dec 30, 2021
867b716
[O11Y-216] support DELTA_BYTE_ARRAY on FIXED_LEN_BYTE_ARRAY columns
Dec 30, 2021
f94de8d
refactor row group abstractions
Dec 30, 2021
9b4372c
refactor buffer column to use ReadRowAt/WriteRow
Dec 31, 2021
2c50b29
modify RowGroupColumn interface to return the list of pages it contai…
Dec 31, 2021
633a257
add back the slicing of pages to target size in writer
Dec 31, 2021
8924c5b
Merge remote-tracking branch 'origin/main' into O11Y-224
Dec 31, 2021
9d1d87b
implement parquet.ValueReaderAt on parquet.BufferColumn
Dec 31, 2021
0dd07b9
refactor buffer column and pages to remove parquet.ValueReaderAt
Dec 31, 2021
41a3e2f
express WriteRowGroup as a row copy operation
Dec 31, 2021
76c2ae9
automatically convert rows when reader and writer declare a schema
Dec 31, 2021
88681d3
refator conversion APIs so they can play along with parquet.CopyRows
Dec 31, 2021
121a19e
express reading rows from parquet.Buffer and parquet.Reader with the …
Dec 31, 2021
d4d7dd3
document the APIs
Dec 31, 2021
0475a91
commont on the design decisions in parquet.CopyRows
Dec 31, 2021
3049139
work on row group merging
Dec 31, 2021
0991785
express sorting colums of row group as []parquet.SortingColumn rather…
Dec 31, 2021
c214ab2
add configuration options to row group merging function
Dec 31, 2021
7b9412a
don't precompute the number of rows in merged row groups
Dec 31, 2021
e651579
expose column index on row group columns
Dec 31, 2021
e55dc83
sort row group sorting columns
Dec 31, 2021
cd863e2
refactor parquet.Type.Less into parquet.Type.Compare
Dec 31, 2021
0307f66
refactor writer
Jan 2, 2022
736360f
optimize writes using pages
Jan 2, 2022
bcc4220
optimize value count of repeated pages
Jan 2, 2022
925e6f8
rename BufferColumn => ColumnBuffer
Jan 2, 2022
6df25e5
fix typos
Jan 2, 2022
5e37223
refactor writer to support reading pages from arbitrary row group col…
Jan 2, 2022
2ea6b89
add columnPath type
Jan 2, 2022
809f80d
introduced BufferedPage/CompressedPage interfaces
Jan 2, 2022
414461e
add tests for row group merging logic
Jan 2, 2022
1866936
add more tests for the merging logic + fix bugs in ordering code
Jan 3, 2022
afa0d2e
remove unused code
Jan 3, 2022
5e6d1d1
remove more unused code
Jan 3, 2022
48c5463
capture buffer schema when creating reader of buffer rows
Jan 3, 2022
64a8652
no need to expose schema when falling back to copying individual rows
Jan 3, 2022
205f8be
add more validations when writing row groups
Jan 3, 2022
d6ffc4f
fix handling of sorting columns when merging row groups
Jan 3, 2022
fe303a2
move all error values to errors.go
Jan 3, 2022
0beac07
make row group readers more generic
Jan 3, 2022
8b3cb88
add implementations of row group interfaces on top of parquet files
Jan 4, 2022
bb35581
refactor readers to use file row group APIs
Jan 4, 2022
f9c4b88
rename RowGroupColumn to ColumnChunk
Jan 4, 2022
ff77f51
make page readers reusable to address performance regression
Jan 4, 2022
35a5b4d
support reusing inner fields of page header
Jan 4, 2022
3e4842f
udpate segmentio/encoding
Jan 4, 2022
6911f3e
cleanups
Jan 4, 2022
08ee163
PR feedback
Jan 5, 2022
58a5d57
remove Pages method on parquet.RowGroup
Jan 5, 2022
0327ad6
PR feedback
Jan 5, 2022
df21f95
rename: pageHeaderStatisticsOf => filePage.statistics
Jan 5, 2022
bc88eb1
set boundary order on offset index using sorting columns of row groups
Jan 5, 2022
0f5e476
expose page index on column chunks and re-enable the tests
Jan 5, 2022
930c695
remove column_chunk.go
Jan 5, 2022
1222779
fix page index generation when writing compressed pages
Jan 5, 2022
88d16e7
fix broken build
Jan 5, 2022
b64505b
all: add documentation and links to additional documentation
kevinburkesegment Jan 5, 2022
9fd4ac7
always buffer column pages in parquet writer to avoid wrongly reorder…
Jan 6, 2022
ab3fe9c
simplify writer internals
Jan 6, 2022
3e26561
Merge branch 'O11Y-224' of ssh://github.com/segmentio/parquet into O1…
Jan 6, 2022
09be285
tests merging row groups when input is either buffers or files
Jan 6, 2022
835f7f3
cleanup
Jan 6, 2022
f6a2d84
Update file.go
Jan 6, 2022
95c2f33
Update row.go
Jan 6, 2022
0a527ec
Update row.go
Jan 6, 2022
47d4ab0
Update value.go
Jan 6, 2022
fbe6d81
Update column_buffer.go
Jan 6, 2022
616933e
PR feedback
Jan 6, 2022
46b4427
wrap up
Jan 10, 2022
203b961
Update buffer.go
Jan 10, 2022
98650c0
Update buffer.go
Jan 10, 2022
acbab33
Update format/parquet.go
Jan 10, 2022
61e0490
Update page.go
Jan 10, 2022
f346f1f
Update reader.go
Jan 10, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
305 changes: 219 additions & 86 deletions buffer.go
Original file line number Diff line number Diff line change
@@ -1,126 +1,259 @@
package parquet

import (
"bytes"
"encoding/binary"
"io"
"os"
"path/filepath"
"sync"
"sort"
)

const (
defaultBufferSize = 4096
defaultLevelBufferSize = 1024
)

var (
defaultBufferPool bufferPool
)

type Buffer interface {
io.Reader
io.Writer
// Buffer represents an in-memory group of parquet rows.
//
// The main purpose of the Buffer type is to provide a way to sort rows before
// writing them to a parquet file. Buffer implements sort.Interface as a way
// to support reordering the rows that have been written to it.
type Buffer struct {
config *RowGroupConfig
schema *Schema
rowbuf []Value
colbuf [][]Value
columns []ColumnBuffer
sorted []ColumnBuffer
}

type BufferPool interface {
GetBuffer() Buffer
PutBuffer(Buffer)
// NewBuffer constructs a new buffer, using the given list of buffer options
// to configure the buffer returned by the function.
//
// The function panics if the buffer configuration is invalid. Programs that
// cannot guarantee the validity of the options passed to NewBuffer should
// construct the buffer configuration independently prior to calling this
// function:
//
// config, err := parquet.NewRowGroupConfig(options...)
// if err != nil {
// // handle the configuration error
// ...
// } else {
// // this call to create a buffer is guaranteed not to panic
// buffer := parquet.NewBuffer(config)
// ...
// }
//
func NewBuffer(options ...RowGroupOption) *Buffer {
config, err := NewRowGroupConfig(options...)
if err != nil {
panic(err)
}
buf := &Buffer{
config: config,
}
if config.Schema != nil {
buf.configure(config.Schema)
}
return buf
}

func NewBufferPool() BufferPool { return new(bufferPool) }
func (buf *Buffer) configure(schema *Schema) {
if schema == nil {
return
}
sortingColumns := buf.config.SortingColumns
buf.sorted = make([]ColumnBuffer, len(sortingColumns))

type bufferPool struct{ sync.Pool }
forEachLeafColumnOf(schema, func(leaf leafColumn) {
nullOrdering := nullsGoLast
columnType := leaf.node.Type()
bufferSize := buf.config.ColumnBufferSize
dictionary := (Dictionary)(nil)
encoding, _ := encodingAndCompressionOf(leaf.node)

func (pool *bufferPool) GetBuffer() Buffer {
b, _ := pool.Get().(*buffer)
if b == nil {
b = new(buffer)
} else {
b.Reset()
if isDictionaryEncoding(encoding) {
bufferSize /= 2
dictionary = columnType.NewDictionary(bufferSize)
columnType = dictionary.Type()
}

column := columnType.NewColumnBuffer(leaf.columnIndex, bufferSize)
switch {
case leaf.maxRepetitionLevel > 0:
column = newRepeatedColumnBuffer(column, leaf.maxRepetitionLevel, leaf.maxDefinitionLevel, nullOrdering)
case leaf.maxDefinitionLevel > 0:
column = newOptionalColumnBuffer(column, leaf.maxDefinitionLevel, nullOrdering)
}
buf.columns = append(buf.columns, column)

if sortingIndex := searchSortingColumn(sortingColumns, leaf.path); sortingIndex < len(sortingColumns) {
if sortingColumns[sortingIndex].Descending() {
column = &reversedColumnBuffer{column}
}
if sortingColumns[sortingIndex].NullsFirst() {
nullOrdering = nullsGoFirst
}
buf.sorted[sortingIndex] = column
}
})

buf.schema = schema
buf.rowbuf = make([]Value, 0, 10)
buf.colbuf = make([][]Value, len(buf.columns))
}

// Size returns the estimated size of the buffer in memory (in bytes).
func (buf *Buffer) Size() int64 {
size := int64(0)
for _, col := range buf.columns {
size += col.Size()
}
return b
return size
}

func (pool *bufferPool) PutBuffer(buf Buffer) {
if b, _ := buf.(*buffer); b != nil {
pool.Put(b)
// NumRows returns the number of rows written to the buffer.
func (buf *Buffer) NumRows() int {
if len(buf.columns) == 0 {
return 0
} else {
// All columns have the same number of rows.
return buf.columns[0].Len()
}
}

type buffer struct{ bytes.Buffer }
// NumColumns returns the number of columns in the buffer.
//
// The count will be zero until a schema is configured on buf.
func (buf *Buffer) NumColumns() int { return len(buf.columns) }

func (b *buffer) Close() error {
b.Reset()
return nil
}
// Column returns the buffer column at index i.
//
// The method panics if i is negative or beyond the last column index in buf.
func (buf *Buffer) Column(i int) ColumnChunk { return buf.columns[i] }

type fileBufferPool struct {
err error
tempdir string
pattern string
}
// Schema returns the schema of the buffer.
//
// The schema is either configured by passing a Schema in the option list when
// constructing the buffer, or lazily discovered when the first row is written.
func (buf *Buffer) Schema() *Schema { return buf.schema }

// SortingColumns returns the list of columns by which the buffer will be
// sorted.
//
// The sorting order is configured by passing a SortingColumns option when
// constructing the buffer.
func (buf *Buffer) SortingColumns() []SortingColumn { return buf.config.SortingColumns }

// Len returns the number of rows written to the buffer.
func (buf *Buffer) Len() int { return buf.NumRows() }

func NewFileBufferPool(tempdir, pattern string) BufferPool {
pool := &fileBufferPool{
tempdir: tempdir,
pattern: pattern,
// Less returns true if row[i] < row[j] in the buffer.
func (buf *Buffer) Less(i, j int) bool {
for _, col := range buf.sorted {
switch {
case col.Less(i, j):
return true
case col.Less(j, i):
return false
}
}
pool.tempdir, pool.err = filepath.Abs(pool.tempdir)
return pool
return false
}

func (pool *fileBufferPool) GetBuffer() Buffer {
if pool.err != nil {
return &errorBuffer{err: pool.err}
// Swap exchanges the rows at indexes i and j.
func (buf *Buffer) Swap(i, j int) {
for _, col := range buf.columns {
col.Swap(i, j)
}
f, err := os.CreateTemp(pool.tempdir, pool.pattern)
if err != nil {
return &errorBuffer{err: err}
}

// Reset clears the content of the buffer, allowing it to be reused.
func (buf *Buffer) Reset() {
for _, col := range buf.columns {
col.Reset()
}
return f
}

func (pool *fileBufferPool) PutBuffer(buf Buffer) {
if f, _ := buf.(*os.File); f != nil {
defer f.Close()
os.Remove(f.Name())
// Write writes a row held in a Go value to the buffer.
func (buf *Buffer) Write(row interface{}) error {
if buf.schema == nil {
buf.configure(SchemaOf(row))
}
defer func() {
clearValues(buf.rowbuf)
}()
buf.rowbuf = buf.schema.Deconstruct(buf.rowbuf[:0], row)
return buf.WriteRow(buf.rowbuf)
}

type errorBuffer struct{ err error }
// WriteRow writes a parquet row to the buffer.
func (buf *Buffer) WriteRow(row Row) error {
defer func() {
for i, colbuf := range buf.colbuf {
clearValues(colbuf)
buf.colbuf[i] = colbuf[:0]
}
}()

func (errbuf *errorBuffer) Read([]byte) (int, error) { return 0, errbuf.err }
func (errbuf *errorBuffer) Write([]byte) (int, error) { return 0, errbuf.err }
func (errbuf *errorBuffer) ReadFrom(io.Reader) (int64, error) { return 0, errbuf.err }
func (errbuf *errorBuffer) WriteTo(io.Writer) (int64, error) { return 0, errbuf.err }
if buf.schema == nil {
return ErrRowGroupSchemaMissing
}

var (
_ io.ReaderFrom = (*errorBuffer)(nil)
_ io.WriterTo = (*errorBuffer)(nil)
)
for _, value := range row {
columnIndex := value.Column()
buf.colbuf[columnIndex] = append(buf.colbuf[columnIndex], value)
}

type lengthPrefixedWriter struct {
writer io.Writer
buffer []byte
}
for columnIndex, values := range buf.colbuf {
if err := buf.columns[columnIndex].WriteRow(values); err != nil {
return err
}
}

func (w *lengthPrefixedWriter) Reset(ww io.Writer) {
w.writer = ww
w.buffer = append(w.buffer[:0], 0, 0, 0, 0)
return nil
}

func (w *lengthPrefixedWriter) Close() error {
if len(w.buffer) > 0 {
defer func() { w.buffer = w.buffer[:0] }()
binary.LittleEndian.PutUint32(w.buffer, uint32(len(w.buffer))-4)
_, err := w.writer.Write(w.buffer)
return err
// WriteRowGroup satisfies the RowGroupWriter interface.
func (buf *Buffer) WriteRowGroup(rowGroup RowGroup) (int64, error) {
rowGroupSchema := rowGroup.Schema()
switch {
case rowGroupSchema == nil:
return 0, ErrRowGroupSchemaMissing
case buf.schema == nil:
buf.configure(rowGroupSchema)
case !nodesAreEqual(buf.schema, rowGroupSchema):
return 0, ErrRowGroupSchemaMismatch
}
return nil
if !sortingColumnsHavePrefix(rowGroup.SortingColumns(), buf.SortingColumns()) {
return 0, ErrRowGroupSortingColumnsMismatch
}
n := buf.NumRows()
_, err := CopyRows(bufferWriter{buf}, rowGroup.Rows())
return int64(buf.NumRows() - n), err
}

// Rows returns a reader exposing the current content of the buffer.
//
// The buffer and the returned reader share memory. Mutating the buffer
// concurrently to reading rows may result in non-deterministic behavior.
func (buf *Buffer) Rows() RowReader { return &rowGroupRowReader{rowGroup: buf} }

// bufferWriter is an adapter for Buffer which implements both RowWriter and
// PageWriter to enable optimizations in CopyRows for types that support writing
// rows by copying whole pages instead of calling WriteRow repeatedly.
type bufferWriter struct{ buf *Buffer }

func (w bufferWriter) WriteRow(row Row) error {
return w.buf.WriteRow(row)
}

func (w *lengthPrefixedWriter) Write(b []byte) (int, error) {
w.buffer = append(w.buffer, b...)
return len(b), nil
func (w bufferWriter) WriteValues(values []Value) (int, error) {
return w.buf.columns[values[0].Column()].WriteValues(values)
}

func (w bufferWriter) WritePage(page Page) (int64, error) {
return CopyValues(w.buf.columns[page.Column()], page.Values())
}

var (
_ RowGroup = (*Buffer)(nil)
_ RowGroupWriter = (*Buffer)(nil)
_ sort.Interface = (*Buffer)(nil)

_ RowWriter = (*bufferWriter)(nil)
_ PageWriter = (*bufferWriter)(nil)
_ ValueWriter = (*bufferWriter)(nil)
)