Skip to content

Commit

Permalink
Fixed bug in the handling of column chunks and boolean fields
Browse files Browse the repository at this point in the history
  • Loading branch information
Craig Swank authored and Craig Swank committed Feb 14, 2019
1 parent 7ce3dd1 commit 82775d4
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 47 deletions.
5 changes: 3 additions & 2 deletions cmd/parquetgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ func (f *BoolField) Read(r io.ReadSeeker, meta *parquet.Metadata, pos parquet.Po
return err
}
f.vals, err = parquet.GetBools(rr, int(pos.N))
vals, err := parquet.GetBools(rr, int(pos.N), pos.N)
f.vals = append(f.vals, vals...)
return err
}
{{end}}`
Expand Down Expand Up @@ -491,7 +492,7 @@ func (f *BoolOptionalField) Read(r io.ReadSeeker, meta *parquet.Metadata, pos pa
return err
}
v, err := parquet.GetBools(rr, f.Values()-len(f.vals))
v, err := parquet.GetBools(rr, f.Values()-len(f.vals), pos.N)
f.vals = append(f.vals, v...)
return err
}
Expand Down
98 changes: 56 additions & 42 deletions parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func New(fields ...Field) *Metadata {
// StartRowGroup begins a new row group covering the given fields.
// Column chunks are tracked per column name as a slice: every page
// written for a column appends its own ColumnChunk entry, which is
// what lets a column span multiple pages within one row group.
func (m *Metadata) StartRowGroup(fields ...Field) {
	m.rowGroups = append(m.rowGroups, RowGroup{
		fields:  schemaElements(fields),
		columns: make(map[string][]sch.ColumnChunk),
	})
}

Expand Down Expand Up @@ -100,7 +100,6 @@ func (m *Metadata) updateRowGroup(col string, dataLen, compressedLen, headerLen,
}

rg := m.rowGroups[i-1]

rg.rowGroup.NumRows += int64(count)
err := rg.updateColumnChunk(col, dataLen+headerLen, compressedLen+headerLen, count, m.schema)
m.rowGroups[i-1] = rg
Expand Down Expand Up @@ -142,22 +141,37 @@ func (m *Metadata) Footer(w io.Writer) error {
continue
}

ch, ok := mrg.columns[col.Name]
chs, ok := mrg.columns[col.Name]

if !ok {
return fmt.Errorf("unknown column %s", col.Name)
}

ch.FileOffset = pos
ch.MetaData.DataPageOffset = pos
rg.TotalByteSize += ch.MetaData.TotalCompressedSize
rg.Columns = append(rg.Columns, &ch)
pos += ch.MetaData.TotalCompressedSize

for _, ch := range chs {
ch.FileOffset = pos
ch.MetaData.DataPageOffset = pos
rg.TotalByteSize += ch.MetaData.TotalCompressedSize
rg.Columns = append(rg.Columns, &sch.ColumnChunk{
FilePath: ch.FilePath,
FileOffset: ch.FileOffset,
MetaData: &sch.ColumnMetaData{
Type: ch.MetaData.Type,
NumValues: ch.MetaData.NumValues,
PathInSchema: ch.MetaData.PathInSchema,
TotalUncompressedSize: ch.MetaData.TotalUncompressedSize,
TotalCompressedSize: ch.MetaData.TotalCompressedSize,
},
})
pos += ch.MetaData.TotalCompressedSize
}
}

rg.NumRows = rg.NumRows / int64(len(mrg.fields.schema)-1)
f.RowGroups = append(f.RowGroups, &rg)
f.RowGroups = append(f.RowGroups, &sch.RowGroup{
Columns: rg.Columns,
TotalByteSize: rg.TotalByteSize,
NumRows: rg.NumRows,
})
}

buf, err := m.ts.Write(context.TODO(), f)
Expand All @@ -176,7 +190,7 @@ func (m *Metadata) Footer(w io.Writer) error {
type RowGroup struct {
fields schema
rowGroup sch.RowGroup
columns map[string]sch.ColumnChunk
columns map[string][]sch.ColumnChunk
child *RowGroup

Rows int64
Expand All @@ -187,28 +201,25 @@ func (r *RowGroup) Columns() []*sch.ColumnChunk {
}

func (r *RowGroup) updateColumnChunk(col string, dataLen, compressedLen, count int, fields schema) error {
ch, ok := r.columns[col]
if !ok {
t, err := columnType(col, fields)
if err != nil {
return err
}

ch = sch.ColumnChunk{
MetaData: &sch.ColumnMetaData{
Type: t,
Encodings: []sch.Encoding{sch.Encoding_PLAIN},
PathInSchema: []string{col},
Codec: sch.CompressionCodec_SNAPPY,
},
}
chs := r.columns[col]
t, err := columnType(col, fields)
if err != nil {
return err
}

ch.MetaData.NumValues += int64(count)
ch := sch.ColumnChunk{
MetaData: &sch.ColumnMetaData{
NumValues: int64(count),
Type: t,
TotalUncompressedSize: int64(dataLen),
TotalCompressedSize: int64(compressedLen),
Encodings: []sch.Encoding{sch.Encoding_PLAIN},
PathInSchema: []string{col},
Codec: sch.CompressionCodec_SNAPPY,
},
}

ch.MetaData.TotalUncompressedSize += int64(dataLen)
ch.MetaData.TotalCompressedSize += int64(compressedLen)
r.columns[col] = ch
r.columns[col] = append(chs, ch)
return nil
}

Expand Down Expand Up @@ -355,18 +366,21 @@ func StringType(se *sch.SchemaElement) {
se.Type = &t
}

func GetBools(r io.Reader, n int) ([]bool, error) {
func GetBools(r io.Reader, n, pageSize int) ([]bool, error) {
if n == 0 {
return []bool{}, nil
}

var index int
var vals [8]uint32
data, _ := ioutil.ReadAll(r)
out := make([]bool, n)

for i := 0; i < n; i++ {
if index == 0 {
if len(data) == 0 {
return nil, errors.New("not enough data to decode all values")
}
vals = unpack8uint32(data[:1])
vals = unpack8uint32(data[0])
data = data[1:]
}
out[i] = vals[index] == 1
Expand All @@ -375,15 +389,15 @@ func GetBools(r io.Reader, n int) ([]bool, error) {
return out, nil
}

// unpack8uint32 expands the eight bits of data into an array of eight
// uint32 values, one per bit, least-significant bit first (a[0] is bit 0).
// Parquet's PLAIN encoding packs eight booleans per byte; this is the
// per-byte decode step used by GetBools.
func unpack8uint32(data byte) [8]uint32 {
	var a [8]uint32
	for i := range a {
		a[i] = uint32((data >> uint(i)) & 1)
	}
	return a
}
5 changes: 3 additions & 2 deletions parquet_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func (f *BoolOptionalField) Read(r io.ReadSeeker, meta *parquet.Metadata, pos pa
return err
}

v, err := parquet.GetBools(rr, f.Values()-len(f.vals))
v, err := parquet.GetBools(rr, f.Values()-len(f.vals), pos.N)
f.vals = append(f.vals, v...)
return err
}
Expand Down Expand Up @@ -972,6 +972,7 @@ func (f *BoolField) Read(r io.ReadSeeker, meta *parquet.Metadata, pos parquet.Po
return err
}

f.vals, err = parquet.GetBools(rr, int(pos.N))
vals, err := parquet.GetBools(rr, int(pos.N), pos.N)
f.vals = append(f.vals, vals...)
return err
}
117 changes: 116 additions & 1 deletion parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestParquet(t *testing.T) {
input [][]Person
//if expected is nil then input is used for the assertions
expected [][]Person
pageSize int
}

testCases := []testCase{
Expand Down Expand Up @@ -79,6 +80,44 @@ func TestParquet(t *testing.T) {
},
},
},
{
name: "multiple people multiple row groups small page size",
pageSize: 2,
input: [][]Person{
{
{Being: Being{ID: 1, Age: pint32(-10)}},
{Happiness: 55},
{Sadness: pint64(1)},
{Code: pstring("10!01")},
{Funkiness: 0.2},
},
{
{Lameness: pfloat32(-0.4)},
{Keen: pbool(true)},
{Birthday: 55},
{Anniversary: puint64(1010010)},
{Secret: "hush hush"},
{Keen: pbool(false)},
{Sleepy: true},
},
},
expected: [][]Person{
{
{Being: Being{ID: 1, Age: pint32(-10)}},
{Happiness: 55},
{Sadness: pint64(1)},
{Code: pstring("10!01")},
{Funkiness: 0.2},
{Lameness: pfloat32(-0.4)},
{Keen: pbool(true)},
{Birthday: 55},
{Anniversary: puint64(1010010)},
{Secret: ""},
{Keen: pbool(false)},
{Sleepy: true},
},
},
},
{
name: "boolean optional",
input: [][]Person{
Expand All @@ -91,6 +130,19 @@ func TestParquet(t *testing.T) {
},
},
},
{
name: "boolean optional small page size",
pageSize: 2,
input: [][]Person{
{
{Keen: nil},
{Keen: pbool(true)},
{Keen: nil},
{Keen: pbool(false)},
{Keen: nil},
},
},
},
{
name: "boolean optional multiple row groups",
input: [][]Person{
Expand All @@ -110,11 +162,74 @@ func TestParquet(t *testing.T) {
},
},
},
{
name: "boolean optional multiple row groups small page size",
pageSize: 2,
input: [][]Person{
{
{Keen: nil},
{Keen: pbool(true)},
{Keen: nil},
{Keen: pbool(false)},
{Keen: nil},
},
{
{Keen: pbool(true)},
{Keen: nil},
{Keen: pbool(false)},
{Keen: nil},
{Keen: pbool(true)},
},
},
},
{
name: "boolean multiple row groups small page size",
pageSize: 2,
input: [][]Person{
{
{Sleepy: false},
{Sleepy: true},
{Sleepy: true},
{Sleepy: false},
{Sleepy: true},
},
{
{Sleepy: true},
{Sleepy: true},
{Sleepy: false},
{Sleepy: true},
{Sleepy: true},
},
},
},
{
name: "optional string multiple row groups small page size",
pageSize: 2,
input: [][]Person{
{
{Code: pstring("a")},
{Code: pstring("b")},
{Code: pstring("c")},
{Code: pstring("d")},
{Code: pstring("e")},
},
{
{Code: pstring("f")},
{Code: pstring("g")},
{Code: pstring("h")},
{Code: pstring("i")},
{Code: pstring("j")},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.pageSize == 0 {
tc.pageSize = 100
}
var buf bytes.Buffer
w, err := NewParquetWriter(&buf)
w, err := NewParquetWriter(&buf, MaxPageSize(tc.pageSize))
assert.Nil(t, err, tc.name)
for _, rowgroup := range tc.input {
for _, p := range rowgroup {
Expand Down

0 comments on commit 82775d4

Please sign in to comment.