Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

[O11Y-224] add support for merging parquet files. #45

Merged
merged 146 commits into from Jan 10, 2022
Merged

Conversation

achille-roussel
Copy link
Contributor

@achille-roussel achille-roussel commented Jan 3, 2022

This PR adds APIs to support merging parquet files. At this time, this is still a work in progress but there is already a consequent amount of changes so I am opening the pull request to give visibility into progress and start receiving any feedback needed.

I had to go through a decent refactor of internal and external APIs to introduce the merge operations while trying to maintain a balance of usability and testability in the code. I have also spent time figuring out how we would be able to optimize the merge operations in cases where merging files row-by-row is unnecessary because we have knowledge about the structure of the file which allows to skip the whole decoding/merging/encoding pipeline: for example, if we can determine that a full page of a column can be copied without being decompressed, or maybe a range of a page doesn't need to be reconstructed into a row to be copied to the output, then large amounts of the merge operations can become copy or io.Copy calls.

I based most of the changes on the design of the standard library's io package, but applied to the context of parquet files; instead of dealing with bytes, we work on values, rows, and pages. There are functions like parquet.CopyRows, interface types like parquet.RowReader, parquet.RowWriter, and extensions like parquet.RowReaderTo and parquet.RowWriterTo which mirror the way io.ReaderFrom or io.WriterTo are employed in the standard library to optimize copies between source and destinations that can bypass the intermediary buffering steps.

The parquet.MergeRowGroups is the function that implements the merge operation, which takes a list of parquet.RowGroup as input and returns a new parquet.RowGroup which is a merged view of the inputs. The merged row group can then be written to a new file by copying its rows to a writer, the code ends up looking something like this:

merge, err := parquet.MergeRowGroups(rowGroups)

...

writer := parquet.NewWriter(file)

if _, err := parquet.CopyRows(writer, merge); err != nil {
  ...
}
if err := writer.Close(); err != nil {
  ...
}

Regarding the work that I still have to wrap up on this PR:

  • derive row groups as parquet.RowGroup from a parquet.File so we can merge files, I'm thinking I will probably be able to revisit the internals of parquet.Reader to use these APIs and unexport some of the lower level types
  • write the optimizations to support copying non-interleaving pages directly between the inputs and output, most of the code structure is in place already so this part is ~50% complete
  • add more tests for parquet.MergeRowGroups

More context is available on the Jira ticket if needed, looking forward for any feedback you may have!

file.go Show resolved Hide resolved
page_header.go Show resolved Hide resolved
file.go Outdated Show resolved Hide resolved
@@ -37,10 +42,8 @@ type File struct {
func OpenFile(r io.ReaderAt, size int64, options ...FileOption) (*File, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry one line above this... "does not validate that the pages have valid checksums" (avoids a double negative)

row.go Outdated Show resolved Hide resolved
writer.go Outdated Show resolved Hide resolved
writer.go Outdated Show resolved Hide resolved

length := len(footer)
footer = append(footer, 0, 0, 0, 0)
footer = append(footer, "PAR1"...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We reference this string in a few places, should we pull it into a constant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on the fence, I feel like adding indirection always has a readability cost, for example this code in file.go:

    if _, err := r.ReadAt(b[:4], 0); err != nil {
        return nil, fmt.Errorf("reading magic header of parquet file: %w", err)
    }
    if string(b[:4]) != "PAR1" {
        return nil, fmt.Errorf("invalid magic header of parquet file: %q", b[:4])
    }

    if _, err := r.ReadAt(b[:8], size-8); err != nil {
        return nil, fmt.Errorf("reading magic footer of parquet file: %w", err)
    }
    if string(b[4:8]) != "PAR1" {
        return nil, fmt.Errorf("invalid magic footer of parquet file: %q", b[4:8])
    }

With a constant it changes to:

    if _, err := r.ReadAt(b[:4], 0); err != nil {
        return nil, fmt.Errorf("reading magic header of parquet file: %w", err)
    }
    if string(b[:4]) != magicHeader {
        return nil, fmt.Errorf("invalid magic header of parquet file: %q", b[:4])
    }

    if _, err := r.ReadAt(b[:8], size-8); err != nil {
        return nil, fmt.Errorf("reading magic footer of parquet file: %w", err)
    }
    if string(b[4:8]) != magicHeader {
        return nil, fmt.Errorf("invalid magic footer of parquet file: %q", b[4:8])
    }

But now there's a more implicit relation between the use of 4 and 8 and the value of magicHeader; it isn't clear by simply reading at the file that the magic header is 4 bytes. The right change would probably also introduce more constants for the lengths, but then it gets a lot dirtier:

    if _, err := r.ReadAt(b[:magicHeaderSize], 0); err != nil {
        return nil, fmt.Errorf("reading magic header of parquet file: %w", err)
    }
    if string(b[:magicHeaderSize]) != magicHeader {
        return nil, fmt.Errorf("invalid magic header of parquet file: %q", b[:magicHeaderSize])
    }

    if _, err := r.ReadAt(b[:magicHeaderSize+footerLengthSize], size-(magicHeaderSize+footerLengthSize)); err != nil {
        return nil, fmt.Errorf("reading magic footer of parquet file: %w", err)
    }
    if string(b[footerLengthSize:footerLengthSize+magicHeaderSize]) != magicHeader {
        return nil, fmt.Errorf("invalid magic footer of parquet file: %q", b[footerLengthSize:footerLengthSize+magicHeaderSize])
    }

We can add local variables or functions to make things easier to follow, but then are we really helping the reader by increasing the amount of state they have to reason with to understand the code?

I've been thinking the inlined strings are clear enough, their use is well tested, and there is no need to export them, so using literals seemed like the best option.

column_buffer.go Outdated Show resolved Hide resolved
column_buffer.go Show resolved Hide resolved
Achille and others added 7 commits January 6, 2022 10:03
Co-authored-by: Kevin Burke <kevin.burke@segment.com>
Co-authored-by: Kevin Burke <kevin.burke@segment.com>
Co-authored-by: Kevin Burke <kevin.burke@segment.com>
Co-authored-by: Kevin Burke <kevin.burke@segment.com>
Co-authored-by: Kevin Burke <kevin.burke@segment.com>
@achille-roussel
Copy link
Contributor Author

I think enough time has been spent on this PR for now, the exported APIs are solid, I will leave the remaining optimizations for follow up work so we can start using the new code.

buffer.go Outdated Show resolved Hide resolved
buffer.go Outdated Show resolved Hide resolved
@@ -528,6 +528,7 @@ func (index uint64ColumnIndexer) ColumnIndex() ColumnIndex {
}

func splitFixedLenByteArrayList(size int, data []byte) [][]byte {
data = copyBytes(data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what this line is doing, it seems like it's copying the byte array into itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It copies the input byte slice and assigns the result to the data variable.

Achille and others added 2 commits January 10, 2022 11:11
Co-authored-by: Kevin Burke <kevin.burke@segment.com>
Co-authored-by: Kevin Burke <kevin.burke@segment.com>
page.go Outdated Show resolved Hide resolved
Achille and others added 2 commits January 10, 2022 11:19
Co-authored-by: Kevin Burke <kevin.burke@segment.com>
Co-authored-by: Kevin Burke <kevin.burke@segment.com>
reader.go Outdated Show resolved Hide resolved
Co-authored-by: Kevin Burke <kevin.burke@segment.com>
_ RowWriterTo = (*rowGroupRowReader)(nil)

_ RowReaderWithSchema = (*mergedRowGroupRowReader)(nil)
//_ RowWriterTo = (*mergedRowGroupRowReader)(nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mistake?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because WriteRowsTo is commented out, this type does not yet implement the interface.

@@ -14,6 +14,11 @@ import (
"github.com/segmentio/parquet/deprecated"
)

const (
// 170 x sizeof(Value) = 4KB
defaultValueBufferSize = 170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a test for this? If sizeof(Value)*defaultValueBufferSize exceeds 4kb, fail the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I don't know if failing a test is necessarily the right tool here, the code would still work if the size of Value was not 24 bytes, the buffer size would just be a different size. The comment here is just to hint at how this value was chosen.

One problem with changing it to 4096 / unsafe.Sizeof(Value{}) is it would result in a constant of type uintptr, instead of having an untyped literal, which would require type conversions when used in arithmetic operations with other integer types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I guess performance might slip if sizeof(Value)*170 was larger than 4096 right? The goal of a test would be to say to the developer, "hey you should change the value by hand" instead of just letting the buffer get too large.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only value that we don't set the size of explicitly in parquet.Value is the pointer, everything else is fixed sized and laid out to avoid padding between fields; the only two possible sizes are 20 and 24 bytes on 32 and 64 bits architectures.

There could be performance differences from the way the compiler is going to represent copying 20 bytes values, but I don't think we would see a difference on I/O operations since we are defining the number of values in a buffer, not the size in bytes; code that reads values into buffers of size defaultValueBufferSize would use the same amount of I/O operations on any platform.

Copy link
Contributor

@kevinburkesegment kevinburkesegment left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok - I've tried my best to give a good review, obviously this is a pretty large change. I'm interested to see how this does with some of our production datasets!

My other comment would be that it would be nice to have a example_test.go with examples of the most common use cases we'd expect - for example, reading data from Parquet files into memory and iterating over it, or writing Parquet data to files.

@achille-roussel
Copy link
Contributor Author

achille-roussel commented Jan 10, 2022

Examples are being worked on in https://github.com/segmentio/parquet/tree/O11Y-211

Thanks a lot for the thorough review Kevin!

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants