fix: Peek OOM and performance issue (#335)
xiaost authored Jun 7, 2024
1 parent 3e411b1 commit c4ec256
Showing 2 changed files with 97 additions and 31 deletions.
76 changes: 56 additions & 20 deletions nocopy_linkbuffer.go
@@ -56,7 +56,12 @@ type UnsafeLinkBuffer struct {
flush *linkBufferNode // malloc head
write *linkBufferNode // malloc tail

caches [][]byte // buf allocated by Next when cross-package, which should be freed when release
// buf allocated by Next when the requested data crosses nodes, which should be freed on release
caches [][]byte

// for `Peek` only: avoids creating too many []byte in `caches`,
// and fixes the OOM when `Peek` is called many times on a large buffer
cachePeek []byte
}
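
Background for the new field: before this change, a cross-node Peek allocated a fresh buffer on every call, and for sizes in (block1k, mallocMax] that buffer also stayed in b.caches until Release, so repeatedly peeking a large, still-incomplete message piled up one copy per call. Below is a minimal sketch of the reuse pattern cachePeek introduces (illustrative names, not the netpoll API):

package main

import "fmt"

// peeker mimics the reuse pattern behind the new cachePeek field: a single
// buffer is grown on demand and shared by successive peeks, instead of a new
// allocation per cross-node Peek.
type peeker struct {
	cache []byte // analogous to UnsafeLinkBuffer.cachePeek
}

func (p *peeker) peek(src []byte, n int) []byte {
	if cap(p.cache) < n {
		p.cache = make([]byte, 0, n) // netpoll frees the old buffer back to its allocator here
	}
	if len(p.cache) < n {
		p.cache = append(p.cache[:0], src[:n]...) // refill only when the cache is too short
	}
	return p.cache[:n] // a smaller peek is served from the cached bytes directly
}

func main() {
	src := []byte("0123456789abcdef")
	var p peeker
	a := p.peek(src, 8)
	b := p.peek(src, 4)                               // no new allocation, same backing array
	fmt.Println(string(a), string(b), &a[0] == &b[0]) // "01234567 0123 true"
}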

// Len implements Reader.
@@ -124,28 +129,50 @@ func (b *UnsafeLinkBuffer) Peek(n int) (p []byte, err error) {
if b.isSingleNode(n) {
return b.read.Peek(n), nil
}

// multiple nodes
var pIdx int
if block1k < n && n <= mallocMax {
p = malloc(n, n)
b.caches = append(b.caches, p)
} else {
p = dirtmake.Bytes(n, n)
}
var node = b.read
var l int
for ack := n; ack > 0; ack = ack - l {
l = node.Len()
if l >= ack {
pIdx += copy(p[pIdx:], node.Peek(ack))
break
} else if l > 0 {
pIdx += copy(p[pIdx:], node.Peek(l))

// try to reuse the capacity of b.cachePeek; if it is too small, free it.
if b.cachePeek != nil && cap(b.cachePeek) < n {
free(b.cachePeek)
b.cachePeek = nil
}
if b.cachePeek == nil {
b.cachePeek = malloc(0, n) // init with zero len, will append later
}
p = b.cachePeek
if len(p) >= n {
// if we peek less than last time,
// the cached data can be returned directly.
// cachePeek is reset on Next or Skip, so there is no stale data to worry about.
return p[:n], nil
}

// How it works >>>>>>
// [ -------- node0 -------- ][ --------- node1 --------- ] <- b.read
// [ --------------- p --------------- ]
//                           ^ len(p)   ^ n here
//                           ^ scanned
// `scanned` is the total length of the nodes already scanned and fully copied into p
// `len(p) - scanned` is the offset inside the current node at which p resumes copying
// `n - len(p)` is the number of bytes still to be appended to p
// we copy `len(node1) - (len(p) - scanned)` bytes in case node1 doesn't have enough data
for scanned, node := 0, b.read; len(p) < n; node = node.next {
l := node.Len()
if scanned+l <= len(p) { // already copied in p, skip
scanned += l
continue
}
node = node.next
start := len(p) - scanned // `start` must be smaller than l because `scanned+l <= len(p)` is false
copyn := n - len(p)
if nodeLeftN := l - start; copyn > nodeLeftN {
copyn = nodeLeftN
}
p = append(p, node.Peek(l)[start:start+copyn]...)
scanned += l
}
_ = pIdx
return p, nil
b.cachePeek = p
return p[:n], nil
}
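
The new loop follows the "How it works" diagram above: p already contains whatever an earlier, smaller Peek cached, and each node contributes only the bytes p is still missing. A minimal sketch of the same arithmetic over a plain [][]byte (the flat-slice form and the function name are illustrative):

// appendAcrossNodes appends bytes from consecutive nodes to p until len(p) == n
// (or the nodes run out), skipping the prefix that p already contains.
func appendAcrossNodes(p []byte, nodes [][]byte, n int) []byte {
	scanned := 0 // total length of the nodes visited so far
	for _, node := range nodes {
		if len(p) >= n {
			break
		}
		l := len(node)
		if scanned+l <= len(p) { // this node was fully copied by a previous peek
			scanned += l
			continue
		}
		start := len(p) - scanned // resume inside the current node
		copyn := n - len(p)       // bytes still missing from p
		if left := l - start; copyn > left {
			copyn = left // the node does not hold everything we still need
		}
		p = append(p, node[start:start+copyn]...)
		scanned += l
	}
	return p
}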

// Skip implements Reader.
@@ -187,6 +214,10 @@ func (b *UnsafeLinkBuffer) Release() (err error) {
b.caches[i] = nil
}
b.caches = b.caches[:0]
if b.cachePeek != nil {
free(b.cachePeek)
b.cachePeek = nil
}
return nil
}

@@ -692,6 +723,11 @@ func (b *UnsafeLinkBuffer) indexByte(c byte, skip int) int {

// recalLen re-calculate the length
func (b *UnsafeLinkBuffer) recalLen(delta int) (length int) {
if delta < 0 && len(b.cachePeek) > 0 {
// b.cachePeek will contain stale data if we read out even a single byte from the buffer,
// so it must be reset, or the next Peek call would return invalid bytes.
b.cachePeek = b.cachePeek[:0]
}
return int(atomic.AddInt64(&b.length, int64(delta)))
}
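
Why the reset is needed: a cached peek is anchored at the read position it was built from, so consuming even one byte invalidates it. A small same-package usage sketch of the contract this protects (illustrative; with data spread over multiple nodes, the stale copy would otherwise live in cachePeek):

func examplePeekAfterSkip() {
	buf := NewLinkBuffer()
	data := make([]byte, 16)
	for i := range data {
		data[i] = byte(i)
	}
	_, _ = buf.WriteBinary(data)
	_ = buf.Flush()

	p1, _ := buf.Peek(8) // p1[0] == 0
	_ = p1
	_ = buf.Skip(1)      // consumes one byte; recalLen runs with a negative delta and truncates any cached peek
	p2, _ := buf.Peek(8) // p2[0] == 1, rebuilt from the new read position
	_ = p2
}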

52 changes: 41 additions & 11 deletions nocopy_linkbuffer_test.go
@@ -85,7 +85,7 @@ func TestLinkBuffer(t *testing.T) {
Equal(t, buf.Len(), 100)
}

func TestGetBytes(t *testing.T) {
func TestLinkBufferGetBytes(t *testing.T) {
buf := NewLinkBuffer()
var (
num = 10
@@ -195,8 +195,7 @@ func TestLinkBufferWithInvalid(t *testing.T) {
}
}

// cross-block operation test
func TestLinkBufferIndex(t *testing.T) {
func TestLinkBufferMultiNode(t *testing.T) {
// clean & new
LinkBufferCap = 8

@@ -206,6 +205,9 @@
var p []byte

p, _ = buf.Malloc(15)
for i := 0; i < len(p); i++ { // updates p[0] - p[14] to 0 - 14
p[i] = byte(i)
}
Equal(t, len(p), 15)
MustTrue(t, buf.read == buf.flush)
Equal(t, buf.read.off, 0)
@@ -215,6 +217,9 @@
Equal(t, cap(buf.write.buf), 16) // mcache up-aligned to the power of 2

p, _ = buf.Malloc(7)
for i := 0; i < len(p); i++ { // updates p[0] - p[6] to 15 - 21
p[i] = byte(i + 15)
}
Equal(t, len(p), 7)
MustTrue(t, buf.read == buf.flush)
Equal(t, buf.read.off, 0)
@@ -236,19 +241,44 @@

p, _ = buf.Next(13)
Equal(t, len(p), 13)
Equal(t, p[0], byte(0))
Equal(t, p[12], byte(12))
MustTrue(t, buf.read != buf.flush)
Equal(t, buf.read.off, 13)
Equal(t, buf.read.Len(), 2)
Equal(t, buf.read.next.Len(), 7)
Equal(t, buf.flush.off, 0)
Equal(t, buf.flush.malloc, 7)

// Peek
p, _ = buf.Peek(4)
Equal(t, len(p), 4)
Equal(t, p[0], byte(13))
Equal(t, p[1], byte(14))
Equal(t, p[2], byte(15))
Equal(t, p[3], byte(16))
Equal(t, len(buf.cachePeek), 4)
p, _ = buf.Peek(3) // case: smaller than the last call
Equal(t, len(p), 3)
Equal(t, p[0], byte(13))
Equal(t, p[2], byte(15))
Equal(t, len(buf.cachePeek), 4)
p, _ = buf.Peek(5) // case: Peek more than the last call, and cap(buf.cachePeek) < n
Equal(t, len(p), 5)
Equal(t, p[0], byte(13))
Equal(t, p[4], byte(17))
Equal(t, len(buf.cachePeek), 5)
p, _ = buf.Peek(6) // case: Peek more than the last call, and cap(buf.cachePeek) > n
Equal(t, len(p), 6)
Equal(t, p[0], byte(13))
Equal(t, p[5], byte(18))
Equal(t, len(buf.cachePeek), 6)
MustTrue(t, buf.read != buf.flush)
Equal(t, buf.read.off, 13)
Equal(t, buf.read.Len(), 2)
Equal(t, buf.flush.off, 0)
Equal(t, buf.flush.malloc, 7)
// Peek ends

buf.book(block8k, block8k)
MustTrue(t, buf.flush == buf.write)
@@ -377,7 +407,7 @@ func TestLinkBufferResetTail(t *testing.T) {
Equal(t, got, except)
}

func TestWriteBuffer(t *testing.T) {
func TestLinkBufferWriteBuffer(t *testing.T) {
buf1 := NewLinkBuffer()
buf2 := NewLinkBuffer()
b2, _ := buf2.Malloc(1)
@@ -414,7 +444,7 @@ func TestLinkBufferCheckSingleNode(t *testing.T) {
buf.isSingleNode(1)
}

func TestWriteMultiFlush(t *testing.T) {
func TestLinkBufferWriteMultiFlush(t *testing.T) {
buf := NewLinkBuffer()
b1, _ := buf.Malloc(4)
b1[0] = 1
@@ -444,7 +474,7 @@ func TestWriteMultiFlush(t *testing.T) {
MustTrue(t, len(buf.Bytes()) == 4)
}

func TestWriteBinary(t *testing.T) {
func TestLinkBufferWriteBinary(t *testing.T) {
// clean & new
LinkBufferCap = 8

@@ -465,7 +495,7 @@ func TestWriteBinary(t *testing.T) {
Equal(t, b[9], byte(0))
}

func TestWriteDirect(t *testing.T) {
func TestLinkBufferWriteDirect(t *testing.T) {
// clean & new
LinkBufferCap = 32

@@ -492,7 +522,7 @@ func TestWriteDirect(t *testing.T) {
}
}

func TestNoCopyWriteAndRead(t *testing.T) {
func TestLinkBufferNoCopyWriteAndRead(t *testing.T) {
// [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B]
const (
mallocLen = 4096 * 2
@@ -578,7 +608,7 @@ func TestNoCopyWriteAndRead(t *testing.T) {
runtime.KeepAlive(normalBuf)
}

func TestBufferMode(t *testing.T) {
func TestLinkBufferBufferMode(t *testing.T) {
bufnode := newLinkBufferNode(0)
MustTrue(t, !bufnode.getMode(nocopyReadMask))
MustTrue(t, bufnode.getMode(readonlyMask))
@@ -726,7 +756,7 @@ func BenchmarkStringToCopy(b *testing.B) {
_ = bs
}

func BenchmarkPoolGet(b *testing.B) {
func BenchmarkLinkBufferPoolGet(b *testing.B) {
var v *linkBufferNode
if false {
b.Logf("bs = %v", v)
@@ -759,7 +789,7 @@ func BenchmarkCopyString(b *testing.B) {
})
}

func BenchmarkNoCopyRead(b *testing.B) {
func BenchmarkLinkBufferNoCopyRead(b *testing.B) {
totalSize := 0
minSize := 32
maxSize := minSize << 9
