Skip to content

Commit

Permalink
Merge pull request #400 from lesismal/comments
Browse files Browse the repository at this point in the history
### Socket
1. epoll/kqueue: clear fd from poller automatically by syscall.Close will.
2. Add flow control for write cache flush to avoid large buffer write failure.
3. Optimize the write cache using buffers to avoid a single large buffer cache.

### HTTP: 
1. Change body to [][]byte to avoid single large buffer usage.
2. Optimize body calculation.
3. Add MaxHTTPBodySize config.
4. add Client.Dial configuration.
5. add Engine.SetETAsyncRead/SetLTSyncRead.

### Websocket
1. Add Lock for parsing logic.
2. Optimize session consistency.
  • Loading branch information
lesismal authored Apr 10, 2024
2 parents e4ffe7d + 852d34c commit 4254217
Show file tree
Hide file tree
Showing 41 changed files with 1,290 additions and 879 deletions.
176 changes: 108 additions & 68 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,21 @@ import (
"github.com/lesismal/nbio/logging"
)

// ConnType .
// ConnType is used to identify different types of Conn.
type ConnType = int8

const (
// ConnTypeTCP .
// ConnTypeTCP represents TCP Conn.
ConnTypeTCP ConnType = iota + 1
// ConnTypeUDPServer .
// ConnTypeUDPServer represents UDP Conn used as a listener.
ConnTypeUDPServer
// ConnTypeUDPClientFromRead .
// ConnTypeUDPClientFromRead represents UDP connection that
// is sending data to our UDP Server from peer.
ConnTypeUDPClientFromRead
// ConnTypeUDPClientFromDial .
// ConnTypeUDPClientFromDial represents UDP Conn that is sending
// data to other UDP Server from ourselves.
ConnTypeUDPClientFromDial
// ConnTypeUnix .
// ConnTypeUnix represents Unix Conn.
ConnTypeUnix
)

Expand All @@ -34,12 +36,12 @@ func (c *Conn) Type() ConnType {
return c.typ
}

// IsTCP .
// IsTCP returns whether this Conn is a TCP Conn.
func (c *Conn) IsTCP() bool {
return c.typ == ConnTypeTCP
}

// IsUDP .
// IsUDP returns whether this Conn is a UDP Conn.
func (c *Conn) IsUDP() bool {
switch c.typ {
case ConnTypeUDPServer, ConnTypeUDPClientFromDial, ConnTypeUDPClientFromRead:
Expand All @@ -48,22 +50,36 @@ func (c *Conn) IsUDP() bool {
return false
}

// IsUnix .
// IsUnix returns whether this Conn is a Unix Conn.
func (c *Conn) IsUnix() bool {
return c.typ == ConnTypeUnix
}

// OnData registers callback for data.
// OnData registers Conn's data handler.
// Notice:
// 1. The data readed by the poller is not handled by this Conn's data
// handler by default.
// 2. The data readed by the poller is handled by nbio.Engine's data
// handler which is registered by nbio.Engine.OnData by default.
// 3. This Conn's data handler is used to customize your implementation,
// you can set different data handler for different Conns,
// and call Conn's data handler in nbio.Engine's data handler.
// For example:
// engine.OnData(func(c *nbio.Conn, data byte){
// c.DataHandler()(c, data)
// })
// conn1.OnData(yourDatahandler1)
// conn2.OnData(yourDatahandler2)
func (c *Conn) OnData(h func(conn *Conn, data []byte)) {
c.dataHandler = h
}

// DataHandler returns data handler.
// DataHandler returns Conn's data handler.
func (c *Conn) DataHandler() func(conn *Conn, data []byte) {
return c.dataHandler
}

// Dial wraps net.Dial.
// Dial calls net.Dial to make a net.Conn and convert it to *nbio.Conn.
func Dial(network string, address string) (*Conn, error) {
conn, err := net.Dial(network, address)
if err != nil {
Expand All @@ -72,7 +88,7 @@ func Dial(network string, address string) (*Conn, error) {
return NBConn(conn)
}

// DialTimeout wraps net.DialTimeout.
// Dial calls net.DialTimeout to make a net.Conn and convert it to *nbio.Conn.
func DialTimeout(network string, address string, timeout time.Duration) (*Conn, error) {
conn, err := net.DialTimeout(network, address, timeout)
if err != nil {
Expand All @@ -91,88 +107,112 @@ func (c *Conn) Unlock() {
c.mux.Unlock()
}

// IsClosed .
// IsClosed returns whether the Conn is closed.
func (c *Conn) IsClosed() (bool, error) {
return c.closed, c.closeErr
}

// ExecuteLen .
// ExecuteLen returns the length of the Conn's job list.
func (c *Conn) ExecuteLen() int {
c.mux.Lock()
n := len(c.execList)
n := len(c.jobList)
c.mux.Unlock()
return n
}

// Execute .
func (c *Conn) Execute(f func()) bool {
// Execute is used to run the job.
//
// How it works:
// If the job is the head/first of the Conn's job list, it will call the
// nbio.Engine.Execute to run all the jobs in the job list that include:
// 1. This job
// 2. New jobs that are pushed to the back of the list before this job
// is done.
// 3. nbio.Engine.Execute returns until there's no more jobs in the job
// list.
//
// Else if the job is not the head/first of the job list, it will push the
// job to the back of the job list and wait to be called.
// This guarantees there's at most one flow or goroutine running job/jobs
// for each Conn.
// This guarantees all the jobs are executed in order.
//
// Notice:
// 1. The job wouldn't run or pushed to the back of the job list if the
// connection is closed.
// 2. nbio.Engine.Execute is handled by a goroutine pool by default, users
// can customize it.
func (c *Conn) Execute(job func()) bool {
c.mux.Lock()
if c.closed {
c.mux.Unlock()
return false
}

isHead := (len(c.execList) == 0)
c.execList = append(c.execList, f)
isHead := (len(c.jobList) == 0)
c.jobList = append(c.jobList, job)
c.mux.Unlock()

// If there's no job running, run Engine.Execute to run this job
// and new jobs appended before this head job is done.
if isHead {
c.p.g.Execute(func() {
i := 0
for {
func() {
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
logging.Error("conn execute failed: %v\n%v\n", err, *(*string)(unsafe.Pointer(&buf)))
}
}()
f()
}()

c.mux.Lock()
i++
if len(c.execList) == i {
c.execList[i-1] = nil
c.execList = c.execList[0:0]
c.mux.Unlock()
return
}
f = c.execList[i]
c.execList[i] = nil
c.mux.Unlock()
}
})
c.execute(job)
}

return true
}

// MustExecute .
func (c *Conn) MustExecute(f func()) {
// MustExecute implements a similar function as Execute did,
// but will still execute or push the job to the
// back of the job list no matter whether Conn has been closed,
// it guarantees the job to be executed.
// This is used to handle the close event in nbio/nbhttp.
func (c *Conn) MustExecute(job func()) {
c.mux.Lock()
isHead := (len(c.execList) == 0)
c.execList = append(c.execList, f)
isHead := (len(c.jobList) == 0)
c.jobList = append(c.jobList, job)
c.mux.Unlock()

// If there's no job running, run Engine.Execute to run this job
// and new jobs appended before this head job is done.
if isHead {
c.p.g.Execute(func() {
i := 0
for {
f()

c.mux.Lock()
i++
if len(c.execList) == i {
c.execList = c.execList[0:0]
c.mux.Unlock()
return
}
f = c.execList[i]
c.execute(job)
}
}

func (c *Conn) execute(job func()) {
c.p.g.Execute(func() {
i := 0
for {
func() {
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
logging.Error("conn execute failed: %v\n%v\n",
err,
*(*string)(unsafe.Pointer(&buf)),
)
}
}()
job()
}()

c.mux.Lock()
i++
if len(c.jobList) == i {
// set nil to release the job and gc
c.jobList[i-1] = nil
// reuse the slice
c.jobList = c.jobList[0:0]
c.mux.Unlock()
return
}
})
}
// get next job
job = c.jobList[i]
// set nil to release the job and gc
c.jobList[i] = nil
c.mux.Unlock()
}
})
}
25 changes: 3 additions & 22 deletions conn_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Conn struct {
// user session.
session interface{}

execList []func()
jobList []func()

cache *bytes.Buffer

Expand Down Expand Up @@ -69,24 +69,6 @@ func (c *Conn) Read(b []byte) (int, error) {
return nread, err
}

// ReadUDP .
func (c *Conn) ReadUDP(b []byte) (*Conn, int, error) {
if c.closeErr != nil {
return c, 0, c.closeErr
}

var reader io.Reader = c.conn
if c.cache != nil {
reader = c.cache
}
nread, err := reader.Read(b)
if c.closeErr == nil {
c.closeErr = err
}

return c, nread, err
}

func (c *Conn) read(b []byte) (int, error) {
var err error
var nread int
Expand All @@ -104,7 +86,7 @@ func (c *Conn) read(b []byte) (int, error) {

func (c *Conn) readTCP(b []byte) (int, error) {
g := c.p.g
g.beforeRead(c)
// g.beforeRead(c)
nread, err := c.conn.Read(b)
if c.closeErr == nil {
c.closeErr = err
Expand Down Expand Up @@ -187,8 +169,7 @@ func (c *Conn) Write(b []byte) (int, error) {
}

func (c *Conn) writeTCP(b []byte) (int, error) {
c.p.g.beforeWrite(c)

// c.p.g.beforeWrite(c)
nwrite, err := c.conn.Write(b)
if err != nil {
if c.closeErr == nil {
Expand Down
Loading

0 comments on commit 4254217

Please sign in to comment.