Skip to content

Commit b8bb792

Browse files
authored
Merge pull request #782 from steveyen/scorch-intcoder-optimizations
Various scorch optimizations around merge & chunkedIntCoder
2 parents eca31df + 99ed127 commit b8bb792

File tree

5 files changed

+28
-30
lines changed

5 files changed

+28
-30
lines changed

index/scorch/introducer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,14 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
130130
segment: s.root.segment[i].segment,
131131
cachedDocs: s.root.segment[i].cachedDocs,
132132
}
133-
133+
134134
// apply new obsoletions
135135
if s.root.segment[i].deleted == nil {
136136
newss.deleted = delta
137137
} else {
138138
newss.deleted = roaring.Or(s.root.segment[i].deleted, delta)
139139
}
140-
140+
141141
// check for live size before copying
142142
if newss.LiveSize() > 0 {
143143
newSnapshot.segment = append(newSnapshot.segment, newss)
@@ -241,7 +241,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
241241
// the root segments would be the obsolete segment set
242242
delete(nextMerge.old, segmentID)
243243

244-
} else if s.root.segment[i].LiveSize() > 0 {
244+
} else if s.root.segment[i].LiveSize() > 0 {
245245
// this segment is staying
246246
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
247247
id: s.root.segment[i].id,
@@ -269,7 +269,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
269269
}
270270
}
271271
}
272-
// In case where all the docs in the newly merged segment getting
272+
// In case where all the docs in the newly merged segment getting
273273
// deleted by the time we reach here, can skip the introduction.
274274
if nextMerge.new != nil &&
275275
nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {

index/scorch/merge.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ package scorch
1616

1717
import (
1818
"bytes"
19-
"encoding/json"
19+
"encoding/json"
2020

2121
"fmt"
2222
"os"

index/scorch/persister.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ OUTER:
6868
persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
6969
&lastMergedEpoch, persistWatchers)
7070

71-
7271
var ourSnapshot *IndexSnapshot
7372
var ourPersisted []chan error
7473

index/scorch/segment/zap/intcoder.go

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ type chunkedIntCoder struct {
3030
encoder *govarint.Base128Encoder
3131
chunkLens []uint64
3232
currChunk uint64
33+
34+
buf []byte
3335
}
3436

3537
// newChunkedIntCoder returns a new chunk int coder which packs data into
@@ -67,12 +69,8 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
6769
// starting a new chunk
6870
if c.encoder != nil {
6971
// close out last
70-
c.encoder.Close()
71-
encodingBytes := c.chunkBuf.Bytes()
72-
c.chunkLens[c.currChunk] = uint64(len(encodingBytes))
73-
c.final = append(c.final, encodingBytes...)
72+
c.Close()
7473
c.chunkBuf.Reset()
75-
c.encoder = govarint.NewU64Base128Encoder(&c.chunkBuf)
7674
}
7775
c.currChunk = chunk
7876
}
@@ -98,26 +96,25 @@ func (c *chunkedIntCoder) Close() {
9896

9997
// Write commits all the encoded chunked integers to the provided writer.
10098
func (c *chunkedIntCoder) Write(w io.Writer) (int, error) {
101-
var tw int
102-
buf := make([]byte, binary.MaxVarintLen64)
103-
// write out the number of chunks
99+
bufNeeded := binary.MaxVarintLen64 * (1 + len(c.chunkLens))
100+
if len(c.buf) < bufNeeded {
101+
c.buf = make([]byte, bufNeeded)
102+
}
103+
buf := c.buf
104+
105+
// write out the number of chunks & each chunkLen
104106
n := binary.PutUvarint(buf, uint64(len(c.chunkLens)))
105-
nw, err := w.Write(buf[:n])
106-
tw += nw
107+
for _, chunkLen := range c.chunkLens {
108+
n += binary.PutUvarint(buf[n:], uint64(chunkLen))
109+
}
110+
111+
tw, err := w.Write(buf[:n])
107112
if err != nil {
108113
return tw, err
109114
}
110-
// write out the chunk lens
111-
for _, chunkLen := range c.chunkLens {
112-
n := binary.PutUvarint(buf, uint64(chunkLen))
113-
nw, err = w.Write(buf[:n])
114-
tw += nw
115-
if err != nil {
116-
return tw, err
117-
}
118-
}
115+
119116
// write out the data
120-
nw, err = w.Write(c.final)
117+
nw, err := w.Write(c.final)
121118
tw += nw
122119
if err != nil {
123120
return tw, err

index/scorch/segment/zap/merge.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
170170
rv := make([]uint64, len(fieldsInv))
171171
fieldDvLocs := make([]uint64, len(fieldsInv))
172172

173+
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
174+
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
175+
173176
// docTermMap is keyed by docNum, where the array impl provides
174177
// better memory usage behavior than a sparse-friendlier hashmap
175178
// for when docs have much structural similarity (i.e., every doc
@@ -227,9 +230,6 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
227230
newRoaring := roaring.NewBitmap()
228231
newRoaringLocs := roaring.NewBitmap()
229232

230-
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
231-
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
232-
233233
finishTerm := func(term []byte) error {
234234
if term == nil {
235235
return nil
@@ -316,10 +316,12 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
316316
return nil, 0, err2
317317
}
318318

319+
newDocNumsI := newDocNums[itrI]
320+
319321
postItr = postings.iterator(postItr)
320322
next, err2 := postItr.Next()
321323
for next != nil && err2 == nil {
322-
hitNewDocNum := newDocNums[itrI][next.Number()]
324+
hitNewDocNum := newDocNumsI[next.Number()]
323325
if hitNewDocNum == docDropped {
324326
return nil, 0, fmt.Errorf("see hit with dropped doc num")
325327
}

0 commit comments

Comments
 (0)