mempool: change []byte to *[]byte to optimize gc
lesismal committed Dec 31, 2024
1 parent f3c9a6a commit 03a5bbf
Showing 20 changed files with 515 additions and 437 deletions.
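
Background for the change: Go's sync.Pool stores interface{} values, and putting a []byte into an interface{} copies the three-word slice header onto the heap on every Put (this is staticcheck's SA6002 warning). Handing out *[]byte instead means a single pointer travels through the pool, and it also lets Append/Realloc write the grown slice header back through the caller's pointer before the same pointer is returned to Free. The sketch below is not part of the commit; it only illustrates the allocation difference with two throwaway pools and illustrative names.

package main

import (
    "fmt"
    "sync"
    "testing"
)

// A pool of []byte values: every Put converts the 3-word slice header to an
// interface{}, which allocates (staticcheck SA6002).
var valuePool = sync.Pool{New: func() interface{} { return make([]byte, 1024) }}

// A pool of *[]byte: the pointer fits the interface's data word, so Get/Put
// stay allocation-free once the pool is warm.
var pointerPool = sync.Pool{New: func() interface{} {
    b := make([]byte, 1024)
    return &b
}}

func main() {
    byValue := testing.AllocsPerRun(1000, func() {
        b := valuePool.Get().([]byte)
        valuePool.Put(b) // boxes the slice header on every call
    })
    byPointer := testing.AllocsPerRun(1000, func() {
        pb := pointerPool.Get().(*[]byte)
        pointerPool.Put(pb) // pointer already fits the interface word, no boxing
    })
    fmt.Println("allocs per Put with []byte: ", byValue)   // ~1
    fmt.Println("allocs per Put with *[]byte:", byPointer) // ~0
}
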
28 changes: 15 additions & 13 deletions conn_unix.go
@@ -36,9 +36,9 @@ func (c *Conn) newToWriteBuf(buf []byte) {
    allocator := c.p.g.BodyAllocator
    appendBuffer := func() {
        t := &toWrite{} // poolToWrite.New().(*toWrite)
-       b := allocator.Malloc(len(buf))
-       copy(b, buf)
-       t.buf = b
+       pbuf := allocator.Malloc(len(buf))
+       copy(*pbuf, buf)
+       t.buf = pbuf
        c.writeList = append(c.writeList, t)
    }

@@ -52,15 +52,16 @@ func (c *Conn) newToWriteBuf(buf []byte) {
        appendBuffer()
    } else {
        l := len(buf)
-       tailLen := len(tail.buf)
+       tailLen := len(*tail.buf)
        if tailLen+l > maxWriteCacheOrFlushSize {
            appendBuffer()
        } else {
-           if cap(tail.buf) < tailLen+l {
-               b := allocator.Malloc(tailLen + l)[:tailLen]
-               copy(b, tail.buf)
+           if cap(*tail.buf) < tailLen+l {
+               pbuf := allocator.Malloc(tailLen + l)
+               *pbuf = (*pbuf)[:tailLen]
+               copy(*pbuf, *tail.buf)
                allocator.Free(tail.buf)
-               tail.buf = b
+               tail.buf = pbuf
            }
            tail.buf = allocator.Append(tail.buf, buf...)
        }
@@ -91,10 +91,10 @@ func (c *Conn) releaseToWrite(t *toWrite) {
const maxWriteCacheOrFlushSize = 1024 * 64

type toWrite struct {
-   fd     int    // file descriptor, used for sendfile
-   buf    []byte // buffer to write
-   offset int64  // buffer or file offset
-   remain int64  // buffer or file remain bytes
+   fd     int     // file descriptor, used for sendfile
+   buf    *[]byte // buffer to write
+   offset int64   // buffer or file offset
+   remain int64   // buffer or file remain bytes
}

// Conn implements net.Conn with non-blocking interfaces.
@@ -881,7 +882,7 @@ func (c *Conn) flush() error {
    // }
    writeBuffer := func() error {
        head := c.writeList[0]
-       buf := head.buf[head.offset:]
+       buf := (*head.buf)[head.offset:]
        n, err := syscall.Write(c.fd, buf)
        if n > 0 {
            if c.p.g.onWrittenSize != nil {
@@ -910,6 +911,7 @@ func (c *Conn) flush() error {
            v.offset += int64(n)
            if v.remain <= 0 {
                c.releaseToWrite(c.writeList[0])
+               c.writeList[0] = nil
                c.writeList = c.writeList[1:]
            }
        }
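The coalescing path in newToWriteBuf above now threads *[]byte through the allocator, and flush additionally nils out the released writeList slot before re-slicing so the finished entry and its buffer pointer become collectible. The standalone sketch below is not from the repository; it reuses only the exported mempool API and a stripped-down toWrite to show the grow-or-start-new decision in isolation.

package main

import (
    "fmt"

    "github.com/lesismal/nbio/mempool"
)

// A stripped-down stand-in for nbio's toWrite; the real struct also carries
// fd/offset/remain for sendfile and partial writes.
type toWrite struct {
    buf *[]byte
}

const maxWriteCacheOrFlushSize = 1024 * 64

// tryCache mirrors the grow-or-append branch of newToWriteBuf: it returns
// false when the combined size is too large (the caller would then start a
// new toWrite entry), otherwise it grows tail.buf if needed and appends data.
func tryCache(tail *toWrite, data []byte) bool {
    tailLen := len(*tail.buf)
    if tailLen+len(data) > maxWriteCacheOrFlushSize {
        return false
    }
    if cap(*tail.buf) < tailLen+len(data) {
        pbuf := mempool.Malloc(tailLen + len(data))
        *pbuf = (*pbuf)[:tailLen]
        copy(*pbuf, *tail.buf)
        mempool.Free(tail.buf)
        tail.buf = pbuf
    }
    tail.buf = mempool.Append(tail.buf, data...)
    return true
}

func main() {
    tail := &toWrite{buf: mempool.Malloc(0)}
    tryCache(tail, []byte("hello, "))
    tryCache(tail, []byte("world"))
    fmt.Println(string(*tail.buf)) // "hello, world"
    mempool.Free(tail.buf)
}
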
4 changes: 1 addition & 3 deletions go.mod
@@ -2,6 +2,4 @@ module github.com/lesismal/nbio

go 1.16

-require github.com/lesismal/llib v1.1.13
-
-retract v1.5.4 // Contains body length parsing bug.
+require github.com/lesismal/llib v1.2.0
4 changes: 2 additions & 2 deletions go.sum
@@ -1,5 +1,5 @@
-github.com/lesismal/llib v1.1.13 h1:+w1+t0PykXpj2dXQck0+p6vdC9/mnbEXHgUy/HXDGfE=
-github.com/lesismal/llib v1.1.13/go.mod h1:70tFXXe7P1FZ02AU9l8LgSOK7d7sRrpnkUr3rd3gKSg=
+github.com/lesismal/llib v1.2.0 h1:76mtWL87Y2XTYSoBXNFMBmUZY6igHbQZW48c0gx32Hc=
+github.com/lesismal/llib v1.2.0/go.mod h1:70tFXXe7P1FZ02AU9l8LgSOK7d7sRrpnkUr3rd3gKSg=
golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5 h1:N6Jp/LCiEoIBX56BZSR2bepK5GtbSC2DDOYT742mMfE=
golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
49 changes: 26 additions & 23 deletions mempool/aligned_allocator.go
@@ -63,7 +63,7 @@ type AlignedAllocator struct {
// Malloc .
//
//go:norace
-func (amp *AlignedAllocator) Malloc(size int) []byte {
+func (amp *AlignedAllocator) Malloc(size int) *[]byte {
    if size < 0 {
        return nil
    }
@@ -74,55 +74,58 @@ func (amp *AlignedAllocator) Malloc(size int) []byte {
    } else {
        ret = make([]byte, size)
    }
-   amp.incrMalloc(ret)
-   return ret
+   amp.incrMalloc(&ret)
+   return &ret
}

// Realloc .
//
//go:norace
-func (amp *AlignedAllocator) Realloc(buf []byte, size int) []byte {
-   if size <= cap(buf) {
-       return buf[:size]
+func (amp *AlignedAllocator) Realloc(pbuf *[]byte, size int) *[]byte {
+   if size <= cap(*pbuf) {
+       *pbuf = (*pbuf)[:size]
+       return pbuf
    }
-   newBuf := amp.Malloc(size)
-   copy(newBuf, buf)
-   return newBuf
+   newBufPtr := amp.Malloc(size)
+   copy(*newBufPtr, *pbuf)
+   amp.Free(pbuf)
+   return newBufPtr
}

// Append .
//
//go:norace
-func (amp *AlignedAllocator) Append(buf []byte, more ...byte) []byte {
-   if cap(buf)-len(buf) >= len(more) {
-       return append(buf, more...)
+func (amp *AlignedAllocator) Append(pbuf *[]byte, more ...byte) *[]byte {
+   if cap(*pbuf)-len(*pbuf) >= len(more) {
+       *pbuf = append(*pbuf, more...)
+       return pbuf
    }
-   newBuf := amp.Malloc(len(buf) + len(more))
-   copy(newBuf, buf)
-   copy(newBuf[len(buf):], more)
-   amp.Free(buf)
-   return newBuf
+   newBufPtr := amp.Malloc(len(*pbuf) + len(more))
+   copy(*newBufPtr, *pbuf)
+   copy((*newBufPtr)[len(*pbuf):], more)
+   amp.Free(pbuf)
+   return newBufPtr
}

// AppendString .
//
//go:norace
-func (amp *AlignedAllocator) AppendString(buf []byte, s string) []byte {
+func (amp *AlignedAllocator) AppendString(pbuf *[]byte, s string) *[]byte {
    x := (*[2]uintptr)(unsafe.Pointer(&s))
    h := [3]uintptr{x[0], x[1], x[1]}
    more := *(*[]byte)(unsafe.Pointer(&h))
-   return amp.Append(buf, more...)
+   return amp.Append(pbuf, more...)
}

// Free .
//
//go:norace
-func (amp *AlignedAllocator) Free(buf []byte) {
-   size := cap(buf)
+func (amp *AlignedAllocator) Free(pbuf *[]byte) {
+   size := cap(*pbuf)
    if (size&minAlignedBufferSizeMask) != 0 || size > maxAlignedBufferSize {
        return
    }
-   amp.incrFree(buf)
+   amp.incrFree(pbuf)
    idx := alignedIndexes[size]
-   alignedPools[idx].Put(&buf)
+   alignedPools[idx].Put(pbuf)
}
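
AppendString above avoids an intermediate []byte(s) copy by reinterpreting the string header as a slice header. The sketch below isolates that conversion; the helper name is invented, the conversion is the one used in the diff, and the resulting slice must be treated as read-only because it aliases the string's backing array.

package main

import (
    "fmt"
    "unsafe"
)

// stringToReadOnlyBytes widens the 2-word string header {data, len} into a
// 3-word slice header {data, len, cap} with cap == len, exactly as
// AppendString does above. The result aliases the string and must never be
// written to.
func stringToReadOnlyBytes(s string) []byte {
    x := (*[2]uintptr)(unsafe.Pointer(&s))
    h := [3]uintptr{x[0], x[1], x[1]}
    return *(*[]byte)(unsafe.Pointer(&h))
}

func main() {
    b := stringToReadOnlyBytes("no copy made")
    fmt.Println(len(b), cap(b), string(b)) // 12 12 no copy made
}
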
28 changes: 14 additions & 14 deletions mempool/allocator.go
@@ -4,11 +4,11 @@ package mempool
var DefaultMemPool = New(1024, 1024*1024*1024)

type Allocator interface {
-   Malloc(size int) []byte
-   Realloc(buf []byte, size int) []byte // deprecated.
-   Append(buf []byte, more ...byte) []byte
-   AppendString(buf []byte, more string) []byte
-   Free(buf []byte)
+   Malloc(size int) *[]byte
+   Realloc(buf *[]byte, size int) *[]byte // deprecated.
+   Append(buf *[]byte, more ...byte) *[]byte
+   AppendString(buf *[]byte, more string) *[]byte
+   Free(buf *[]byte)
}

type DebugAllocator interface {
@@ -18,28 +18,28 @@ type DebugAllocator interface {
}

//go:norace
-func Malloc(size int) []byte {
+func Malloc(size int) *[]byte {
    return DefaultMemPool.Malloc(size)
}

//go:norace
-func Realloc(buf []byte, size int) []byte {
-   return DefaultMemPool.Realloc(buf, size)
+func Realloc(pbuf *[]byte, size int) *[]byte {
+   return DefaultMemPool.Realloc(pbuf, size)
}

//go:norace
-func Append(buf []byte, more ...byte) []byte {
-   return DefaultMemPool.Append(buf, more...)
+func Append(pbuf *[]byte, more ...byte) *[]byte {
+   return DefaultMemPool.Append(pbuf, more...)
}

//go:norace
-func AppendString(buf []byte, more string) []byte {
-   return DefaultMemPool.AppendString(buf, more)
+func AppendString(pbuf *[]byte, more string) *[]byte {
+   return DefaultMemPool.AppendString(pbuf, more)
}

//go:norace
-func Free(buf []byte) {
-   DefaultMemPool.Free(buf)
+func Free(pbuf *[]byte) {
+   DefaultMemPool.Free(pbuf)
}

// func Init(bufSize, freeSize int) {
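With the package-level helpers now returning and accepting *[]byte, a typical call site looks like the sketch below. This is illustrative only; the import path follows from the go.mod module path shown above.

package main

import (
    "fmt"

    "github.com/lesismal/nbio/mempool"
)

func main() {
    // Malloc now hands back a *[]byte; dereference it to touch the bytes.
    pbuf := mempool.Malloc(5)
    copy(*pbuf, "hello")

    // Append/AppendString may grow (and, depending on the allocator, move)
    // the buffer, so keep working with the returned pointer.
    pbuf = mempool.AppendString(pbuf, ", world")
    fmt.Println(string(*pbuf)) // "hello, world"

    // Free takes the pointer, not the slice, so the pooled header is reused.
    mempool.Free(pbuf)
}
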
16 changes: 8 additions & 8 deletions mempool/debugger.go
@@ -27,17 +27,17 @@ func (d *debugger) SetDebug(dbg bool) {
}

//go:norace
-func (d *debugger) incrMalloc(b []byte) {
+func (d *debugger) incrMalloc(pbuf *[]byte) {
    if d.on {
-       d.incrMallocSlow(b)
+       d.incrMallocSlow(pbuf)
    }
}

//go:norace
-func (d *debugger) incrMallocSlow(b []byte) {
+func (d *debugger) incrMallocSlow(pbuf *[]byte) {
    atomic.AddInt64(&d.MallocCount, 1)
    atomic.AddInt64(&d.NeedFree, 1)
-   size := cap(b)
+   size := cap(*pbuf)
    d.mux.Lock()
    defer d.mux.Unlock()
    if d.SizeMap == nil {
@@ -55,17 +55,17 @@ func (d *debugger) incrMallocSlow(b []byte) {
}

//go:norace
-func (d *debugger) incrFree(b []byte) {
+func (d *debugger) incrFree(pbuf *[]byte) {
    if d.on {
-       d.incrFreeSlow(b)
+       d.incrFreeSlow(pbuf)
    }
}

//go:norace
-func (d *debugger) incrFreeSlow(b []byte) {
+func (d *debugger) incrFreeSlow(pbuf *[]byte) {
    atomic.AddInt64(&d.FreeCount, 1)
    atomic.AddInt64(&d.NeedFree, -1)
-   size := cap(b)
+   size := cap(*pbuf)
    d.mux.Lock()
    defer d.mux.Unlock()
    if v, ok := d.SizeMap[size]; ok {
58 changes: 31 additions & 27 deletions mempool/mempool.go
@@ -43,60 +43,64 @@ func New(bufSize, freeSize int) Allocator {
}

// Malloc .
-func (mp *MemPool) Malloc(size int) []byte {
+func (mp *MemPool) Malloc(size int) *[]byte {
    var ret []byte
    if size > mp.freeSize {
        ret = make([]byte, size)
-       mp.incrMalloc(ret)
-       return ret
+       mp.incrMalloc(&ret)
+       return &ret
    }
    pbuf := mp.pool.Get().(*[]byte)
    n := cap(*pbuf)
    if n < size {
        *pbuf = append((*pbuf)[:n], make([]byte, size-n)...)
    }
-   ret = (*pbuf)[:size]
-   mp.incrMalloc(ret)
-   return ret
+   (*pbuf) = (*pbuf)[:size]
+   mp.incrMalloc(pbuf)
+   return pbuf
}

// Realloc .
-func (mp *MemPool) Realloc(buf []byte, size int) []byte {
-   if size <= cap(buf) {
-       return buf[:size]
+func (mp *MemPool) Realloc(pbuf *[]byte, size int) *[]byte {
+   if size <= cap(*pbuf) {
+       *pbuf = (*pbuf)[:size]
+       return pbuf
    }

-   if cap(buf) < mp.freeSize {
-       pbuf := mp.pool.Get().(*[]byte)
-       n := cap(buf)
+   if cap(*pbuf) < mp.freeSize {
+       newBufPtr := mp.pool.Get().(*[]byte)
+       n := cap(*newBufPtr)
        if n < size {
-           *pbuf = append((*pbuf)[:n], make([]byte, size-n)...)
+           *newBufPtr = append((*newBufPtr)[:n], make([]byte, size-n)...)
        }
-       *pbuf = (*pbuf)[:size]
-       copy(*pbuf, buf)
-       mp.Free(buf)
-       return *pbuf
+       *newBufPtr = (*newBufPtr)[:size]
+       copy(*newBufPtr, *pbuf)
+       mp.Free(pbuf)
+       return newBufPtr
    }
-   return append(buf[:cap(buf)], make([]byte, size-cap(buf))...)[:size]
+   *pbuf = append((*pbuf)[:cap(*pbuf)], make([]byte, size-cap(*pbuf))...)[:size]
+   return pbuf
}

// Append .
-func (mp *MemPool) Append(buf []byte, more ...byte) []byte {
-   return append(buf, more...)
+func (mp *MemPool) Append(pbuf *[]byte, more ...byte) *[]byte {
+   *pbuf = append(*pbuf, more...)
+   return pbuf
}

// AppendString .
-func (mp *MemPool) AppendString(buf []byte, more string) []byte {
-   return append(buf, more...)
+func (mp *MemPool) AppendString(pbuf *[]byte, more string) *[]byte {
+   *pbuf = append(*pbuf, more...)
+   return pbuf
}

// Free .
-func (mp *MemPool) Free(buf []byte) {
-   if buf != nil && cap(buf) > 0 {
-       mp.incrFree(buf)
-       if cap(buf) > mp.freeSize {
+func (mp *MemPool) Free(pbuf *[]byte) {
+   if pbuf != nil && cap(*pbuf) > 0 {
+       mp.incrFree(pbuf)
+       if cap(*pbuf) > mp.freeSize {
            return
        }
-       mp.pool.Put(&buf)
+       mp.pool.Put(pbuf)
    }
}
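
A consequence of writing the resized header back through the pointer: as long as the requested size fits the existing capacity, every holder of the same *[]byte sees the growth, whereas the old value-based Realloc left other copies of the header stale. Once the capacity is exceeded, Realloc returns a new pointer and frees the old one, so callers must still use the returned value. The sketch below is not from the repository and assumes the pooled buffer's capacity (bufSize) covers the growth.

package main

import (
    "fmt"

    "github.com/lesismal/nbio/mempool"
)

func main() {
    mp := mempool.New(64, 1024*1024) // (bufSize, freeSize), per the constructor shown above
    pbuf := mp.Malloc(3)
    copy(*pbuf, "abc")

    alias := pbuf // a second holder of the same buffer

    // Growth within the existing capacity only rewrites the header through
    // the shared pointer, so the alias observes the new length as well.
    pbuf = mp.Realloc(pbuf, 6)
    copy((*pbuf)[3:], "def")

    fmt.Println(string(*alias)) // "abcdef"
    mp.Free(pbuf)
}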