Skip to content

Commit

Permalink
Merge pull request #13 from parsyl/gzip
Browse files Browse the repository at this point in the history
added gzip compression
  • Loading branch information
cswank authored Jun 18, 2021
2 parents 8be0fe2 + f68aa5f commit 59fa1fd
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 12 deletions.
77 changes: 66 additions & 11 deletions fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package parquet

import (
"bytes"
"compress/gzip"
"math/bits"
"strings"

Expand Down Expand Up @@ -39,6 +40,12 @@ func RequiredFieldSnappy(r *RequiredField) {
r.compression = sch.CompressionCodec_SNAPPY
}

// RequiredFieldGzip sets the compression for a column to gzip
// It is an optional arg to NewRequiredField
func RequiredFieldGzip(r *RequiredField) {
r.compression = sch.CompressionCodec_GZIP
}

// RequiredFieldUncompressed sets the compression to none
// It is an optional arg to NewRequiredField
func RequiredFieldUncompressed(r *RequiredField) {
Expand All @@ -47,12 +54,16 @@ func RequiredFieldUncompressed(r *RequiredField) {

// DoWrite writes the actual raw data.
func (f *RequiredField) DoWrite(w io.Writer, meta *Metadata, vals []byte, count int, stats Stats) error {
l, cl, vals := compress(f.compression, vals)
l, cl, vals, err := compress(f.compression, vals)
if err != nil {
return err
}

if err := meta.WritePageHeader(w, f.pth, l, cl, count, count, 0, 0, f.compression, stats); err != nil {
return err
}

_, err := w.Write(vals)
_, err = w.Write(vals)
return err
}

Expand Down Expand Up @@ -145,6 +156,12 @@ func OptionalFieldSnappy(r *OptionalField) {
r.compression = sch.CompressionCodec_SNAPPY
}

// OptionalFieldGzip sets the compression for a column to gzip
// It is an optional arg to NewOptionalField
func OptionalFieldGzip(r *OptionalField) {
r.compression = sch.CompressionCodec_GZIP
}

// OptionalFieldUncompressed sets the compression to none
// It is an optional arg to NewOptionalField
func OptionalFieldUncompressed(o *OptionalField) {
Expand Down Expand Up @@ -189,7 +206,11 @@ func (f *OptionalField) DoWrite(w io.Writer, meta *Metadata, vals []byte, count
repLen := wc.n - defLen

wc.Write(vals)
l, cl, vals := compress(f.compression, buf.Bytes())
l, cl, vals, err := compress(f.compression, buf.Bytes())
if err != nil {
return err
}

if err := meta.WritePageHeader(w, f.pth, l, cl, len(f.Defs), count, defLen, repLen, f.compression, stats); err != nil {
return err
}
Expand Down Expand Up @@ -293,6 +314,26 @@ func pageData(r io.Reader, ph *sch.PageHeader, pg Page) ([]byte, error) {
if err != nil {
return nil, err
}
case sch.CompressionCodec_GZIP:
var buf bytes.Buffer
_, err := io.CopyN(&buf, r, int64(ph.CompressedPageSize))
if err != nil {
return nil, err
}

zr, err := gzip.NewReader(&buf)
if err != nil {
return nil, err
}

data, err = io.ReadAll(zr)
if err != nil {
return nil, err
}

if err := zr.Close(); err != nil {
return nil, err
}
case sch.CompressionCodec_UNCOMPRESSED:
data = make([]byte, ph.UncompressedPageSize)
if _, err := r.Read(data); err != nil {
Expand All @@ -305,18 +346,32 @@ func pageData(r io.Reader, ph *sch.PageHeader, pg Page) ([]byte, error) {
return data, nil
}

func compress(codec sch.CompressionCodec, vals []byte) (int, int, []byte) {
var l, cl int
func compress(codec sch.CompressionCodec, vals []byte) (int, int, []byte, error) {
var err error
l := len(vals)
switch codec {
case sch.CompressionCodec_SNAPPY:
l = len(vals)
vals = snappy.Encode(nil, vals)
cl = len(vals)
case sch.CompressionCodec_UNCOMPRESSED:
l = len(vals)
cl = len(vals)
case sch.CompressionCodec_GZIP:
var buf bytes.Buffer
zw, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
if err != nil {
return l, 0, vals, err
}

_, err = zw.Write(vals)
if err != nil {
return l, 0, vals, err
}

err = zw.Close()
if err != nil {
return l, 0, vals, err
}

vals = buf.Bytes()
}
return l, cl, vals
return l, len(vals), vals, err
}

// writeLevels writes vals to w as RLE/bitpack encoded data
Expand Down
10 changes: 10 additions & 0 deletions internal/gen/template.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions parquet_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func TestParquet(t *testing.T) {
}

for i, tc := range testCases {
for j, comp := range []string{"uncompressed", "snappy"} {
for j, comp := range []string{"uncompressed", "snappy", "gzip"} {
t.Run(fmt.Sprintf("%02d %s %s", 2*i+j, tc.name, comp), func(t *testing.T) {
if tc.pageSize == 0 {
tc.pageSize = 100
Expand Down Expand Up @@ -760,6 +760,7 @@ func getPageHeaders(r io.ReadSeeker, name string, footer *sch.FileMetaData) ([]s
var compressionTest = map[string]func(*ParquetWriter) error{
"uncompressed": Uncompressed,
"snappy": Snappy,
"gzip": Gzip,
}

func getLen(peeps [][]Person) int {
Expand Down

0 comments on commit 59fa1fd

Please sign in to comment.