From 02875ce32c87c8b5db5d9b62db4ed5bb3d9d046b Mon Sep 17 00:00:00 2001 From: Joway Date: Wed, 2 Nov 2022 13:54:08 +0800 Subject: [PATCH 01/20] chore: fixed skywalking-eyes version (#211) --- .github/workflows/pr-check.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index e7a52c74..b35a04bf 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -21,7 +21,7 @@ jobs: ${{ runner.os }}-go- - name: Check License Header - uses: apache/skywalking-eyes@main + uses: apache/skywalking-eyes/header@v0.4.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} From 7d39e2c3ce4429728edbb52caa29074a520d30c6 Mon Sep 17 00:00:00 2001 From: Joway Date: Fri, 4 Nov 2022 10:52:15 +0800 Subject: [PATCH 02/20] fix: bookAck panic when readv return EINTR (#212) --- connection_impl.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/connection_impl.go b/connection_impl.go index 58dbf15b..b4e92394 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -461,13 +461,24 @@ func (c *connection) fill(need int) (err error) { defer c.unlock(finalizing) var n int + var bs [][]byte for { - n, err = readv(c.fd, c.inputs(c.inputBarrier.bs), c.inputBarrier.ivs) - c.inputAck(n) - err = c.eofError(n, err) + bs = c.inputs(c.inputBarrier.bs) + TryRead: + n, err = readv(c.fd, bs, c.inputBarrier.ivs) if err != nil { + if err == syscall.EINTR { + // if err == EINTR, we must reuse bs that has been booked + // otherwise will mess the input buffer + goto TryRead + } + break + } + if n == 0 { + err = Exception(ErrEOF, "") break } + c.inputAck(n) } if c.inputBuffer.Len() >= need { return nil @@ -475,16 +486,6 @@ func (c *connection) fill(need int) (err error) { return err } -func (c *connection) eofError(n int, err error) error { - if err == syscall.EINTR { - return nil - } - if n == 0 && err == nil { - return Exception(ErrEOF, "") - } - return err -} - // flush write data directly. func (c *connection) flush() error { if c.outputBuffer.IsEmpty() { From 700a7070b6d8db3c0be315c387e69f969dda54a1 Mon Sep 17 00:00:00 2001 From: "abner.chen" Date: Fri, 11 Nov 2022 14:44:15 +0800 Subject: [PATCH 03/20] feat: support linux(loong64) (#199) Signed-off-by: Guoqi Chen Signed-off-by: Guoqi Chen --- sys_epoll_linux.go | 4 +-- sys_epoll_linux_loong64.go | 55 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) create mode 100644 sys_epoll_linux_loong64.go diff --git a/sys_epoll_linux.go b/sys_epoll_linux.go index 2c02f38d..5bfa5380 100644 --- a/sys_epoll_linux.go +++ b/sys_epoll_linux.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build !arm64 -// +build !arm64 +//go:build !arm64 && !loong64 +// +build !arm64,!loong64 package netpoll diff --git a/sys_epoll_linux_loong64.go b/sys_epoll_linux_loong64.go new file mode 100644 index 00000000..3645c8a9 --- /dev/null +++ b/sys_epoll_linux_loong64.go @@ -0,0 +1,55 @@ +// Copyright 2022 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux && loong64 +// +build linux,loong64 + +package netpoll + +import ( + "syscall" + "unsafe" +) + +const EPOLLET = syscall.EPOLLET + +type epollevent struct { + events uint32 + _ int32 + data [8]byte // unaligned uintptr +} + +// EpollCtl implements epoll_ctl. +func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { + _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) + if err == syscall.Errno(0) { + err = nil + } + return err +} + +// EpollWait implements epoll_wait. +func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { + var r0 uintptr + var _p0 = unsafe.Pointer(&events[0]) + if msec == 0 { + r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0) + } else { + r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) + } + if err == syscall.Errno(0) { + err = nil + } + return int(r0), err +} From a5722440b5025b6d0e429a891a5b583c0ae38cfb Mon Sep 17 00:00:00 2001 From: Joway Date: Thu, 1 Dec 2022 10:45:30 +0800 Subject: [PATCH 04/20] feat(mux): shared queue add Close interface (#218) * feat: shared queue add Close interface * fix: check trigger * fix: reset to closed --- mux/shard_queue.go | 32 +++++++++++++++++++++++++++++++- mux/shard_queue_test.go | 2 ++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/mux/shard_queue.go b/mux/shard_queue.go index d7d6aebb..7c7c1261 100644 --- a/mux/shard_queue.go +++ b/mux/shard_queue.go @@ -15,6 +15,7 @@ package mux import ( + "fmt" "runtime" "sync" "sync/atomic" @@ -71,17 +72,28 @@ type ShardQueue struct { queueTrigger } +const ( + // queueTrigger state + active = 0 + closing = 1 + closed = 2 +) + // here for trigger type queueTrigger struct { trigger int32 + state int32 // 0: active, 1: closing, 2: closed runNum int32 - list []int32 // record the triggered shard w, r int32 // ptr of list + list []int32 // record the triggered shard listLock sync.Mutex // list total lock } // Add adds to q.getters[shard] func (q *ShardQueue) Add(gts ...WriterGetter) { + if atomic.LoadInt32(&q.state) != active { + return + } shard := atomic.AddInt32(&q.idx, 1) % q.size q.lock(shard) trigger := len(q.getters[shard]) == 0 @@ -92,6 +104,21 @@ func (q *ShardQueue) Add(gts ...WriterGetter) { } } +func (q *ShardQueue) Close() error { + if !atomic.CompareAndSwapInt32(&q.state, active, closing) { + return fmt.Errorf("shardQueue has been closed") + } + // wait for all tasks finished + for atomic.LoadInt32(&q.state) != closed { + if atomic.LoadInt32(&q.trigger) == 0 { + atomic.StoreInt32(&q.trigger, closed) + return nil + } + runtime.Gosched() + } + return nil +} + // triggering shard. 
func (q *ShardQueue) triggering(shard int32) { q.listLock.Lock() @@ -137,7 +164,10 @@ func (q *ShardQueue) foreach() { atomic.StoreInt32(&q.runNum, 0) if atomic.LoadInt32(&q.trigger) > 0 { q.foreach() + return } + // if state is closing, change it to closed + atomic.CompareAndSwapInt32(&q.state, closing, closed) }) } diff --git a/mux/shard_queue_test.go b/mux/shard_queue_test.go index 3bfae3d4..fe5a83ce 100644 --- a/mux/shard_queue_test.go +++ b/mux/shard_queue_test.go @@ -67,6 +67,8 @@ func TestShardQueue(t *testing.T) { queue.Add(getter) } + err = queue.Close() + MustNil(t, err) total := count * pkgsize recv := make([]byte, total) rn, err := svrConn.Read(recv) From 8634b9841afa1ba52cfdf1b4a2aa54448732b193 Mon Sep 17 00:00:00 2001 From: Joway Date: Thu, 1 Dec 2022 15:19:40 +0800 Subject: [PATCH 05/20] fix: check nil in isSingleNode func (#215) --- nocopy_linkbuffer.go | 2 +- nocopy_linkbuffer_race.go | 2 +- nocopy_linkbuffer_test.go | 21 +++++++++++++++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index e148e99f..6200bf5b 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -791,7 +791,7 @@ func (b *LinkBuffer) isSingleNode(readN int) (single bool) { return true } l := b.read.Len() - for l == 0 { + for l == 0 && b.read != b.flush { b.read = b.read.next l = b.read.Len() } diff --git a/nocopy_linkbuffer_race.go b/nocopy_linkbuffer_race.go index 6a471d18..7f2f274f 100644 --- a/nocopy_linkbuffer_race.go +++ b/nocopy_linkbuffer_race.go @@ -840,7 +840,7 @@ func (b *LinkBuffer) isSingleNode(readN int) (single bool) { return true } l := b.read.Len() - for l == 0 { + for l == 0 && b.read != b.flush { b.read = b.read.next l = b.read.Len() } diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index 7665edc2..a2f68fb4 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -367,6 +367,27 @@ func TestWriteBuffer(t *testing.T) { MustTrue(t, bytes.Equal(buf1.Bytes(), []byte{2, 3})) } +func TestLinkBufferCheckSingleNode(t *testing.T) { + buf := NewLinkBuffer(block4k) + _, err := buf.Malloc(block8k) + MustNil(t, err) + buf.Flush() + MustTrue(t, buf.read.Len() == 0) + is := buf.isSingleNode(block8k) + MustTrue(t, is) + MustTrue(t, buf.read.Len() == block8k) + is = buf.isSingleNode(block8k + 1) + MustTrue(t, !is) + + // cross node malloc, but b.read.Len() still == 0 + buf = NewLinkBuffer(block4k) + _, err = buf.Malloc(block8k) + MustNil(t, err) + // not malloc ack yet + // read function will call isSingleNode inside + buf.isSingleNode(1) +} + func TestWriteMultiFlush(t *testing.T) { buf := NewLinkBuffer() b1, _ := buf.Malloc(4) From 7a34c687b60b88051ea87b9f67f82e51aa9191b5 Mon Sep 17 00:00:00 2001 From: Pure White Date: Tue, 13 Dec 2022 21:08:13 +0800 Subject: [PATCH 06/20] chore: add CODEOWNERS --- .github/CODEOWNERS | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .github/CODEOWNERS diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 00000000..9bc6caf2 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,3 @@ +# For more information, please refer to https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners + +* @cloudwego/netpoll-reviewer @cloudwego/netpoll-approver @cloudwego/netpoll-maintainer From f1de6ad3f55d3791bec469717bfbddb711dec0b0 Mon Sep 17 00:00:00 2001 From: Pure White Date: Wed, 14 Dec 2022 11:36:32 +0800 Subject: [PATCH 07/20] chore: fix CODEOWNERS team name --- 
.github/CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 9bc6caf2..83055171 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,3 +1,3 @@ # For more information, please refer to https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners -* @cloudwego/netpoll-reviewer @cloudwego/netpoll-approver @cloudwego/netpoll-maintainer +* @cloudwego/netpoll-reviewers @cloudwego/netpoll-approvers @cloudwego/netpoll-maintainers From 0e23a9020fb14c16bdc7ab7306130bae0898110e Mon Sep 17 00:00:00 2001 From: Joway Date: Wed, 14 Dec 2022 15:12:53 +0800 Subject: [PATCH 08/20] chore: add logger (#222) --- connection_onevent.go | 3 +-- net_netfd_conn.go | 3 +-- netpoll_options.go | 5 +++++ netpoll_server.go | 3 +-- poll_default_bsd.go | 5 ++--- poll_default_linux.go | 5 ++--- poll_manager.go | 11 ++++++++++- poll_race_bsd.go | 5 ++--- poll_race_linux.go | 5 ++--- 9 files changed, 26 insertions(+), 19 deletions(-) diff --git a/connection_onevent.go b/connection_onevent.go index a2b8a488..b686cfd6 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -19,7 +19,6 @@ package netpoll import ( "context" - "log" "sync/atomic" "github.com/bytedance/gopkg/util/gopool" @@ -235,7 +234,7 @@ func (c *connection) register() (err error) { err = c.operator.Control(PollReadable) } if err != nil { - log.Println("connection register failed:", err.Error()) + logger.Println("connection register failed:", err.Error()) c.Close() return Exception(ErrConnClosed, err.Error()) } diff --git a/net_netfd_conn.go b/net_netfd_conn.go index ce174885..44913f1c 100644 --- a/net_netfd_conn.go +++ b/net_netfd_conn.go @@ -18,7 +18,6 @@ package netpoll import ( - "log" "net" "strings" "sync/atomic" @@ -63,7 +62,7 @@ func (c *netFD) Close() (err error) { if c.fd > 0 { err = syscall.Close(c.fd) if err != nil { - log.Printf("netFD[%d] close error: %s", c.fd, err.Error()) + logger.Printf("netFD[%d] close error: %s", c.fd, err.Error()) } } return err diff --git a/netpoll_options.go b/netpoll_options.go index f3ff0453..f2effafa 100644 --- a/netpoll_options.go +++ b/netpoll_options.go @@ -18,6 +18,7 @@ package netpoll import ( + "io" "time" ) @@ -42,6 +43,10 @@ func SetLoadBalance(lb LoadBalance) error { return setLoadBalance(lb) } +func SetLoggerOutput(w io.Writer) { + setLoggerOutput(w) +} + // DisableGopool will remove gopool(the goroutine pool used to run OnRequest), // which means that OnRequest will be run via `go OnRequest(...)`. // Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine. 
diff --git a/netpoll_server.go b/netpoll_server.go index 3a627aff..baf5208f 100644 --- a/netpoll_server.go +++ b/netpoll_server.go @@ -20,7 +20,6 @@ package netpoll import ( "context" "errors" - "log" "strings" "sync" "time" @@ -100,7 +99,7 @@ func (s *server) OnRead(p Poll) error { s.onQuit(err) return err } - log.Println("accept conn failed:", err.Error()) + logger.Println("accept conn failed:", err.Error()) return err } if conn == nil { diff --git a/poll_default_bsd.go b/poll_default_bsd.go index af55ee0c..d2f087dd 100644 --- a/poll_default_bsd.go +++ b/poll_default_bsd.go @@ -19,7 +19,6 @@ package netpoll import ( - "log" "sync/atomic" "syscall" "unsafe" @@ -96,7 +95,7 @@ func (p *defaultPoll) Wait() error { var n, err = readv(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -123,7 +122,7 @@ func (p *defaultPoll) Wait() error { var n, err = sendmsg(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } diff --git a/poll_default_linux.go b/poll_default_linux.go index 09e6009b..7ff6feff 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -18,7 +18,6 @@ package netpoll import ( - "log" "runtime" "sync/atomic" "syscall" @@ -141,7 +140,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { var n, err = readv(operator.FD, bs, p.barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -177,7 +176,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } diff --git a/poll_manager.go b/poll_manager.go index a9f0b0fe..9f6f500d 100644 --- a/poll_manager.go +++ b/poll_manager.go @@ -19,7 +19,9 @@ package netpoll import ( "fmt" + "io" "log" + "os" "runtime" ) @@ -31,14 +33,21 @@ func setLoadBalance(lb LoadBalance) error { return pollmanager.SetLoadBalance(lb) } +func setLoggerOutput(w io.Writer) { + logger = log.New(w, "", log.LstdFlags) +} + // manage all pollers var pollmanager *manager +var logger *log.Logger func init() { var loops = runtime.GOMAXPROCS(0)/20 + 1 pollmanager = &manager{} pollmanager.SetLoadBalance(RoundRobin) pollmanager.SetNumLoops(loops) + + setLoggerOutput(os.Stderr) } // LoadBalance is used to do load balancing among multiple pollers. 
@@ -63,7 +72,7 @@ func (m *manager) SetNumLoops(numLoops int) error { polls[idx] = m.polls[idx] } else { if err := m.polls[idx].Close(); err != nil { - log.Printf("poller close failed: %v\n", err) + logger.Printf("poller close failed: %v\n", err) } } } diff --git a/poll_race_bsd.go b/poll_race_bsd.go index b53a51b8..39540717 100644 --- a/poll_race_bsd.go +++ b/poll_race_bsd.go @@ -19,7 +19,6 @@ package netpoll import ( - "log" "sync" "sync/atomic" "syscall" @@ -103,7 +102,7 @@ func (p *defaultPoll) Wait() error { var n, err = readv(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -130,7 +129,7 @@ func (p *defaultPoll) Wait() error { var n, err = sendmsg(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } diff --git a/poll_race_linux.go b/poll_race_linux.go index 060d7ba3..45b848b7 100644 --- a/poll_race_linux.go +++ b/poll_race_linux.go @@ -18,7 +18,6 @@ package netpoll import ( - "log" "runtime" "sync" "sync/atomic" @@ -138,7 +137,7 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { var n, err = readv(operator.FD, bs, p.barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -175,7 +174,7 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } From de94204c096601635402930fd1a9ab63492addb3 Mon Sep 17 00:00:00 2001 From: Joway Date: Wed, 14 Dec 2022 15:15:22 +0800 Subject: [PATCH 09/20] chore: add epoll tests (#221) --- go.mod | 5 +- go.sum | 1 + mux/shard_queue_test.go | 7 +- poll_default_linux.go | 2 +- poll_default_linux_test.go | 160 +++++++++++++++++++++++++++++++++++++ poll_race_linux.go | 2 +- sys_epoll_linux.go | 10 +++ sys_epoll_linux_arm64.go | 10 +++ sys_epoll_linux_loong64.go | 10 +++ 9 files changed, 200 insertions(+), 7 deletions(-) create mode 100644 poll_default_linux_test.go diff --git a/go.mod b/go.mod index 800411e6..2e73daa5 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/cloudwego/netpoll go 1.15 -require github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7 +require ( + github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7 + golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe +) diff --git a/go.sum b/go.sum index deaed30b..32a454e1 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/sync 
v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe h1:W8vbETX/n8S6EmY0Pu4Ix7VvpsJUESTwl0oCK8MJOgk= golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/mux/shard_queue_test.go b/mux/shard_queue_test.go index fe5a83ce..7a595d21 100644 --- a/mux/shard_queue_test.go +++ b/mux/shard_queue_test.go @@ -19,7 +19,6 @@ package mux import ( "net" - "runtime" "testing" "time" @@ -28,6 +27,7 @@ import ( func TestShardQueue(t *testing.T) { var svrConn net.Conn + accepted := make(chan struct{}) network, address := "tcp", ":18888" ln, err := net.Listen("tcp", ":18888") @@ -46,14 +46,13 @@ func TestShardQueue(t *testing.T) { } svrConn, err = ln.Accept() MustNil(t, err) + accepted <- struct{}{} } }() conn, err := netpoll.DialConnection(network, address, time.Second) MustNil(t, err) - for svrConn == nil { - runtime.Gosched() - } + <-accepted // test queue := NewShardQueue(4, conn) diff --git a/poll_default_linux.go b/poll_default_linux.go index 7ff6feff..fdb10e45 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -32,7 +32,7 @@ func openPoll() Poll { func openDefaultPoll() *defaultPoll { var poll = defaultPoll{} poll.buf = make([]byte, 8) - var p, err = syscall.EpollCreate1(0) + var p, err = EpollCreate(0) if err != nil { panic(err) } diff --git a/poll_default_linux_test.go b/poll_default_linux_test.go new file mode 100644 index 00000000..7b6bdc23 --- /dev/null +++ b/poll_default_linux_test.go @@ -0,0 +1,160 @@ +// Copyright 2022 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build linux +// +build linux + +package netpoll + +import ( + "syscall" + "testing" + + "golang.org/x/sys/unix" +) + +func TestEpollEvent(t *testing.T) { + var epollfd, err = EpollCreate(0) + MustNil(t, err) + defer syscall.Close(epollfd) + + rfd, wfd := GetSysFdPairs() + defer syscall.Close(rfd) + defer syscall.Close(wfd) + + send := []byte("hello") + recv := make([]byte, 5) + events := make([]epollevent, 128) + eventdata1 := [8]byte{0, 0, 0, 0, 0, 0, 0, 1} + eventdata2 := [8]byte{0, 0, 0, 0, 0, 0, 0, 2} + eventdata3 := [8]byte{0, 0, 0, 0, 0, 0, 0, 3} + event1 := &epollevent{ + events: syscall.EPOLLIN, + data: eventdata1, + } + event2 := &epollevent{ + events: syscall.EPOLLIN, + data: eventdata2, + } + event3 := &epollevent{ + events: syscall.EPOLLIN | syscall.EPOLLOUT, + data: eventdata3, + } + + // EPOLL: add ,del and add + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event1) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event1) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event2) + MustNil(t, err) + _, err = syscall.Write(wfd, send) + MustNil(t, err) + n, err := EpollWait(epollfd, events, -1) + MustNil(t, err) + Equal(t, n, 1) + Equal(t, events[0].data, eventdata2) + _, err = syscall.Read(rfd, recv) + MustTrue(t, err == nil && string(recv) == string(send)) + err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event2) + MustNil(t, err) + + // EPOLL: add ,mod and mod + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event1) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, rfd, event2) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, rfd, event3) + MustNil(t, err) + _, err = syscall.Write(wfd, send) + MustNil(t, err) + n, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Equal(t, events[0].data, eventdata3) + _, err = syscall.Read(rfd, recv) + MustTrue(t, err == nil && string(recv) == string(send)) + Assert(t, events[0].events&syscall.EPOLLIN != 0) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + + err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event2) + MustNil(t, err) +} + +func TestEpollWait(t *testing.T) { + var epollfd, err = EpollCreate(0) + MustNil(t, err) + defer syscall.Close(epollfd) + + rfd, wfd := GetSysFdPairs() + defer syscall.Close(wfd) + + send := []byte("hello") + recv := make([]byte, 5) + events := make([]epollevent, 128) + eventdata := [8]byte{0, 0, 0, 0, 0, 0, 0, 1} + + // EPOLL: init state + event := &epollevent{ + events: syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: eventdata, + } + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + MustNil(t, err) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLIN == 0) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + + // EPOLL: readable + _, err = syscall.Write(wfd, send) + MustNil(t, err) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLIN != 0) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + _, err = syscall.Read(rfd, recv) + MustTrue(t, err == nil && string(recv) == string(send)) + + // EPOLL: read finished + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLIN == 0) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + + // EPOLL: close peer fd + err = syscall.Close(wfd) + MustNil(t, err) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLIN != 0) + Assert(t, 
events[0].events&syscall.EPOLLOUT != 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP != 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) + + // EPOLL: close current fd + rfd2, wfd2 := GetSysFdPairs() + defer syscall.Close(wfd2) + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd2, event) + err = syscall.Close(rfd2) + MustNil(t, err) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLIN != 0) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP != 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) + + err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event) + MustNil(t, err) +} diff --git a/poll_race_linux.go b/poll_race_linux.go index 45b848b7..8ede0d99 100644 --- a/poll_race_linux.go +++ b/poll_race_linux.go @@ -32,7 +32,7 @@ func openPoll() Poll { func openDefaultPoll() *defaultPoll { var poll = defaultPoll{} poll.buf = make([]byte, 8) - var p, err = syscall.EpollCreate1(0) + var p, err = EpollCreate(0) if err != nil { panic(err) } diff --git a/sys_epoll_linux.go b/sys_epoll_linux.go index 5bfa5380..528c34ed 100644 --- a/sys_epoll_linux.go +++ b/sys_epoll_linux.go @@ -29,6 +29,16 @@ type epollevent struct { data [8]byte // unaligned uintptr } +// EpollCreate implements epoll_create1. +func EpollCreate(flag int) (fd int, err error) { + var r0 uintptr + r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0) + if err == syscall.Errno(0) { + err = nil + } + return int(r0), err +} + // EpollCtl implements epoll_ctl. func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) diff --git a/sys_epoll_linux_arm64.go b/sys_epoll_linux_arm64.go index c83781b5..e8d6094d 100644 --- a/sys_epoll_linux_arm64.go +++ b/sys_epoll_linux_arm64.go @@ -27,6 +27,16 @@ type epollevent struct { data [8]byte // unaligned uintptr } +// EpollCreate implements epoll_create1. +func EpollCreate(flag int) (fd int, err error) { + var r0 uintptr + r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0) + if err == syscall.Errno(0) { + err = nil + } + return int(r0), err +} + // EpollCtl implements epoll_ctl. func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) diff --git a/sys_epoll_linux_loong64.go b/sys_epoll_linux_loong64.go index 3645c8a9..ecf36c13 100644 --- a/sys_epoll_linux_loong64.go +++ b/sys_epoll_linux_loong64.go @@ -30,6 +30,16 @@ type epollevent struct { data [8]byte // unaligned uintptr } +// EpollCreate implements epoll_create1. +func EpollCreate(flag int) (fd int, err error) { + var r0 uintptr + r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0) + if err == syscall.Errno(0) { + err = nil + } + return int(r0), err +} + // EpollCtl implements epoll_ctl. 
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) From 4d53b80e2ca7c44165de417dd2eada90c404aec9 Mon Sep 17 00:00:00 2001 From: Joway Date: Tue, 20 Dec 2022 16:24:48 +0800 Subject: [PATCH 10/20] refactor: free operator in poller (#223) --- .github/workflows/pr-check.yml | 36 +++++--- connection_impl.go | 21 +++-- connection_onevent.go | 16 ++-- fd_operator.go | 5 + fd_operator_cache.go | 69 ++++++++------ fd_operator_cache_test.go | 25 +++-- net_listener_test.go | 15 +-- net_netfd.go | 41 +-------- net_netfd_conn.go | 2 +- net_polldesc.go | 62 +++++++------ netpoll_server.go | 2 +- poll.go | 6 ++ poll_default_bsd.go | 21 ++++- poll_default_linux.go | 53 +++++++---- poll_default_linux_test.go | 163 +++++++++++++++++++++++++++++++++ poll_manager.go | 2 +- poll_race_bsd.go | 21 ++++- poll_race_linux.go | 34 +++++-- poll_test.go | 17 +++- 19 files changed, 426 insertions(+), 185 deletions(-) diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index b35a04bf..9484b71c 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -3,35 +3,41 @@ name: Push and Pull Request Check on: [ push, pull_request ] jobs: - build: - runs-on: ubuntu-latest + compatibility-test: + strategy: + matrix: + go: [ 1.15, 1.19 ] + os: [ X64, ARM64 ] + runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v2 - + - uses: actions/checkout@v3 - name: Set up Go - uses: actions/setup-go@v2 + uses: actions/setup-go@v3 with: - go-version: 1.16 - + go-version: ${{ matrix.go }} - uses: actions/cache@v2 with: path: ~/go/pkg/mod key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} restore-keys: | ${{ runner.os }}-go- - + - name: Unit Test + run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./... + - name: Benchmark + run: go test -bench=. -benchmem -run=none ./... + style-test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: 1.16 - name: Check License Header uses: apache/skywalking-eyes/header@v0.4.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Lint run: | test -z "$(gofmt -s -l .)" go vet -stdmethods=false $(go list ./...) - - - name: Unit Test - run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./... - - - name: Benchmark - run: go test -bench=. -benchmem -run=none ./... diff --git a/connection_impl.go b/connection_impl.go index b4e92394..beb11d23 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -349,26 +349,31 @@ func (c *connection) initNetFD(conn Conn) { } func (c *connection) initFDOperator() { - op := allocop() + var op *FDOperator + if c.pd != nil && c.pd.operator != nil { + // reuse operator created at connect step + op = c.pd.operator + } else { + poll := pollmanager.Pick() + op = poll.Alloc() + } op.FD = c.fd op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup op.Inputs, op.InputAck = c.inputs, c.inputAck op.Outputs, op.OutputAck = c.outputs, c.outputAck - // if connection has been registered, must reuse poll here. 
- if c.pd != nil && c.pd.operator != nil { - op.poll = c.pd.operator.poll - } c.operator = op } func (c *connection) initFinalizer() { - c.AddCloseCallback(func(connection Connection) error { + c.AddCloseCallback(func(connection Connection) (err error) { c.stop(flushing) // stop the finalizing state to prevent conn.fill function to be performed c.stop(finalizing) - freeop(c.operator) - c.netFD.Close() + c.operator.Free() + if err = c.netFD.Close(); err != nil { + logger.Printf("NETPOLL: netFD close failed: %v", err) + } c.closeBuffer() return nil }) diff --git a/connection_onevent.go b/connection_onevent.go index b686cfd6..e8567ba9 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -213,7 +213,9 @@ func (c *connection) closeCallback(needLock bool) (err error) { } // If Close is called during OnPrepare, poll is not registered. if c.isCloseBy(user) && c.operator.poll != nil { - c.operator.Control(PollDetach) + if err = c.operator.Control(PollDetach); err != nil { + logger.Printf("NETPOLL: closeCallback detach operator failed: %v", err) + } } var latest = c.closeCallbacks.Load() if latest == nil { @@ -227,14 +229,16 @@ func (c *connection) closeCallback(needLock bool) (err error) { // register only use for connection register into poll. func (c *connection) register() (err error) { - if c.operator.poll != nil { - err = c.operator.Control(PollModReadable) - } else { - c.operator.poll = pollmanager.Pick() + if c.operator.isUnused() { + // operator is not registered err = c.operator.Control(PollReadable) + } else { + // operator is already registered + // change event to wait read new data + err = c.operator.Control(PollModReadable) } if err != nil { - logger.Println("connection register failed:", err.Error()) + logger.Printf("NETPOLL: connection register failed: %v", err) c.Close() return Exception(ErrConnClosed, err.Error()) } diff --git a/fd_operator.go b/fd_operator.go index 1b59967c..4132fe9c 100644 --- a/fd_operator.go +++ b/fd_operator.go @@ -45,12 +45,17 @@ type FDOperator struct { // private, used by operatorCache next *FDOperator state int32 // CAS: 0(unused) 1(inuse) 2(do-done) + index int32 // index in operatorCache } func (op *FDOperator) Control(event PollEvent) error { return op.poll.Control(op, event) } +func (op *FDOperator) Free() { + op.poll.Free(op) +} + func (op *FDOperator) do() (can bool) { return atomic.CompareAndSwapInt32(&op.state, 1, 2) } diff --git a/fd_operator_cache.go b/fd_operator_cache.go index 6e93fa0d..0e37ddeb 100644 --- a/fd_operator_cache.go +++ b/fd_operator_cache.go @@ -20,69 +20,80 @@ import ( "unsafe" ) -func allocop() *FDOperator { - return opcache.alloc() -} - -func freeop(op *FDOperator) { - opcache.free(op) -} - -func init() { - opcache = &operatorCache{ - // cache: make(map[int][]byte), - cache: make([]*FDOperator, 0, 1024), +func newOperatorCache() *operatorCache { + return &operatorCache{ + cache: make([]*FDOperator, 0, 1024), + freelist: make([]int32, 0, 1024), } - runtime.KeepAlive(opcache) } -var opcache *operatorCache - type operatorCache struct { locked int32 first *FDOperator cache []*FDOperator + // freelist store the freeable operator + // to reduce GC pressure, we only store op index here + freelist []int32 + freelocked int32 } func (c *operatorCache) alloc() *FDOperator { - c.lock() + lock(&c.locked) if c.first == nil { const opSize = unsafe.Sizeof(FDOperator{}) n := block4k / opSize if n == 0 { n = 1 } - // Must be in non-GC memory because can be referenced - // only from epoll/kqueue internals. 
+ index := int32(len(c.cache)) for i := uintptr(0); i < n; i++ { - pd := &FDOperator{} + pd := &FDOperator{index: index} c.cache = append(c.cache, pd) pd.next = c.first c.first = pd + index++ } } op := c.first c.first = op.next - c.unlock() + unlock(&c.locked) return op } -func (c *operatorCache) free(op *FDOperator) { +// freeable mark the operator that could be freed +// only poller could do the real free action +func (c *operatorCache) freeable(op *FDOperator) { + // reset all state op.unused() op.reset() + lock(&c.freelocked) + c.freelist = append(c.freelist, op.index) + unlock(&c.freelocked) +} + +func (c *operatorCache) free() { + lock(&c.freelocked) + defer unlock(&c.freelocked) + if len(c.freelist) == 0 { + return + } - c.lock() - op.next = c.first - c.first = op - c.unlock() + lock(&c.locked) + for _, idx := range c.freelist { + op := c.cache[idx] + op.next = c.first + c.first = op + } + c.freelist = c.freelist[:0] + unlock(&c.locked) } -func (c *operatorCache) lock() { - for !atomic.CompareAndSwapInt32(&c.locked, 0, 1) { +func lock(locked *int32) { + for !atomic.CompareAndSwapInt32(locked, 0, 1) { runtime.Gosched() } } -func (c *operatorCache) unlock() { - atomic.StoreInt32(&c.locked, 0) +func unlock(locked *int32) { + atomic.StoreInt32(locked, 0) } diff --git a/fd_operator_cache_test.go b/fd_operator_cache_test.go index 3e2838c7..a92f15fb 100644 --- a/fd_operator_cache_test.go +++ b/fd_operator_cache_test.go @@ -24,14 +24,16 @@ import ( // go test -v -gcflags=-d=checkptr -run=TestPersistFDOperator func TestPersistFDOperator(t *testing.T) { + opcache := newOperatorCache() // init - size := 1000 + size := 2048 var ops = make([]*FDOperator, size) for i := 0; i < size; i++ { - op := allocop() + op := opcache.alloc() op.FD = i ops[i] = op } + Equal(t, len(opcache.freelist), 0) // gc for i := 0; i < 4; i++ { runtime.GC() @@ -39,16 +41,23 @@ func TestPersistFDOperator(t *testing.T) { // check alloc for i := range ops { Equal(t, ops[i].FD, i) - freeop(ops[i]) + opcache.freeable(ops[i]) + Equal(t, len(opcache.freelist), i+1) } + Equal(t, len(opcache.freelist), size) + opcache.free() + Equal(t, len(opcache.freelist), 0) + Assert(t, len(opcache.cache) >= size) } func BenchmarkPersistFDOperator1(b *testing.B) { b.ReportAllocs() b.ResetTimer() + opcache := newOperatorCache() for i := 0; i < b.N; i++ { - op := allocop() - freeop(op) + op := opcache.alloc() + opcache.freeable(op) + opcache.free() } } @@ -57,10 +66,12 @@ func BenchmarkPersistFDOperator2(b *testing.B) { b.ReportAllocs() b.SetParallelism(128) b.ResetTimer() + opcache := newOperatorCache() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - op := allocop() - freeop(op) + op := opcache.alloc() + opcache.freeable(op) + opcache.free() } }) } diff --git a/net_listener_test.go b/net_listener_test.go index 315bc9ea..9516f8e5 100644 --- a/net_listener_test.go +++ b/net_listener_test.go @@ -30,28 +30,19 @@ func TestListenerDialer(t *testing.T) { addr := ":1234" ln, err := CreateListener(network, addr) MustNil(t, err) - defer time.Sleep(10 * time.Millisecond) defer ln.Close() - - stop := make(chan int) trigger := make(chan int) - defer close(stop) - defer close(trigger) msg := []byte("0123456789") go func() { for { - select { - case <-stop: - err := ln.Close() - MustNil(t, err) - return - default: - } conn, err := ln.Accept() if conn == nil && err == nil { continue } + if err != nil { + return + } go func(conn net.Conn) { <-trigger buf := make([]byte, 10) diff --git a/net_netfd.go b/net_netfd.go index 0aa09e93..cea01a77 100644 --- 
a/net_netfd.go +++ b/net_netfd.go @@ -107,7 +107,7 @@ func (c *netFD) dial(ctx context.Context, laddr, raddr sockaddr) (err error) { return nil } -func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, ret error) { +func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, retErr error) { // Do not need to call c.writing here, // because c is not yet accessible to user, // so no concurrent operations are possible. @@ -134,45 +134,6 @@ func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa sysca return nil, os.NewSyscallError("connect", err) } - // TODO: can't support interrupter now. - // Start the "interrupter" goroutine, if this context might be canceled. - // (The background context cannot) - // - // The interrupter goroutine waits for the context to be done and - // interrupts the dial (by altering the c's write deadline, which - // wakes up waitWrite). - if ctx != context.Background() { - // Wait for the interrupter goroutine to exit before returning - // from connect. - done := make(chan struct{}) - interruptRes := make(chan error) - defer func() { - close(done) - if ctxErr := <-interruptRes; ctxErr != nil && ret == nil { - // The interrupter goroutine called SetWriteDeadline, - // but the connect code below had returned from - // waitWrite already and did a successful connect (ret - // == nil). Because we've now poisoned the connection - // by making it unwritable, don't return a successful - // dial. This was issue 16523. - ret = mapErr(ctxErr) - c.Close() // prevent a leak - } - }() - go func() { - select { - case <-ctx.Done(): - // Force the runtime's poller to immediately give up - // waiting for writability, unblocking waitWrite - // below. - c.SetWriteDeadline(aLongTimeAgo) - interruptRes <- ctx.Err() - case <-done: - interruptRes <- nil - } - }() - } - c.pd = newPollDesc(c.fd) for { // Performing multiple connect system calls on a diff --git a/net_netfd_conn.go b/net_netfd_conn.go index 44913f1c..c2ab43e0 100644 --- a/net_netfd_conn.go +++ b/net_netfd_conn.go @@ -62,7 +62,7 @@ func (c *netFD) Close() (err error) { if c.fd > 0 { err = syscall.Close(c.fd) if err != nil { - logger.Printf("netFD[%d] close error: %s", c.fd, err.Error()) + logger.Printf("NETPOLL: netFD[%d] close error: %s", c.fd, err.Error()) } } return err diff --git a/net_polldesc.go b/net_polldesc.go index 3a5ee32d..0b78c653 100644 --- a/net_polldesc.go +++ b/net_polldesc.go @@ -19,12 +19,13 @@ package netpoll import ( "context" - "sync" ) // TODO: recycle *pollDesc func newPollDesc(fd int) *pollDesc { - pd, op := &pollDesc{}, &FDOperator{} + pd := &pollDesc{} + poll := pollmanager.Pick() + op := poll.Alloc() op.FD = fd op.OnWrite = pd.onwrite op.OnHup = pd.onhup @@ -36,44 +37,46 @@ func newPollDesc(fd int) *pollDesc { } type pollDesc struct { - once sync.Once operator *FDOperator - // The write event is OneShot, then mark the writable to skip duplicate calling. writeTrigger chan struct{} closeTrigger chan struct{} } // WaitWrite . -// TODO: implement - poll support timeout hung up. 
-func (pd *pollDesc) WaitWrite(ctx context.Context) error { - var err error - pd.once.Do(func() { - // add ET|Write|Hup - pd.operator.poll = pollmanager.Pick() - err = pd.operator.Control(PollWritable) +func (pd *pollDesc) WaitWrite(ctx context.Context) (err error) { + defer func() { + // if return err != nil, upper caller function will close the connection if err != nil { - pd.detach() + pd.operator.Free() + } + }() + + if pd.operator.isUnused() { + // add ET|Write|Hup + if err = pd.operator.Control(PollWritable); err != nil { + logger.Printf("NETPOLL: pollDesc register operator failed: %v", err) + return err } - }) - if err != nil { - return err } select { + case <-pd.closeTrigger: + // no need to detach, since poller has done it in OnHup. + return Exception(ErrConnClosed, "by peer") + case <-pd.writeTrigger: + err = nil case <-ctx.Done(): + // deregister from poller, upper caller function will close fd pd.detach() - return mapErr(ctx.Err()) + err = mapErr(ctx.Err()) + } + // double check close trigger + select { case <-pd.closeTrigger: return Exception(ErrConnClosed, "by peer") - case <-pd.writeTrigger: - // if writable, check hup by select - select { - case <-pd.closeTrigger: - return Exception(ErrConnClosed, "by peer") - default: - return nil - } + default: + return err } } @@ -87,11 +90,16 @@ func (pd *pollDesc) onwrite(p Poll) error { } func (pd *pollDesc) onhup(p Poll) error { - close(pd.closeTrigger) + select { + case <-pd.closeTrigger: + default: + close(pd.closeTrigger) + } return nil } func (pd *pollDesc) detach() { - pd.operator.Control(PollDetach) - pd.operator.unused() + if err := pd.operator.Control(PollDetach); err != nil { + logger.Printf("NETPOLL: pollDesc detach operator failed: %v", err) + } } diff --git a/netpoll_server.go b/netpoll_server.go index baf5208f..2d6c5709 100644 --- a/netpoll_server.go +++ b/netpoll_server.go @@ -99,7 +99,7 @@ func (s *server) OnRead(p Poll) error { s.onQuit(err) return err } - logger.Println("accept conn failed:", err.Error()) + logger.Println("NETPOLL: accept conn failed:", err.Error()) return err } if conn == nil { diff --git a/poll.go b/poll.go index 6e72002c..1d5c42fb 100644 --- a/poll.go +++ b/poll.go @@ -34,6 +34,12 @@ type Poll interface { // Control the event of file descriptor and the operations is defined by PollEvent. Control(operator *FDOperator, event PollEvent) error + + // Alloc the operator from cache. + Alloc() (operator *FDOperator) + + // Free the operator from cache. + Free(operator *FDOperator) } // PollEvent defines the operation of poll.Control. 
diff --git a/poll_default_bsd.go b/poll_default_bsd.go index d2f087dd..62911bbd 100644 --- a/poll_default_bsd.go +++ b/poll_default_bsd.go @@ -43,12 +43,14 @@ func openDefaultPoll() *defaultPoll { if err != nil { panic(err) } + l.opcache = newOperatorCache() return l } type defaultPoll struct { fd int trigger uint32 + opcache *operatorCache // operator cache hups []func(p Poll) error } @@ -95,7 +97,7 @@ func (p *defaultPoll) Wait() error { var n, err = readv(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -122,7 +124,7 @@ func (p *defaultPoll) Wait() error { var n, err = sendmsg(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -133,6 +135,7 @@ func (p *defaultPoll) Wait() error { } // hup conns together to avoid blocking the poll. p.detaches() + p.opcache.free() } } @@ -164,11 +167,11 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { case PollReadable, PollModReadable: operator.inuse() evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_ADD|syscall.EV_ENABLE - case PollDetach: - evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE|syscall.EV_ONESHOT case PollWritable: operator.inuse() evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE|syscall.EV_ONESHOT + case PollDetach: + evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE|syscall.EV_ONESHOT case PollR2RW: evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE case PollRW2R: @@ -178,6 +181,16 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { return err } +func (p *defaultPoll) Alloc() (operator *FDOperator) { + op := p.opcache.alloc() + op.poll = p + return op +} + +func (p *defaultPoll) Free(operator *FDOperator) { + p.opcache.freeable(operator) +} + func (p *defaultPoll) appendHup(operator *FDOperator) { p.hups = append(p.hups, operator.OnHup) operator.Control(PollDetach) diff --git a/poll_default_linux.go b/poll_default_linux.go index fdb10e45..290f33f2 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -48,15 +48,17 @@ func openDefaultPoll() *defaultPoll { poll.wop = &FDOperator{FD: int(r0)} poll.Control(poll.wop, PollReadable) + poll.opcache = newOperatorCache() return &poll } type defaultPoll struct { pollArgs - fd int // epoll fd - wop *FDOperator // eventfd, wake epoll_wait - buf []byte // read wfd trigger msg - trigger uint32 // trigger flag + fd int // epoll fd + wop *FDOperator // eventfd, wake epoll_wait + buf []byte // read wfd trigger msg + trigger uint32 // trigger flag + opcache *operatorCache // operator cache // fns for handle events Reset func(size, caps int) Handler func(events []epollevent) (closed bool) @@ -102,6 +104,8 @@ func (p *defaultPoll) Wait() (err error) { if p.Handler(p.events[:n]) { return nil } + // we can make sure that there is no op remaining if Handler finished + p.opcache.free() } } @@ -133,18 +137,20 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { if operator.OnRead != nil { // for 
non-connection operator.OnRead(p) - } else { + } else if operator.Inputs != nil { // for connection var bs = operator.Inputs(p.barriers[i].bs) if len(bs) > 0 { var n, err = readv(operator.FD, bs, p.barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } } + } else { + logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator) } } @@ -168,7 +174,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { if operator.OnWrite != nil { // for non-connection operator.OnWrite(p) - } else { + } else if operator.Outputs != nil { // for connection var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs) if len(bs) > 0 { @@ -176,11 +182,13 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } } + } else { + logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator) } } operator.done() @@ -212,28 +220,39 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { var evt epollevent *(**FDOperator)(unsafe.Pointer(&evt.data)) = operator switch event { - case PollReadable: + case PollReadable: // server accept a new connection and wait read operator.inuse() op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollModReadable: + case PollWritable: // client create a new connection and wait connect finished operator.inuse() + op, evt.events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR + case PollModReadable: // client wait read/write op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollDetach: + case PollDetach: // deregister op, evt.events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollWritable: - operator.inuse() - op, evt.events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollR2RW: + case PollR2RW: // connection wait read/write op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollRW2R: + case PollRW2R: // connection wait read op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR } return EpollCtl(p.fd, op, operator.FD, &evt) } +func (p *defaultPoll) Alloc() (operator *FDOperator) { + op := p.opcache.alloc() + op.poll = p + return op +} + +func (p *defaultPoll) Free(operator *FDOperator) { + p.opcache.freeable(operator) +} + func (p *defaultPoll) appendHup(operator *FDOperator) { p.hups = append(p.hups, operator.OnHup) - operator.Control(PollDetach) + if err := operator.Control(PollDetach); err != nil { + logger.Printf("NETPOLL: poller detach operator failed: %v", err) + } operator.done() } diff --git a/poll_default_linux_test.go b/poll_default_linux_test.go index 7b6bdc23..4cb72eed 100644 --- a/poll_default_linux_test.go +++ b/poll_default_linux_test.go @@ -158,3 +158,166 @@ func TestEpollWait(t 
*testing.T) { err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event) MustNil(t, err) } + +func TestEpollETClose(t *testing.T) { + var epollfd, err = EpollCreate(0) + MustNil(t, err) + defer syscall.Close(epollfd) + rfd, wfd := GetSysFdPairs() + events := make([]epollevent, 128) + eventdata := [8]byte{0, 0, 0, 0, 0, 0, 0, 1} + event := &epollevent{ + events: EPOLLET | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: eventdata, + } + + // EPOLL: init state + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) + + // EPOLL: close current fd + // nothing will happen + err = syscall.Close(rfd) + MustNil(t, err) + n, err := EpollWait(epollfd, events, 100) + MustNil(t, err) + Assert(t, n == 0, n) + err = syscall.Close(wfd) + MustNil(t, err) + + // EPOLL: close peer fd + // EPOLLOUT + rfd, wfd = GetSysFdPairs() + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + err = syscall.Close(wfd) + MustNil(t, err) + n, err = EpollWait(epollfd, events, 100) + MustNil(t, err) + Assert(t, n == 1, n) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP != 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) +} + +func TestEpollETDel(t *testing.T) { + var epollfd, err = EpollCreate(0) + MustNil(t, err) + defer syscall.Close(epollfd) + rfd, wfd := GetSysFdPairs() + send := []byte("hello") + events := make([]epollevent, 128) + eventdata := [8]byte{0, 0, 0, 0, 0, 0, 0, 1} + event := &epollevent{ + events: EPOLLET | syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: eventdata, + } + + // EPOLL: del partly + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + MustNil(t, err) + event.events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR + err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event) + MustNil(t, err) + _, err = syscall.Write(wfd, send) + MustNil(t, err) + _, err = EpollWait(epollfd, events, 100) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLIN == 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) +} + +func TestEpollConnectSameFD(t *testing.T) { + var epollfd, err = EpollCreate(0) + MustNil(t, err) + defer syscall.Close(epollfd) + events := make([]epollevent, 128) + eventdata1 := [8]byte{0, 0, 0, 0, 0, 0, 0, 1} + eventdata2 := [8]byte{0, 0, 0, 0, 0, 0, 0, 2} + event1 := &epollevent{ + events: EPOLLET | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: eventdata1, + } + event2 := &epollevent{ + events: EPOLLET | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: eventdata2, + } + eventin := &epollevent{ + events: syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: eventdata1, + } + addr := syscall.SockaddrInet4{ + Port: 53, + Addr: [4]byte{8, 8, 8, 8}, + } + + // connect non-block socket + fd1, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP) + MustNil(t, err) + t.Logf("create fd: %d", fd1) + err = syscall.SetNonblock(fd1, true) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd1, event1) + MustNil(t, err) + err = syscall.Connect(fd1, &addr) + t.Log(err) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + Assert(t, 
events[0].events&syscall.EPOLLRDHUP == 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) + // forget to del fd + //err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd1, event1) + //MustNil(t, err) + err = syscall.Close(fd1) // close fd1 + MustNil(t, err) + + // connect non-block socket with same fd + fd2, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP) + MustNil(t, err) + t.Logf("create fd: %d", fd2) + err = syscall.SetNonblock(fd2, true) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd2, event2) + MustNil(t, err) + err = syscall.Connect(fd2, &addr) + t.Log(err) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) + err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd2, event2) + MustNil(t, err) + err = syscall.Close(fd2) // close fd2 + MustNil(t, err) + Equal(t, events[0].data, eventdata2) + + // no event after close fd + fd3, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP) + MustNil(t, err) + t.Logf("create fd: %d", fd3) + err = syscall.SetNonblock(fd3, true) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd3, event1) + MustNil(t, err) + err = syscall.Connect(fd3, &addr) + t.Log(err) + _, err = EpollWait(epollfd, events, -1) + MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLOUT != 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) + MustNil(t, err) + err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, fd3, eventin) + MustNil(t, err) + err = syscall.Close(fd3) // close fd3 + MustNil(t, err) + n, err := EpollWait(epollfd, events, 100) + MustNil(t, err) + Assert(t, n == 0) +} diff --git a/poll_manager.go b/poll_manager.go index 9f6f500d..2c2e8097 100644 --- a/poll_manager.go +++ b/poll_manager.go @@ -72,7 +72,7 @@ func (m *manager) SetNumLoops(numLoops int) error { polls[idx] = m.polls[idx] } else { if err := m.polls[idx].Close(); err != nil { - logger.Printf("poller close failed: %v\n", err) + logger.Printf("NETPOLL: poller close failed: %v\n", err) } } } diff --git a/poll_race_bsd.go b/poll_race_bsd.go index 39540717..5caf393d 100644 --- a/poll_race_bsd.go +++ b/poll_race_bsd.go @@ -44,6 +44,7 @@ func openDefaultPoll() *defaultPoll { if err != nil { panic(err) } + l.opcache = newOperatorCache() return l } @@ -51,6 +52,7 @@ type defaultPoll struct { fd int trigger uint32 m sync.Map + opcache *operatorCache // operator cache hups []func(p Poll) error } @@ -102,7 +104,7 @@ func (p *defaultPoll) Wait() error { var n, err = readv(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -129,7 +131,7 @@ func (p *defaultPoll) Wait() error { var n, err = sendmsg(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } @@ -140,6 +142,7 @@ func (p *defaultPoll) Wait() error { } // hup conns together to avoid blocking the poll. 
p.detaches() + p.opcache.free() } } @@ -195,9 +198,21 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { return err } +func (p *defaultPoll) Alloc() (operator *FDOperator) { + op := p.opcache.alloc() + op.poll = p + return op +} + +func (p *defaultPoll) Free(operator *FDOperator) { + p.opcache.freeable(operator) +} + func (p *defaultPoll) appendHup(operator *FDOperator) { p.hups = append(p.hups, operator.OnHup) - operator.Control(PollDetach) + if err := operator.Control(PollDetach); err != nil { + logger.Printf("NETPOLL: poller detach operator failed: %v", err) + } operator.done() } diff --git a/poll_race_linux.go b/poll_race_linux.go index 8ede0d99..254e5c89 100644 --- a/poll_race_linux.go +++ b/poll_race_linux.go @@ -44,6 +44,7 @@ func openDefaultPoll() *defaultPoll { } poll.wfd = int(r0) poll.Control(&FDOperator{FD: poll.wfd}, PollReadable) + poll.opcache = newOperatorCache() return &poll } @@ -54,6 +55,7 @@ type defaultPoll struct { buf []byte // read wfd trigger msg trigger uint32 // trigger flag m sync.Map + opcache *operatorCache // operator cache } type pollArgs struct { @@ -96,6 +98,7 @@ func (p *defaultPoll) Wait() (err error) { if p.handler(p.events[:n]) { return nil } + p.opcache.free() } } @@ -130,18 +133,20 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { if operator.OnRead != nil { // for non-connection operator.OnRead(p) - } else { + } else if operator.Inputs != nil { // for connection var bs = operator.Inputs(p.barriers[i].bs) if len(bs) > 0 { var n, err = readv(operator.FD, bs, p.barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } } + } else { + logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator) } } @@ -166,7 +171,7 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { if operator.OnWrite != nil { // for non-connection operator.OnWrite(p) - } else { + } else if operator.Outputs != nil { // for connection var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs) if len(bs) > 0 { @@ -174,11 +179,13 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { - logger.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue } } + } else { + logger.Printf("NETPOLL: operator has critical problem! 
event=%d operator=%v", evt, operator) } } operator.done() @@ -222,17 +229,16 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { operator.inuse() p.m.Store(operator.FD, operator) op, evt.Events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollModReadable: + case PollWritable: operator.inuse() + p.m.Store(operator.FD, operator) + op, evt.Events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR + case PollModReadable: p.m.Store(operator.FD, operator) op, evt.Events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR case PollDetach: p.m.Delete(operator.FD) op, evt.Events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollWritable: - operator.inuse() - p.m.Store(operator.FD, operator) - op, evt.Events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR case PollR2RW: op, evt.Events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR case PollRW2R: @@ -241,6 +247,16 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { return syscall.EpollCtl(p.fd, op, operator.FD, &evt) } +func (p *defaultPoll) Alloc() (operator *FDOperator) { + op := p.opcache.alloc() + op.poll = p + return op +} + +func (p *defaultPoll) Free(operator *FDOperator) { + p.opcache.freeable(operator) +} + func (p *defaultPoll) appendHup(operator *FDOperator) { p.hups = append(p.hups, operator.OnHup) operator.Control(PollDetach) diff --git a/poll_test.go b/poll_test.go index 6775b973..c30dab83 100644 --- a/poll_test.go +++ b/poll_test.go @@ -18,6 +18,7 @@ package netpoll import ( + "runtime" "sync" "sync/atomic" "syscall" @@ -83,22 +84,28 @@ func TestPollMod(t *testing.T) { err = p.Control(wop, PollWritable) // trigger one shot MustNil(t, err) - time.Sleep(50 * time.Millisecond) + for atomic.LoadInt32(&wn) == 0 { + runtime.Gosched() + } r, w, h = atomic.LoadInt32(&rn), atomic.LoadInt32(&wn), atomic.LoadInt32(&hn) Assert(t, r == 0 && w == 1 && h == 0, r, w, h) err = p.Control(rop, PollR2RW) // trigger write MustNil(t, err) - time.Sleep(time.Millisecond) + for atomic.LoadInt32(&wn) <= 1 { + runtime.Gosched() + } r, w, h = atomic.LoadInt32(&rn), atomic.LoadInt32(&wn), atomic.LoadInt32(&hn) Assert(t, r == 0 && w >= 2 && h == 0, r, w, h) // close wfd, then trigger hup rfd err = syscall.Close(wfd) // trigger hup MustNil(t, err) - time.Sleep(time.Millisecond) - r, w, h = atomic.LoadInt32(&rn), atomic.LoadInt32(&wn), atomic.LoadInt32(&hn) - Assert(t, r == 1 && w >= 2 && h >= 1, r, w, h) + for atomic.LoadInt32(&hn) == 0 { + runtime.Gosched() + } + w, h = atomic.LoadInt32(&wn), atomic.LoadInt32(&hn) + Assert(t, w >= 2 && h >= 1, r, w, h) p.Close() err = <-stop From 35578a79f9ffb65fd3cacf9ad73be1db0eb93088 Mon Sep 17 00:00:00 2001 From: Joway Date: Wed, 15 Feb 2023 16:26:58 +0800 Subject: [PATCH 11/20] fix: close connection when readv syscall return 0, nil (#189) --- connection_impl.go | 21 ++++++++----------- net_io.go | 42 ++++++++++++++++++++++++++++++++++++++ poll_default_bsd.go | 10 ++++----- poll_default_linux.go | 10 ++++----- poll_default_linux_test.go | 11 ++++++++-- poll_race_bsd.go | 10 ++++----- poll_race_linux.go | 10 ++++----- 7 files changed, 76 insertions(+), 38 deletions(-) create mode 100644 net_io.go diff --git a/connection_impl.go b/connection_impl.go index beb11d23..e22a137e 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -50,9 
+50,11 @@ type connection struct { bookSize int // The size of data that can be read at once. } -var _ Connection = &connection{} -var _ Reader = &connection{} -var _ Writer = &connection{} +var ( + _ Connection = &connection{} + _ Reader = &connection{} + _ Writer = &connection{} +) // Reader implements Connection. func (c *connection) Reader() Reader { @@ -168,7 +170,7 @@ func (c *connection) Until(delim byte) (line []byte, err error) { l = c.inputBuffer.Len() i := c.inputBuffer.indexByte(delim, n) if i < 0 { - n = l //skip all exists bytes + n = l // skip all exists bytes continue } return c.Next(i + 1) @@ -470,18 +472,13 @@ func (c *connection) fill(need int) (err error) { for { bs = c.inputs(c.inputBarrier.bs) TryRead: - n, err = readv(c.fd, bs, c.inputBarrier.ivs) + n, err = ioread(c.fd, bs, c.inputBarrier.ivs) if err != nil { - if err == syscall.EINTR { - // if err == EINTR, we must reuse bs that has been booked - // otherwise will mess the input buffer - goto TryRead - } break } if n == 0 { - err = Exception(ErrEOF, "") - break + // we must reuse bs that has been booked, otherwise will mess the input buffer + goto TryRead } c.inputAck(n) } diff --git a/net_io.go b/net_io.go new file mode 100644 index 00000000..c7322fd2 --- /dev/null +++ b/net_io.go @@ -0,0 +1,42 @@ +// Copyright 2023 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netpoll + +import "syscall" + +// return value: +// - n: n == 0 but err == nil, retry syscall +// - err: if not nil, connection should be closed. +func ioread(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) { + n, err = readv(fd, bs, ivs) + if n == 0 && err == nil { // means EOF + return 0, Exception(ErrEOF, "") + } + if err == syscall.EINTR || err == syscall.EAGAIN { + return 0, nil + } + return n, err +} + +// return value: +// - n: n == 0 but err == nil, retry syscall +// - err: if not nil, connection should be closed. +func iosend(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, err error) { + n, err = sendmsg(fd, bs, ivs, zerocopy) + if err == syscall.EAGAIN { + return 0, nil + } + return n, err +} diff --git a/poll_default_bsd.go b/poll_default_bsd.go index 62911bbd..20c9dee3 100644 --- a/poll_default_bsd.go +++ b/poll_default_bsd.go @@ -94,10 +94,9 @@ func (p *defaultPoll) Wait() error { // only for connection var bs = operator.Inputs(barriers[i].bs) if len(bs) > 0 { - var n, err = readv(operator.FD, bs, barriers[i].ivs) + var n, err = ioread(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) - if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) + if err != nil { p.appendHup(operator) continue } @@ -121,10 +120,9 @@ func (p *defaultPoll) Wait() error { var bs, supportZeroCopy = operator.Outputs(barriers[i].bs) if len(bs) > 0 { // TODO: Let the upper layer pass in whether to use ZeroCopy. 
- var n, err = sendmsg(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) + var n, err = iosend(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) - if err != nil && err != syscall.EAGAIN { - logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + if err != nil { p.appendHup(operator) continue } diff --git a/poll_default_linux.go b/poll_default_linux.go index 290f33f2..bc7a23ab 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -141,10 +141,9 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { // for connection var bs = operator.Inputs(p.barriers[i].bs) if len(bs) > 0 { - var n, err = readv(operator.FD, bs, p.barriers[i].ivs) + var n, err = ioread(operator.FD, bs, p.barriers[i].ivs) operator.InputAck(n) - if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) + if err != nil { p.appendHup(operator) continue } @@ -179,10 +178,9 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs) if len(bs) > 0 { // TODO: Let the upper layer pass in whether to use ZeroCopy. - var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) + var n, err = iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) - if err != nil && err != syscall.EAGAIN { - logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + if err != nil { p.appendHup(operator) continue } diff --git a/poll_default_linux_test.go b/poll_default_linux_test.go index 4cb72eed..acd0afc9 100644 --- a/poll_default_linux_test.go +++ b/poll_default_linux_test.go @@ -18,6 +18,7 @@ package netpoll import ( + "errors" "syscall" "testing" @@ -167,7 +168,7 @@ func TestEpollETClose(t *testing.T) { events := make([]epollevent, 128) eventdata := [8]byte{0, 0, 0, 0, 0, 0, 0, 1} event := &epollevent{ - events: EPOLLET | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR, + events: EPOLLET | syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR, data: eventdata, } @@ -175,6 +176,7 @@ func TestEpollETClose(t *testing.T) { err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) _, err = EpollWait(epollfd, events, -1) MustNil(t, err) + Assert(t, events[0].events&syscall.EPOLLIN == 0) Assert(t, events[0].events&syscall.EPOLLOUT != 0) Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) Assert(t, events[0].events&syscall.EPOLLERR == 0) @@ -190,7 +192,7 @@ func TestEpollETClose(t *testing.T) { MustNil(t, err) // EPOLL: close peer fd - // EPOLLOUT + // EPOLLIN and EPOLLOUT rfd, wfd = GetSysFdPairs() err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) err = syscall.Close(wfd) @@ -198,9 +200,14 @@ func TestEpollETClose(t *testing.T) { n, err = EpollWait(epollfd, events, 100) MustNil(t, err) Assert(t, n == 1, n) + Assert(t, events[0].events&syscall.EPOLLIN != 0) Assert(t, events[0].events&syscall.EPOLLOUT != 0) Assert(t, events[0].events&syscall.EPOLLRDHUP != 0) Assert(t, events[0].events&syscall.EPOLLERR == 0) + buf := make([]byte, 1024) + ivs := make([]syscall.Iovec, 1) + n, err = ioread(rfd, [][]byte{buf}, ivs) // EOF + Assert(t, n == 0 && errors.Is(err, ErrEOF), n, err) } func TestEpollETDel(t *testing.T) { diff --git a/poll_race_bsd.go b/poll_race_bsd.go index 5caf393d..2e9fa34b 100644 --- a/poll_race_bsd.go +++ b/poll_race_bsd.go @@ -101,10 +101,9 @@ func (p *defaultPoll) Wait() error { // only for 
connection var bs = operator.Inputs(barriers[i].bs) if len(bs) > 0 { - var n, err = readv(operator.FD, bs, barriers[i].ivs) + var n, err = ioread(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) - if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) + if err != nil { p.appendHup(operator) continue } @@ -128,10 +127,9 @@ func (p *defaultPoll) Wait() error { var bs, supportZeroCopy = operator.Outputs(barriers[i].bs) if len(bs) > 0 { // TODO: Let the upper layer pass in whether to use ZeroCopy. - var n, err = sendmsg(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) + var n, err = iosend(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) - if err != nil && err != syscall.EAGAIN { - logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + if err != nil { p.appendHup(operator) continue } diff --git a/poll_race_linux.go b/poll_race_linux.go index 254e5c89..0bbd4049 100644 --- a/poll_race_linux.go +++ b/poll_race_linux.go @@ -137,10 +137,9 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { // for connection var bs = operator.Inputs(p.barriers[i].bs) if len(bs) > 0 { - var n, err = readv(operator.FD, bs, p.barriers[i].ivs) + var n, err = ioread(operator.FD, bs, p.barriers[i].ivs) operator.InputAck(n) - if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { - logger.Printf("NETPOLL: readv(fd=%d) failed: %s", operator.FD, err.Error()) + if err != nil { p.appendHup(operator) continue } @@ -176,10 +175,9 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs) if len(bs) > 0 { // TODO: Let the upper layer pass in whether to use ZeroCopy. - var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) + var n, err = iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) - if err != nil && err != syscall.EAGAIN { - logger.Printf("NETPOLL: sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + if err != nil { p.appendHup(operator) continue } From ea6d472b3e8ae5468b147b9fde2d9338bb02926a Mon Sep 17 00:00:00 2001 From: Joway Date: Fri, 17 Feb 2023 10:41:44 +0800 Subject: [PATCH 12/20] refactor: simplify race and norace event loop (#233) --- poll_default.go | 52 +++++++ poll_default_bsd.go | 48 ++---- poll_default_bsd_norace.go | 33 +++++ poll_default_bsd_race.go | 37 +++++ poll_default_linux.go | 47 +----- poll_default_linux_norace.go | 32 ++++ poll_default_linux_race.go | 43 ++++++ poll_race_bsd.go | 230 ----------------------------- poll_race_linux.go | 277 ----------------------------------- 9 files changed, 214 insertions(+), 585 deletions(-) create mode 100644 poll_default.go create mode 100644 poll_default_bsd_norace.go create mode 100644 poll_default_bsd_race.go create mode 100644 poll_default_linux_norace.go create mode 100644 poll_default_linux_race.go delete mode 100644 poll_race_bsd.go delete mode 100644 poll_race_linux.go diff --git a/poll_default.go b/poll_default.go new file mode 100644 index 00000000..0372d841 --- /dev/null +++ b/poll_default.go @@ -0,0 +1,52 @@ +// Copyright 2023 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
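As a side note on the ioread/iosend contract documented in net_io.go earlier in this series: a readv that returns 0 bytes with no error is taken as EOF, while EINTR/EAGAIN are flattened to (0, nil) so the caller simply retries with the buffers it has already booked. A minimal, self-contained toy program (illustrative only, not part of any patch here) showing the same mapping with plain read(2):

package main

import (
	"errors"
	"fmt"
	"syscall"
)

var errToyEOF = errors.New("EOF")

// ioreadLike mirrors the contract described in net_io.go:
//   - n == 0 with err == nil -> transient (EINTR/EAGAIN), caller retries
//   - err != nil             -> caller should treat the fd as dead
func ioreadLike(fd int, buf []byte) (int, error) {
	n, err := syscall.Read(fd, buf)
	if n == 0 && err == nil { // read(2) returned 0: peer closed
		return 0, errToyEOF
	}
	if err == syscall.EINTR || err == syscall.EAGAIN {
		return 0, nil
	}
	return n, err
}

func main() {
	var p [2]int
	if err := syscall.Pipe(p[:]); err != nil {
		panic(err)
	}
	syscall.Write(p[1], []byte("hi"))
	syscall.Close(p[1]) // writer closes after sending

	buf := make([]byte, 8)
	fmt.Println(ioreadLike(p[0], buf)) // 2 <nil>
	fmt.Println(ioreadLike(p[0], buf)) // 0 EOF (buffer drained, writer closed)
}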
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netpoll + +func (p *defaultPoll) Alloc() (operator *FDOperator) { + op := p.opcache.alloc() + op.poll = p + return op +} + +func (p *defaultPoll) Free(operator *FDOperator) { + p.opcache.freeable(operator) +} + +func (p *defaultPoll) appendHup(operator *FDOperator) { + p.hups = append(p.hups, operator.OnHup) + p.detach(operator) + operator.done() +} + +func (p *defaultPoll) detach(operator *FDOperator) { + if err := operator.Control(PollDetach); err != nil { + logger.Printf("NETPOLL: poller detach operator failed: %v", err) + } +} + +func (p *defaultPoll) onhups() { + if len(p.hups) == 0 { + return + } + hups := p.hups + p.hups = nil + go func(onhups []func(p Poll) error) { + for i := range onhups { + if onhups[i] != nil { + onhups[i](p) + } + } + }(hups) +} diff --git a/poll_default_bsd.go b/poll_default_bsd.go index 20c9dee3..591b3d1e 100644 --- a/poll_default_bsd.go +++ b/poll_default_bsd.go @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build (darwin || netbsd || freebsd || openbsd || dragonfly) && !race +//go:build darwin || netbsd || freebsd || openbsd || dragonfly // +build darwin netbsd freebsd openbsd dragonfly -// +build !race package netpoll import ( + "sync" "sync/atomic" "syscall" "unsafe" @@ -50,6 +50,7 @@ func openDefaultPoll() *defaultPoll { type defaultPoll struct { fd int trigger uint32 + m sync.Map // only used in go:race opcache *operatorCache // operator cache hups []func(p Poll) error } @@ -74,14 +75,15 @@ func (p *defaultPoll) Wait() error { return err } for i := 0; i < n; i++ { + var fd = int(events[i].Ident) // trigger - if events[i].Ident == 0 { + if fd == 0 { // clean trigger atomic.StoreUint32(&p.trigger, 0) continue } - var operator = *(**FDOperator)(unsafe.Pointer(&events[i].Udata)) - if !operator.do() { + var operator = p.getOperator(fd, unsafe.Pointer(&events[i].Udata)) + if operator == nil || !operator.do() { continue } @@ -132,7 +134,7 @@ func (p *defaultPoll) Wait() error { operator.done() } // hup conns together to avoid blocking the poll. 
- p.detaches() + p.onhups() p.opcache.free() } } @@ -160,7 +162,7 @@ func (p *defaultPoll) Trigger() error { func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { var evs = make([]syscall.Kevent_t, 1) evs[0].Ident = uint64(operator.FD) - *(**FDOperator)(unsafe.Pointer(&evs[0].Udata)) = operator + p.setOperator(unsafe.Pointer(&evs[0].Udata), operator) switch event { case PollReadable, PollModReadable: operator.inuse() @@ -169,6 +171,7 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { operator.inuse() evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE|syscall.EV_ONESHOT case PollDetach: + p.delOperator(operator) evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE|syscall.EV_ONESHOT case PollR2RW: evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE @@ -178,34 +181,3 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { _, err := syscall.Kevent(p.fd, evs, nil, nil) return err } - -func (p *defaultPoll) Alloc() (operator *FDOperator) { - op := p.opcache.alloc() - op.poll = p - return op -} - -func (p *defaultPoll) Free(operator *FDOperator) { - p.opcache.freeable(operator) -} - -func (p *defaultPoll) appendHup(operator *FDOperator) { - p.hups = append(p.hups, operator.OnHup) - operator.Control(PollDetach) - operator.done() -} - -func (p *defaultPoll) detaches() { - if len(p.hups) == 0 { - return - } - hups := p.hups - p.hups = nil - go func(onhups []func(p Poll) error) { - for i := range onhups { - if onhups[i] != nil { - onhups[i](p) - } - } - }(hups) -} diff --git a/poll_default_bsd_norace.go b/poll_default_bsd_norace.go new file mode 100644 index 00000000..8a0266d0 --- /dev/null +++ b/poll_default_bsd_norace.go @@ -0,0 +1,33 @@ +// Copyright 2023 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build (darwin || netbsd || freebsd || openbsd || dragonfly) && !race +// +build darwin netbsd freebsd openbsd dragonfly +// +build !race + +package netpoll + +import "unsafe" + +func (p *defaultPoll) getOperator(fd int, ptr unsafe.Pointer) *FDOperator { + return *(**FDOperator)(ptr) +} + +func (p *defaultPoll) setOperator(ptr unsafe.Pointer, operator *FDOperator) { + *(**FDOperator)(ptr) = operator +} + +func (p *defaultPoll) delOperator(operator *FDOperator) { + +} diff --git a/poll_default_bsd_race.go b/poll_default_bsd_race.go new file mode 100644 index 00000000..30baf6e0 --- /dev/null +++ b/poll_default_bsd_race.go @@ -0,0 +1,37 @@ +// Copyright 2023 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
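The !race helpers just added recover the *FDOperator by round-tripping the pointer itself through the event's user-data field, whereas the race-build files that follow store only the fd there and resolve the operator from a sync.Map. A minimal, self-contained illustration of that pointer round-trip (toy code, not from these patches):

package main

import (
	"fmt"
	"unsafe"
)

// fdOperator stands in for netpoll's FDOperator; the [8]byte field stands
// in for Kevent_t.Udata / epollevent.data. Hiding a pointer in non-pointer
// memory like this is only safe because a live reference to the operator
// is kept elsewhere (as netpoll does), so the GC never loses it.
type fdOperator struct{ fd int }

func main() {
	var udata [8]byte

	op := &fdOperator{fd: 42}
	*(**fdOperator)(unsafe.Pointer(&udata)) = op // setOperator: write the pointer

	got := *(**fdOperator)(unsafe.Pointer(&udata)) // getOperator: read it back
	fmt.Println(got.fd)                            // 42
}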
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build (darwin || netbsd || freebsd || openbsd || dragonfly) && race +// +build darwin netbsd freebsd openbsd dragonfly +// +build race + +package netpoll + +import "unsafe" + +func (p *defaultPoll) getOperator(fd int, ptr unsafe.Pointer) *FDOperator { + tmp, _ := p.m.Load(fd) + if tmp == nil { + return nil + } + return tmp.(*FDOperator) +} + +func (p *defaultPoll) setOperator(ptr unsafe.Pointer, operator *FDOperator) { + p.m.Store(operator.FD, operator) +} + +func (p *defaultPoll) delOperator(operator *FDOperator) { + p.m.Delete(operator.FD) +} diff --git a/poll_default_linux.go b/poll_default_linux.go index bc7a23ab..2905bbac 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -12,13 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build !race -// +build !race - package netpoll import ( "runtime" + "sync" "sync/atomic" "syscall" "unsafe" @@ -58,6 +56,7 @@ type defaultPoll struct { wop *FDOperator // eventfd, wake epoll_wait buf []byte // read wfd trigger msg trigger uint32 // trigger flag + m sync.Map // only used in go:race opcache *operatorCache // operator cache // fns for handle events Reset func(size, caps int) @@ -111,8 +110,8 @@ func (p *defaultPoll) Wait() (err error) { func (p *defaultPoll) handler(events []epollevent) (closed bool) { for i := range events { - var operator = *(**FDOperator)(unsafe.Pointer(&events[i].data)) - if !operator.do() { + operator := p.getOperator(0, unsafe.Pointer(&events[i].data)) + if operator == nil || !operator.do() { continue } // trigger or exit gracefully @@ -192,7 +191,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { operator.done() } // hup conns together to avoid blocking the poll. 
- p.detaches() + p.onhups() return false } @@ -216,7 +215,7 @@ func (p *defaultPoll) Trigger() error { func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { var op int var evt epollevent - *(**FDOperator)(unsafe.Pointer(&evt.data)) = operator + p.setOperator(unsafe.Pointer(&evt.data), operator) switch event { case PollReadable: // server accept a new connection and wait read operator.inuse() @@ -227,6 +226,7 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { case PollModReadable: // client wait read/write op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR case PollDetach: // deregister + p.delOperator(operator) op, evt.events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR case PollR2RW: // connection wait read/write op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR @@ -235,36 +235,3 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { } return EpollCtl(p.fd, op, operator.FD, &evt) } - -func (p *defaultPoll) Alloc() (operator *FDOperator) { - op := p.opcache.alloc() - op.poll = p - return op -} - -func (p *defaultPoll) Free(operator *FDOperator) { - p.opcache.freeable(operator) -} - -func (p *defaultPoll) appendHup(operator *FDOperator) { - p.hups = append(p.hups, operator.OnHup) - if err := operator.Control(PollDetach); err != nil { - logger.Printf("NETPOLL: poller detach operator failed: %v", err) - } - operator.done() -} - -func (p *defaultPoll) detaches() { - if len(p.hups) == 0 { - return - } - hups := p.hups - p.hups = nil - go func(onhups []func(p Poll) error) { - for i := range onhups { - if onhups[i] != nil { - onhups[i](p) - } - } - }(hups) -} diff --git a/poll_default_linux_norace.go b/poll_default_linux_norace.go new file mode 100644 index 00000000..29d5e6be --- /dev/null +++ b/poll_default_linux_norace.go @@ -0,0 +1,32 @@ +// Copyright 2023 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux && !race +// +build linux,!race + +package netpoll + +import "unsafe" + +func (p *defaultPoll) getOperator(fd int, ptr unsafe.Pointer) *FDOperator { + return *(**FDOperator)(ptr) +} + +func (p *defaultPoll) setOperator(ptr unsafe.Pointer, operator *FDOperator) { + *(**FDOperator)(ptr) = operator +} + +func (p *defaultPoll) delOperator(operator *FDOperator) { + +} diff --git a/poll_default_linux_race.go b/poll_default_linux_race.go new file mode 100644 index 00000000..775b587b --- /dev/null +++ b/poll_default_linux_race.go @@ -0,0 +1,43 @@ +// Copyright 2023 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux && race +// +build linux,race + +package netpoll + +import "unsafe" + +type eventdata struct { + fd int32 + pad int32 +} + +func (p *defaultPoll) getOperator(fd int, ptr unsafe.Pointer) *FDOperator { + data := *(*eventdata)(ptr) + tmp, _ := p.m.Load(int(data.fd)) + if tmp == nil { + return nil + } + return tmp.(*FDOperator) +} + +func (p *defaultPoll) setOperator(ptr unsafe.Pointer, operator *FDOperator) { + *(*eventdata)(ptr) = eventdata{fd: int32(operator.FD)} + p.m.Store(operator.FD, operator) +} + +func (p *defaultPoll) delOperator(operator *FDOperator) { + p.m.Delete(operator.FD) +} diff --git a/poll_race_bsd.go b/poll_race_bsd.go deleted file mode 100644 index 2e9fa34b..00000000 --- a/poll_race_bsd.go +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright 2022 CloudWeGo Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build (darwin || netbsd || freebsd || openbsd || dragonfly) && race -// +build darwin netbsd freebsd openbsd dragonfly -// +build race - -package netpoll - -import ( - "sync" - "sync/atomic" - "syscall" -) - -// mock no race poll -func openPoll() Poll { - return openDefaultPoll() -} - -func openDefaultPoll() *defaultPoll { - l := new(defaultPoll) - p, err := syscall.Kqueue() - if err != nil { - panic(err) - } - l.fd = p - _, err = syscall.Kevent(l.fd, []syscall.Kevent_t{{ - Ident: 0, - Filter: syscall.EVFILT_USER, - Flags: syscall.EV_ADD | syscall.EV_CLEAR, - }}, nil, nil) - if err != nil { - panic(err) - } - l.opcache = newOperatorCache() - return l -} - -type defaultPoll struct { - fd int - trigger uint32 - m sync.Map - opcache *operatorCache // operator cache - hups []func(p Poll) error -} - -// Wait implements Poll. 
-func (p *defaultPoll) Wait() error { - // init - var size, caps = 1024, barriercap - var events, barriers = make([]syscall.Kevent_t, size), make([]barrier, size) - for i := range barriers { - barriers[i].bs = make([][]byte, caps) - barriers[i].ivs = make([]syscall.Iovec, caps) - } - // wait - for { - n, err := syscall.Kevent(p.fd, nil, events, nil) - if err != nil && err != syscall.EINTR { - // exit gracefully - if err == syscall.EBADF { - return nil - } - return err - } - for i := 0; i < n; i++ { - var fd = int(events[i].Ident) - // trigger - if fd == 0 { - // clean trigger - atomic.StoreUint32(&p.trigger, 0) - continue - } - tmp, ok := p.m.Load(fd) - if !ok { - continue - } - operator := tmp.(*FDOperator) - if !operator.do() { - continue - } - - // check poll in - if events[i].Filter == syscall.EVFILT_READ && events[i].Flags&syscall.EV_ENABLE != 0 { - if operator.OnRead != nil { - // for non-connection - operator.OnRead(p) - } else { - // only for connection - var bs = operator.Inputs(barriers[i].bs) - if len(bs) > 0 { - var n, err = ioread(operator.FD, bs, barriers[i].ivs) - operator.InputAck(n) - if err != nil { - p.appendHup(operator) - continue - } - } - } - } - - // check hup - if events[i].Flags&syscall.EV_EOF != 0 { - p.appendHup(operator) - continue - } - - // check poll out - if events[i].Filter == syscall.EVFILT_WRITE && events[i].Flags&syscall.EV_ENABLE != 0 { - if operator.OnWrite != nil { - // for non-connection - operator.OnWrite(p) - } else { - // only for connection - var bs, supportZeroCopy = operator.Outputs(barriers[i].bs) - if len(bs) > 0 { - // TODO: Let the upper layer pass in whether to use ZeroCopy. - var n, err = iosend(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) - operator.OutputAck(n) - if err != nil { - p.appendHup(operator) - continue - } - } - } - } - operator.done() - } - // hup conns together to avoid blocking the poll. - p.detaches() - p.opcache.free() - } -} - -// TODO: Close will bad file descriptor here -func (p *defaultPoll) Close() error { - var err = syscall.Close(p.fd) - // delete all *FDOperator - p.m.Range(func(key, value interface{}) bool { - var operator, _ = value.(*FDOperator) - if operator.OnHup != nil { - operator.OnHup(p) - } - return true - }) - return err -} - -// Trigger implements Poll. -func (p *defaultPoll) Trigger() error { - if atomic.AddUint32(&p.trigger, 1) > 1 { - return nil - } - _, err := syscall.Kevent(p.fd, []syscall.Kevent_t{{ - Ident: 0, - Filter: syscall.EVFILT_USER, - Fflags: syscall.NOTE_TRIGGER, - }}, nil, nil) - return err -} - -// Control implements Poll. 
-func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { - var evs = make([]syscall.Kevent_t, 1) - evs[0].Ident = uint64(operator.FD) - switch event { - case PollReadable, PollModReadable: - operator.inuse() - p.m.Store(operator.FD, operator) - evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_ADD|syscall.EV_ENABLE - case PollDetach: - p.m.Delete(operator.FD) - evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE|syscall.EV_ONESHOT - case PollWritable: - operator.inuse() - p.m.Store(operator.FD, operator) - evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE|syscall.EV_ONESHOT - case PollR2RW: - evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE - case PollRW2R: - evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_DELETE|syscall.EV_ONESHOT - } - _, err := syscall.Kevent(p.fd, evs, nil, nil) - return err -} - -func (p *defaultPoll) Alloc() (operator *FDOperator) { - op := p.opcache.alloc() - op.poll = p - return op -} - -func (p *defaultPoll) Free(operator *FDOperator) { - p.opcache.freeable(operator) -} - -func (p *defaultPoll) appendHup(operator *FDOperator) { - p.hups = append(p.hups, operator.OnHup) - if err := operator.Control(PollDetach); err != nil { - logger.Printf("NETPOLL: poller detach operator failed: %v", err) - } - operator.done() -} - -func (p *defaultPoll) detaches() { - if len(p.hups) == 0 { - return - } - hups := p.hups - p.hups = nil - go func(onhups []func(p Poll) error) { - for i := range onhups { - if onhups[i] != nil { - onhups[i](p) - } - } - }(hups) -} diff --git a/poll_race_linux.go b/poll_race_linux.go deleted file mode 100644 index 0bbd4049..00000000 --- a/poll_race_linux.go +++ /dev/null @@ -1,277 +0,0 @@ -// Copyright 2022 CloudWeGo Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -//go:build race -// +build race - -package netpoll - -import ( - "runtime" - "sync" - "sync/atomic" - "syscall" -) - -// mock no race poll -func openPoll() Poll { - return openDefaultPoll() -} - -func openDefaultPoll() *defaultPoll { - var poll = defaultPoll{} - poll.buf = make([]byte, 8) - var p, err = EpollCreate(0) - if err != nil { - panic(err) - } - poll.fd = p - var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0) - if e0 != 0 { - syscall.Close(p) - panic(err) - } - poll.wfd = int(r0) - poll.Control(&FDOperator{FD: poll.wfd}, PollReadable) - poll.opcache = newOperatorCache() - return &poll -} - -type defaultPoll struct { - pollArgs - fd int // epoll fd - wfd int // wake epoll wait - buf []byte // read wfd trigger msg - trigger uint32 // trigger flag - m sync.Map - opcache *operatorCache // operator cache -} - -type pollArgs struct { - size int - caps int - events []syscall.EpollEvent - barriers []barrier - hups []func(p Poll) error -} - -func (a *pollArgs) reset(size, caps int) { - a.size, a.caps = size, caps - a.events, a.barriers = make([]syscall.EpollEvent, size), make([]barrier, size) - for i := range a.barriers { - a.barriers[i].bs = make([][]byte, a.caps) - a.barriers[i].ivs = make([]syscall.Iovec, a.caps) - } -} - -// Wait implements Poll. -func (p *defaultPoll) Wait() (err error) { - // init - var caps, msec, n = barriercap, -1, 0 - p.reset(128, caps) - // wait - for { - if n == p.size && p.size < 128*1024 { - p.reset(p.size<<1, caps) - } - n, err = syscall.EpollWait(p.fd, p.events, msec) - if err != nil && err != syscall.EINTR { - return err - } - if n <= 0 { - msec = -1 - runtime.Gosched() - continue - } - msec = 0 - if p.handler(p.events[:n]) { - return nil - } - p.opcache.free() - } -} - -func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { - for i := range events { - var fd = int(events[i].Fd) - // trigger or exit gracefully - if fd == p.wfd { - // must clean trigger first - syscall.Read(p.wfd, p.buf) - atomic.StoreUint32(&p.trigger, 0) - // if closed & exit - if p.buf[0] > 0 { - syscall.Close(p.wfd) - syscall.Close(p.fd) - return true - } - continue - } - tmp, ok := p.m.Load(fd) - if !ok { - continue - } - operator := tmp.(*FDOperator) - if !operator.do() { - continue - } - - evt := events[i].Events - // check poll in - if evt&syscall.EPOLLIN != 0 { - if operator.OnRead != nil { - // for non-connection - operator.OnRead(p) - } else if operator.Inputs != nil { - // for connection - var bs = operator.Inputs(p.barriers[i].bs) - if len(bs) > 0 { - var n, err = ioread(operator.FD, bs, p.barriers[i].ivs) - operator.InputAck(n) - if err != nil { - p.appendHup(operator) - continue - } - } - } else { - logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator) - } - } - - // check hup - if evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0 { - p.appendHup(operator) - continue - } - if evt&syscall.EPOLLERR != 0 { - // Under block-zerocopy, the kernel may give an error callback, which is not a real error, just an EAGAIN. - // So here we need to check this error, if it is EAGAIN then do nothing, otherwise still mark as hup. 
- if _, _, _, _, err := syscall.Recvmsg(operator.FD, nil, nil, syscall.MSG_ERRQUEUE); err != syscall.EAGAIN { - p.appendHup(operator) - } else { - operator.done() - } - continue - } - - // check poll out - if evt&syscall.EPOLLOUT != 0 { - if operator.OnWrite != nil { - // for non-connection - operator.OnWrite(p) - } else if operator.Outputs != nil { - // for connection - var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs) - if len(bs) > 0 { - // TODO: Let the upper layer pass in whether to use ZeroCopy. - var n, err = iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) - operator.OutputAck(n) - if err != nil { - p.appendHup(operator) - continue - } - } - } else { - logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator) - } - } - operator.done() - } - // hup conns together to avoid blocking the poll. - p.detaches() - return false -} - -// Close will write 10000000 -func (p *defaultPoll) Close() error { - _, err := syscall.Write(p.wfd, []byte{1, 0, 0, 0, 0, 0, 0, 0}) - // delete all *FDOperator - p.m.Range(func(key, value interface{}) bool { - var operator, _ = value.(*FDOperator) - if operator.OnHup != nil { - operator.OnHup(p) - } - return true - }) - return err -} - -// Trigger implements Poll. -func (p *defaultPoll) Trigger() error { - if atomic.AddUint32(&p.trigger, 1) > 1 { - return nil - } - // MAX(eventfd) = 0xfffffffffffffffe - _, err := syscall.Write(p.wfd, []byte{0, 0, 0, 0, 0, 0, 0, 1}) - return err -} - -// Control implements Poll. -func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { - var op int - var evt syscall.EpollEvent - evt.Fd = int32(operator.FD) - switch event { - case PollReadable: - operator.inuse() - p.m.Store(operator.FD, operator) - op, evt.Events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollWritable: - operator.inuse() - p.m.Store(operator.FD, operator) - op, evt.Events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollModReadable: - p.m.Store(operator.FD, operator) - op, evt.Events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollDetach: - p.m.Delete(operator.FD) - op, evt.Events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollR2RW: - op, evt.Events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR - case PollRW2R: - op, evt.Events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR - } - return syscall.EpollCtl(p.fd, op, operator.FD, &evt) -} - -func (p *defaultPoll) Alloc() (operator *FDOperator) { - op := p.opcache.alloc() - op.poll = p - return op -} - -func (p *defaultPoll) Free(operator *FDOperator) { - p.opcache.freeable(operator) -} - -func (p *defaultPoll) appendHup(operator *FDOperator) { - p.hups = append(p.hups, operator.OnHup) - operator.Control(PollDetach) - operator.done() -} - -func (p *defaultPoll) detaches() { - if len(p.hups) == 0 { - return - } - hups := p.hups - p.hups = nil - go func(onhups []func(p Poll) error) { - for i := range onhups { - if onhups[i] != nil { - onhups[i](p) - } - } - }(hups) -} From e30b2afafa48a462f8d4d51acb8a2071b4951f47 Mon Sep 17 00:00:00 2001 From: xiyang <90125263+JBossBC@users.noreply.github.com> Date: Tue, 28 Feb 2023 10:48:59 +0800 Subject: [PATCH 13/20] fix: ignore system fd (#236) --- net_netfd_conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/net_netfd_conn.go b/net_netfd_conn.go index c2ab43e0..4ac53328 100644 --- a/net_netfd_conn.go +++ b/net_netfd_conn.go @@ -59,7 +59,7 @@ func (c *netFD) Close() (err error) { if atomic.AddUint32(&c.closed, 1) != 1 { return nil } - if c.fd > 0 { + if c.fd > 2 { err = syscall.Close(c.fd) if err != nil { logger.Printf("NETPOLL: netFD[%d] close error: %s", c.fd, err.Error()) From 9ddc97b78acabdeb7b2f950b9939d18a921447c4 Mon Sep 17 00:00:00 2001 From: Joway Date: Tue, 28 Feb 2023 11:02:19 +0800 Subject: [PATCH 14/20] fix: shard queue state closed mistake (#237) --- mux/shard_queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mux/shard_queue.go b/mux/shard_queue.go index 7c7c1261..364fabae 100644 --- a/mux/shard_queue.go +++ b/mux/shard_queue.go @@ -111,7 +111,7 @@ func (q *ShardQueue) Close() error { // wait for all tasks finished for atomic.LoadInt32(&q.state) != closed { if atomic.LoadInt32(&q.trigger) == 0 { - atomic.StoreInt32(&q.trigger, closed) + atomic.StoreInt32(&q.state, closed) return nil } runtime.Gosched() From 6704628e5d139b4ab181a459fa1b1e6eb139b369 Mon Sep 17 00:00:00 2001 From: Joway Date: Wed, 1 Mar 2023 14:53:15 +0800 Subject: [PATCH 15/20] fix: poller read all data before connection close (#226) --- connection_impl.go | 48 +++-------------------------- connection_lock.go | 1 - connection_test.go | 2 +- netpoll_test.go | 70 +++++++++++++++++++++++++++++++++++++++---- poll_default.go | 23 ++++++++++++++ poll_default_bsd.go | 22 +++++++++----- poll_default_linux.go | 27 ++++++++++------- 7 files changed, 123 insertions(+), 70 deletions(-) diff --git a/connection_impl.go b/connection_impl.go index e22a137e..5b645ac3 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -370,8 +370,6 @@ func (c *connection) initFDOperator() { func (c *connection) initFinalizer() { c.AddCloseCallback(func(connection Connection) (err error) { c.stop(flushing) - // stop the finalizing state to prevent conn.fill function to be performed - c.stop(finalizing) c.operator.Free() if err = c.netFD.Close(); err != nil { logger.Printf("NETPOLL: netFD close failed: %v", err) @@ -407,15 +405,10 @@ func (c *connection) waitRead(n int) (err error) { } // wait full n for c.inputBuffer.Len() < n { - if c.IsActive() { - <-c.readTrigger - continue - } - // confirm that fd is still valid. - if atomic.LoadUint32(&c.netFD.closed) == 0 { - return c.fill(n) + if !c.IsActive() { + return Exception(ErrConnClosed, "wait read") } - return Exception(ErrConnClosed, "wait read") + <-c.readTrigger } return nil } @@ -432,12 +425,7 @@ func (c *connection) waitReadWithTimeout(n int) (err error) { for c.inputBuffer.Len() < n { if !c.IsActive() { // cannot return directly, stop timer before ! - // confirm that fd is still valid. - if atomic.LoadUint32(&c.netFD.closed) == 0 { - err = c.fill(n) - } else { - err = Exception(ErrConnClosed, "wait read") - } + err = Exception(ErrConnClosed, "wait read") break } @@ -460,34 +448,6 @@ func (c *connection) waitReadWithTimeout(n int) (err error) { return err } -// fill data after connection is closed. 
-func (c *connection) fill(need int) (err error) { - if !c.lock(finalizing) { - return ErrConnClosed - } - defer c.unlock(finalizing) - - var n int - var bs [][]byte - for { - bs = c.inputs(c.inputBarrier.bs) - TryRead: - n, err = ioread(c.fd, bs, c.inputBarrier.ivs) - if err != nil { - break - } - if n == 0 { - // we must reuse bs that has been booked, otherwise will mess the input buffer - goto TryRead - } - c.inputAck(n) - } - if c.inputBuffer.Len() >= need { - return nil - } - return err -} - // flush write data directly. func (c *connection) flush() error { if c.outputBuffer.IsEmpty() { diff --git a/connection_lock.go b/connection_lock.go index e036de4b..2dce6622 100644 --- a/connection_lock.go +++ b/connection_lock.go @@ -47,7 +47,6 @@ const ( closing key = iota processing flushing - finalizing // total must be at the bottom. total ) diff --git a/connection_test.go b/connection_test.go index 655777c5..5542ea5a 100644 --- a/connection_test.go +++ b/connection_test.go @@ -395,7 +395,7 @@ func TestConnectionUntil(t *testing.T) { buf, err := rconn.Reader().Until('\n') Equal(t, len(buf), 100) - MustTrue(t, errors.Is(err, ErrEOF)) + Assert(t, errors.Is(err, ErrConnClosed), err) } func TestBookSizeLargerThanMaxSize(t *testing.T) { diff --git a/netpoll_test.go b/netpoll_test.go index 933566aa..cedf6226 100644 --- a/netpoll_test.go +++ b/netpoll_test.go @@ -21,6 +21,9 @@ import ( "context" "errors" "math/rand" + "runtime" + "sync" + "sync/atomic" "testing" "time" ) @@ -248,9 +251,10 @@ func TestCloseCallbackWhenOnConnect(t *testing.T) { MustNil(t, err) } -func TestCloseAndWrite(t *testing.T) { +func TestServerReadAndClose(t *testing.T) { var network, address = "tcp", ":18888" var sendMsg = []byte("hello") + var closed int32 var loop = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { _, err := connection.Reader().Next(len(sendMsg)) @@ -258,6 +262,7 @@ func TestCloseAndWrite(t *testing.T) { err = connection.Close() MustNil(t, err) + atomic.AddInt32(&closed, 1) return nil }, ) @@ -269,7 +274,10 @@ func TestCloseAndWrite(t *testing.T) { err = conn.Writer().Flush() MustNil(t, err) - time.Sleep(time.Millisecond * 100) // wait for poller close connection + for atomic.LoadInt32(&closed) == 0 { + runtime.Gosched() // wait for poller close connection + } + time.Sleep(time.Millisecond * 50) _, err = conn.Writer().WriteBinary(sendMsg) MustNil(t, err) err = conn.Writer().Flush() @@ -279,9 +287,59 @@ func TestCloseAndWrite(t *testing.T) { MustNil(t, err) } +func TestClientWriteAndClose(t *testing.T) { + var ( + network, address = "tcp", ":18889" + connnum = 10 + packetsize, packetnum = 1000 * 5, 1 + recvbytes int32 = 0 + ) + var loop = newTestEventLoop(network, address, + func(ctx context.Context, connection Connection) error { + buf, err := connection.Reader().Next(connection.Reader().Len()) + if errors.Is(err, ErrConnClosed) { + return err + } + MustNil(t, err) + atomic.AddInt32(&recvbytes, int32(len(buf))) + return nil + }, + ) + var wg sync.WaitGroup + for i := 0; i < connnum; i++ { + wg.Add(1) + go func() { + defer wg.Done() + var conn, err = DialConnection(network, address, time.Second) + MustNil(t, err) + sendMsg := make([]byte, packetsize) + for j := 0; j < packetnum; j++ { + _, err = conn.Write(sendMsg) + MustNil(t, err) + } + err = conn.Close() + MustNil(t, err) + }() + } + wg.Wait() + exceptbytes := int32(packetsize * packetnum * connnum) + for atomic.LoadInt32(&recvbytes) != exceptbytes { + t.Logf("left %d bytes not received", 
exceptbytes-atomic.LoadInt32(&recvbytes)) + runtime.Gosched() + } + err := loop.Shutdown(context.Background()) + MustNil(t, err) +} + func newTestEventLoop(network, address string, onRequest OnRequest, opts ...Option) EventLoop { - var listener, _ = CreateListener(network, address) - var eventLoop, _ = NewEventLoop(onRequest, opts...) - go eventLoop.Serve(listener) - return eventLoop + ln, err := CreateListener(network, address) + if err != nil { + panic(err) + } + elp, err := NewEventLoop(onRequest, opts...) + if err != nil { + panic(err) + } + go elp.Serve(ln) + return elp } diff --git a/poll_default.go b/poll_default.go index 0372d841..e9aaa093 100644 --- a/poll_default.go +++ b/poll_default.go @@ -50,3 +50,26 @@ func (p *defaultPoll) onhups() { } }(hups) } + +// readall read all left data before close connection +func readall(op *FDOperator, br barrier) (err error) { + var bs = br.bs + var ivs = br.ivs + var n int + for { + bs = op.Inputs(br.bs) + if len(bs) == 0 { + return nil + } + + TryRead: + n, err = ioread(op.FD, bs, ivs) + op.InputAck(n) + if err != nil { + return err + } + if n == 0 { + goto TryRead + } + } +} diff --git a/poll_default_bsd.go b/poll_default_bsd.go index 591b3d1e..b7a8b1c2 100644 --- a/poll_default_bsd.go +++ b/poll_default_bsd.go @@ -65,6 +65,7 @@ func (p *defaultPoll) Wait() error { barriers[i].ivs = make([]syscall.Iovec, caps) } // wait + var triggerRead, triggerWrite, triggerHup bool for { n, err := syscall.Kevent(p.fd, nil, events, nil) if err != nil && err != syscall.EINTR { @@ -87,8 +88,12 @@ func (p *defaultPoll) Wait() error { continue } - // check poll in - if events[i].Filter == syscall.EVFILT_READ && events[i].Flags&syscall.EV_ENABLE != 0 { + evt := events[i] + triggerRead = evt.Filter == syscall.EVFILT_READ && evt.Flags&syscall.EV_ENABLE != 0 + triggerWrite = evt.Filter == syscall.EVFILT_WRITE && evt.Flags&syscall.EV_ENABLE != 0 + triggerHup = evt.Flags&syscall.EV_EOF != 0 + + if triggerRead { if operator.OnRead != nil { // for non-connection operator.OnRead(p) @@ -105,15 +110,16 @@ func (p *defaultPoll) Wait() error { } } } - - // check hup - if events[i].Flags&syscall.EV_EOF != 0 { + if triggerHup && triggerRead && operator.Inputs != nil { // read all left data if peer send and close + if err = readall(operator, barriers[i]); err != nil { + logger.Printf("NETPOLL: readall(fd=%d) before close: %s", operator.FD, err.Error()) + } + } + if triggerHup { p.appendHup(operator) continue } - - // check poll out - if events[i].Filter == syscall.EVFILT_WRITE && events[i].Flags&syscall.EV_ENABLE != 0 { + if triggerWrite { if operator.OnWrite != nil { // for non-connection operator.OnWrite(p) diff --git a/poll_default_linux.go b/poll_default_linux.go index 2905bbac..ed3dde11 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -22,7 +22,6 @@ import ( "unsafe" ) -// Includes defaultPoll/multiPoll/uringPoll... 
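The triggerHup && triggerRead branch added above drains whatever the peer wrote before closing, so those bytes still reach the input buffer instead of being dropped. A compact, self-contained sketch of the same idea (toy code, not the patched readall; error handling is simplified):

package main

import (
	"fmt"
	"syscall"
)

// drainBeforeClose keeps reading until EOF (or a real error) so bytes the
// peer sent right before closing are not lost, then closes the fd.
// A nonblocking socket would also need EAGAIN handling, omitted here.
func drainBeforeClose(fd int, deliver func([]byte)) error {
	buf := make([]byte, 4096)
	for {
		n, err := syscall.Read(fd, buf)
		if n > 0 {
			deliver(buf[:n]) // hand the data to the input buffer
			continue
		}
		if err == syscall.EINTR {
			continue
		}
		// n == 0 means EOF; otherwise err is a real error.
		closeErr := syscall.Close(fd)
		if err != nil {
			return err
		}
		return closeErr
	}
}

func main() {
	var p [2]int
	if err := syscall.Pipe(p[:]); err != nil {
		panic(err)
	}
	syscall.Write(p[1], []byte("bye"))
	syscall.Close(p[1]) // peer closes right after writing

	err := drainBeforeClose(p[0], func(b []byte) { fmt.Printf("drained %q\n", b) })
	fmt.Println("closed:", err) // drained "bye", then closed: <nil>
}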
func openPoll() Poll { return openDefaultPoll() } @@ -109,11 +108,19 @@ func (p *defaultPoll) Wait() (err error) { } func (p *defaultPoll) handler(events []epollevent) (closed bool) { + var triggerRead, triggerWrite, triggerHup, triggerError bool for i := range events { operator := p.getOperator(0, unsafe.Pointer(&events[i].data)) if operator == nil || !operator.do() { continue } + + evt := events[i].events + triggerRead = evt&syscall.EPOLLIN != 0 + triggerWrite = evt&syscall.EPOLLOUT != 0 + triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0 + triggerError = evt&syscall.EPOLLERR != 0 + // trigger or exit gracefully if operator.FD == p.wop.FD { // must clean trigger first @@ -130,9 +137,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { continue } - evt := events[i].events - // check poll in - if evt&syscall.EPOLLIN != 0 { + if triggerRead { if operator.OnRead != nil { // for non-connection operator.OnRead(p) @@ -151,13 +156,16 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator) } } - - // check hup - if evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0 { + if triggerHup && triggerRead && operator.Inputs != nil { // read all left data if peer send and close + if err := readall(operator, p.barriers[i]); err != nil { + logger.Printf("NETPOLL: readall(fd=%d) before close: %s", operator.FD, err.Error()) + } + } + if triggerHup { p.appendHup(operator) continue } - if evt&syscall.EPOLLERR != 0 { + if triggerError { // Under block-zerocopy, the kernel may give an error callback, which is not a real error, just an EAGAIN. // So here we need to check this error, if it is EAGAIN then do nothing, otherwise still mark as hup. if _, _, _, _, err := syscall.Recvmsg(operator.FD, nil, nil, syscall.MSG_ERRQUEUE); err != syscall.EAGAIN { @@ -167,8 +175,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { } continue } - // check poll out - if evt&syscall.EPOLLOUT != 0 { + if triggerWrite { if operator.OnWrite != nil { // for non-connection operator.OnWrite(p) From 1d17d4d28b224e8f6bb312f29f20fd12b5380c65 Mon Sep 17 00:00:00 2001 From: cui fliter Date: Fri, 21 Apr 2023 11:02:05 +0800 Subject: [PATCH 16/20] chore: fix some comments (#247) Signed-off-by: cui fliter --- connection_onevent.go | 2 +- netpoll_options.go | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/connection_onevent.go b/connection_onevent.go index e8567ba9..f8351f32 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -90,7 +90,7 @@ func (c *connection) AddCloseCallback(callback CloseCallback) error { return nil } -// OnPrepare supports close connection, but not read/write data. +// onPrepare supports close connection, but not read/write data. // connection will be registered by this call after preparing. func (c *connection) onPrepare(opts *options) (err error) { if opts != nil { diff --git a/netpoll_options.go b/netpoll_options.go index f2effafa..ec384f54 100644 --- a/netpoll_options.go +++ b/netpoll_options.go @@ -29,14 +29,15 @@ import ( // Experience recommends assigning a poller every 20c. // // You can only use SetNumLoops before any connection is created. An example usage: -// func init() { -// netpoll.SetNumLoops(...) -// } +// +// func init() { +// netpoll.SetNumLoops(...) +// } func SetNumLoops(numLoops int) error { return setNumLoops(numLoops) } -// LoadBalance sets the load balancing method. 
Load balancing is always a best effort to attempt +// SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt // to distribute the incoming connections between multiple polls. // This option only works when NumLoops is set. func SetLoadBalance(lb LoadBalance) error { From 80441e9784eb2075fb08a55bb751ce133613f781 Mon Sep 17 00:00:00 2001 From: Remember <36129334+wuqinqiang@users.noreply.github.com> Date: Fri, 5 May 2023 14:01:54 +0800 Subject: [PATCH 17/20] fix: err to e0 (#251) --- poll_default_linux.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poll_default_linux.go b/poll_default_linux.go index ed3dde11..f7b8ed67 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -37,7 +37,7 @@ func openDefaultPoll() *defaultPoll { var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0) if e0 != 0 { syscall.Close(p) - panic(err) + panic(e0) } poll.Reset = poll.reset From 38680138a94c1cc2ad394b53ede46d0a31f0c7ee Mon Sep 17 00:00:00 2001 From: Jayant Date: Mon, 8 May 2023 16:44:28 +0800 Subject: [PATCH 18/20] feat: add Detach function to support detach connection from its poller (#249) --- connection_impl.go | 6 +++++ connection_test.go | 59 ++++++++++++++++++++++++++++++++++++++++++++++ net_netfd.go | 2 ++ net_netfd_conn.go | 2 +- 4 files changed, 68 insertions(+), 1 deletion(-) diff --git a/connection_impl.go b/connection_impl.go index 5b645ac3..def1d97c 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -299,6 +299,12 @@ func (c *connection) Close() error { return c.onClose() } +// Detach detaches the connection from poller but doesn't close it. +func (c *connection) Detach() error { + c.detaching = true + return c.onClose() +} + // ------------------------------------------ private ------------------------------------------ var barrierPool = sync.Pool{ diff --git a/connection_test.go b/connection_test.go index 5542ea5a..3d8fe160 100644 --- a/connection_test.go +++ b/connection_test.go @@ -22,6 +22,8 @@ import ( "errors" "fmt" "math/rand" + "net" + "os" "runtime" "sync" "sync/atomic" @@ -432,3 +434,60 @@ func TestBookSizeLargerThanMaxSize(t *testing.T) { wg.Wait() rconn.Close() } + +func TestConnDetach(t *testing.T) { + ln, err := CreateListener("tcp", ":1234") + MustNil(t, err) + + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + if conn == nil { + continue + } + go func() { + buf := make([]byte, 1024) + // slow read + for { + _, err := conn.Read(buf) + if err != nil { + return + } + time.Sleep(100 * time.Millisecond) + _, err = conn.Write(buf) + if err != nil { + return + } + } + }() + } + }() + + c, err := DialConnection("tcp", ":1234", time.Second) + MustNil(t, err) + + conn := c.(*TCPConnection) + + err = conn.Detach() + MustNil(t, err) + + f := os.NewFile(uintptr(conn.fd), "netpoll-connection") + defer f.Close() + + gonetconn, err := net.FileConn(f) + MustNil(t, err) + buf := make([]byte, 1024) + _, err = gonetconn.Write(buf) + MustNil(t, err) + _, err = gonetconn.Read(buf) + MustNil(t, err) + + err = gonetconn.Close() + MustNil(t, err) + + err = ln.Close() + MustNil(t, err) +} diff --git a/net_netfd.go b/net_netfd.go index cea01a77..96bc0945 100644 --- a/net_netfd.go +++ b/net_netfd.go @@ -50,6 +50,8 @@ type netFD struct { network string // tcp tcp4 tcp6, udp, udp4, udp6, ip, ip4, ip6, unix, unixgram, unixpacket localAddr net.Addr remoteAddr net.Addr + // for detaching conn from poller + detaching bool } func newNetFD(fd, family, sotype int, net string) *netFD { diff --git 
a/net_netfd_conn.go b/net_netfd_conn.go index 4ac53328..cd6922d4 100644 --- a/net_netfd_conn.go +++ b/net_netfd_conn.go @@ -59,7 +59,7 @@ func (c *netFD) Close() (err error) { if atomic.AddUint32(&c.closed, 1) != 1 { return nil } - if c.fd > 2 { + if !c.detaching && c.fd > 2 { err = syscall.Close(c.fd) if err != nil { logger.Printf("NETPOLL: netFD[%d] close error: %s", c.fd, err.Error()) From cdac6f0c9014d7f8da1fb91e101969b623e89871 Mon Sep 17 00:00:00 2001 From: Jayant Date: Mon, 8 May 2023 16:45:37 +0800 Subject: [PATCH 19/20] optimize: WriteDirect implementation to avoid panic and duplicate creation of redundant LinkBufferNode when remainLen is 0 (#250) --- nocopy_linkbuffer.go | 32 +++++++++++++++++++------------- nocopy_linkbuffer_race.go | 32 +++++++++++++++++++------------- nocopy_linkbuffer_test.go | 4 +++- 3 files changed, 41 insertions(+), 27 deletions(-) diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index 6200bf5b..59cc6530 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -475,7 +475,7 @@ func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error { // find origin origin := b.flush malloc := b.mallocSize - remainLen // calculate the remaining malloc length - for t := origin.malloc - len(origin.buf); t <= malloc; t = origin.malloc - len(origin.buf) { + for t := origin.malloc - len(origin.buf); t < malloc; t = origin.malloc - len(origin.buf) { malloc -= t origin = origin.next } @@ -486,18 +486,24 @@ func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error { dataNode := newLinkBufferNode(0) dataNode.buf, dataNode.malloc = p[:0], n - newNode := newLinkBufferNode(0) - newNode.off = malloc - newNode.buf = origin.buf[:malloc] - newNode.malloc = origin.malloc - newNode.readonly = false - origin.malloc = malloc - origin.readonly = true - - // link nodes - dataNode.next = newNode - newNode.next = origin.next - origin.next = dataNode + if remainLen > 0 { + newNode := newLinkBufferNode(0) + newNode.off = malloc + newNode.buf = origin.buf[:malloc] + newNode.malloc = origin.malloc + newNode.readonly = false + origin.malloc = malloc + origin.readonly = true + + // link nodes + dataNode.next = newNode + newNode.next = origin.next + origin.next = dataNode + } else { + // link nodes + dataNode.next = origin.next + origin.next = dataNode + } // adjust b.write for b.write.next != nil { diff --git a/nocopy_linkbuffer_race.go b/nocopy_linkbuffer_race.go index 7f2f274f..a785aa15 100644 --- a/nocopy_linkbuffer_race.go +++ b/nocopy_linkbuffer_race.go @@ -513,7 +513,7 @@ func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error { // find origin origin := b.flush malloc := b.mallocSize - remainLen // calculate the remaining malloc length - for t := origin.malloc - len(origin.buf); t <= malloc; t = origin.malloc - len(origin.buf) { + for t := origin.malloc - len(origin.buf); t < malloc; t = origin.malloc - len(origin.buf) { malloc -= t origin = origin.next } @@ -524,18 +524,24 @@ func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error { dataNode := newLinkBufferNode(0) dataNode.buf, dataNode.malloc = p[:0], n - newNode := newLinkBufferNode(0) - newNode.off = malloc - newNode.buf = origin.buf[:malloc] - newNode.malloc = origin.malloc - newNode.readonly = false - origin.malloc = malloc - origin.readonly = true - - // link nodes - dataNode.next = newNode - newNode.next = origin.next - origin.next = dataNode + if remainLen > 0 { + newNode := newLinkBufferNode(0) + newNode.off = malloc + newNode.buf = origin.buf[:malloc] + newNode.malloc = origin.malloc + 
newNode.readonly = false + origin.malloc = malloc + origin.readonly = true + + // link nodes + dataNode.next = newNode + newNode.next = origin.next + origin.next = dataNode + } else { + // link nodes + dataNode.next = origin.next + origin.next = dataNode + } // adjust b.write for b.write.next != nil { diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index a2f68fb4..c3f9b9d8 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -454,9 +454,11 @@ func TestWriteDirect(t *testing.T) { buf.WriteDirect([]byte("nopqrst"), 28) bt[4] = 'u' buf.WriteDirect([]byte("vwxyz"), 27) + copy(bt[5:], "abcdefghijklmnopqrstuvwxyza") + buf.WriteDirect([]byte("abcdefghijklmnopqrstuvwxyz"), 0) buf.Flush() bs := buf.Bytes() - str := "abcdefghijklmnopqrstuvwxyz" + str := "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzaabcdefghijklmnopqrstuvwxyz" for i := 0; i < len(str); i++ { if bs[i] != str[i] { t.Error("not equal!") From 4b09897f5b07067929002b614c720924c6396f6e Mon Sep 17 00:00:00 2001 From: Remember <36129334+wuqinqiang@users.noreply.github.com> Date: Mon, 29 May 2023 19:30:05 +0800 Subject: [PATCH 20/20] fix: close the poll that has already been created when calling the openPoll fails (#256) --- poll_default_bsd.go | 11 ++++++----- poll_default_linux.go | 25 ++++++++++++++++--------- poll_manager.go | 15 +++++++++++++-- poll_test.go | 15 +++++++++------ 4 files changed, 44 insertions(+), 22 deletions(-) diff --git a/poll_default_bsd.go b/poll_default_bsd.go index b7a8b1c2..3312e435 100644 --- a/poll_default_bsd.go +++ b/poll_default_bsd.go @@ -24,15 +24,15 @@ import ( "unsafe" ) -func openPoll() Poll { +func openPoll() (Poll, error) { return openDefaultPoll() } -func openDefaultPoll() *defaultPoll { +func openDefaultPoll() (*defaultPoll, error) { l := new(defaultPoll) p, err := syscall.Kqueue() if err != nil { - panic(err) + return nil, err } l.fd = p _, err = syscall.Kevent(l.fd, []syscall.Kevent_t{{ @@ -41,10 +41,11 @@ func openDefaultPoll() *defaultPoll { Flags: syscall.EV_ADD | syscall.EV_CLEAR, }}, nil, nil) if err != nil { - panic(err) + syscall.Close(l.fd) + return nil, err } l.opcache = newOperatorCache() - return l + return l, nil } type defaultPoll struct { diff --git a/poll_default_linux.go b/poll_default_linux.go index f7b8ed67..8da7d55b 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -22,31 +22,38 @@ import ( "unsafe" ) -func openPoll() Poll { +func openPoll() (Poll, error) { return openDefaultPoll() } -func openDefaultPoll() *defaultPoll { - var poll = defaultPoll{} +func openDefaultPoll() (*defaultPoll, error) { + var poll = new(defaultPoll) + poll.buf = make([]byte, 8) var p, err = EpollCreate(0) if err != nil { - panic(err) + return nil, err } poll.fd = p + var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0) if e0 != 0 { - syscall.Close(p) - panic(e0) + _ = syscall.Close(poll.fd) + return nil, e0 } poll.Reset = poll.reset poll.Handler = poll.handler - poll.wop = &FDOperator{FD: int(r0)} - poll.Control(poll.wop, PollReadable) + + if err = poll.Control(poll.wop, PollReadable); err != nil { + _ = syscall.Close(poll.wop.FD) + _ = syscall.Close(poll.fd) + return nil, err + } + poll.opcache = newOperatorCache() - return &poll + return poll, nil } type defaultPoll struct { diff --git a/poll_manager.go b/poll_manager.go index 2c2e8097..119187c0 100644 --- a/poll_manager.go +++ b/poll_manager.go @@ -107,13 +107,24 @@ func (m *manager) Close() error { } // Run all pollers. 
-func (m *manager) Run() error { +func (m *manager) Run() (err error) { + defer func() { + if err != nil { + _ = m.Close() + } + }() + // new poll to fill delta. for idx := len(m.polls); idx < m.NumLoops; idx++ { - var poll = openPoll() + var poll Poll + poll, err = openPoll() + if err != nil { + return + } m.polls = append(m.polls, poll) go poll.Wait() } + // LoadBalance must be set before calling Run, otherwise it will panic. m.balance.Rebalance(m.polls) return nil diff --git a/poll_test.go b/poll_test.go index c30dab83..5980dde7 100644 --- a/poll_test.go +++ b/poll_test.go @@ -31,7 +31,9 @@ func TestPollTrigger(t *testing.T) { t.Skip() var trigger int var stop = make(chan error) - var p = openDefaultPoll() + var p, err = openDefaultPoll() + MustNil(t, err) + go func() { stop <- p.Wait() }() @@ -46,7 +48,7 @@ func TestPollTrigger(t *testing.T) { Equal(t, trigger, 2) p.Close() - err := <-stop + err = <-stop MustNil(t, err) } @@ -65,7 +67,8 @@ func TestPollMod(t *testing.T) { return nil } var stop = make(chan error) - var p = openDefaultPoll() + var p, err = openDefaultPoll() + MustNil(t, err) go func() { stop <- p.Wait() }() @@ -73,7 +76,6 @@ func TestPollMod(t *testing.T) { var rfd, wfd = GetSysFdPairs() var rop = &FDOperator{FD: rfd, OnRead: read, OnWrite: write, OnHup: hup, poll: p} var wop = &FDOperator{FD: wfd, OnRead: read, OnWrite: write, OnHup: hup, poll: p} - var err error var r, w, h int32 r, w, h = atomic.LoadInt32(&rn), atomic.LoadInt32(&wn), atomic.LoadInt32(&hn) Assert(t, r == 0 && w == 0 && h == 0, r, w, h) @@ -113,7 +115,8 @@ func TestPollMod(t *testing.T) { } func TestPollClose(t *testing.T) { - var p = openDefaultPoll() + var p, err = openDefaultPoll() + MustNil(t, err) var wg sync.WaitGroup wg.Add(1) go func() { @@ -126,7 +129,7 @@ func TestPollClose(t *testing.T) { func BenchmarkPollMod(b *testing.B) { b.StopTimer() - var p = openDefaultPoll() + var p, _ = openDefaultPoll() r, _ := GetSysFdPairs() var operator = &FDOperator{FD: r} p.Control(operator, PollReadable)
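
A condensed sketch of the Detach flow added in patch 18, following TestConnDetach above. It is written as if it sat inside the netpoll package (so it can read conn.fd directly, exactly as the test does); the dial address is only a placeholder, and this is an illustration rather than code from the patch.

package netpoll

import (
	"net"
	"os"
	"time"
)

// detachExample condenses TestConnDetach: detach a dialed connection from its
// poller, then hand the still-open fd to the standard library.
func detachExample() error {
	c, err := DialConnection("tcp", "127.0.0.1:1234", time.Second) // placeholder address
	if err != nil {
		return err
	}
	conn := c.(*TCPConnection)

	// Detach marks netFD.detaching, so the close path unregisters the fd from
	// the poller but skips syscall.Close on it.
	if err := conn.Detach(); err != nil {
		return err
	}

	// Wrap the raw fd in a standard *os.File and net.Conn, as the test does.
	f := os.NewFile(uintptr(conn.fd), "detached-conn")
	defer f.Close()
	gonetconn, err := net.FileConn(f)
	if err != nil {
		return err
	}
	return gonetconn.Close()
}
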
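A short sketch of what the remainLen argument to WriteDirect means, and of the remainLen == 0 case that patch 19 fixes. NewLinkBuffer and Malloc are the package's existing LinkBuffer APIs and are assumed here with their usual signatures; the sketch follows the pattern of TestWriteDirect above.

package netpoll

// writeDirectExample books a small region with Malloc and splices extra bytes
// into it with WriteDirect. remainLen is the number of booked-but-unflushed
// bytes that should still follow the spliced data.
func writeDirectExample() []byte {
	buf := NewLinkBuffer(4096)
	bt, _ := buf.Malloc(4) // book 4 bytes: bt[0..3]
	bt[0] = 'a'
	// Splice "bc" in right after bt[0]; three booked bytes still follow it,
	// so remainLen is 3.
	_ = buf.WriteDirect([]byte("bc"), 3)
	copy(bt[1:], "def") // fill the rest of the booked area
	// Nothing booked follows here, so remainLen is 0. Before patch 19 this
	// case still carved out an empty trailing node; now the data node is
	// linked straight after the booked area.
	_ = buf.WriteDirect([]byte("gh"), 0)
	_ = buf.Flush()
	return buf.Bytes() // "abcdefgh"
}
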
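The cleanup rule that patch 20 applies in openDefaultPoll and manager.Run, shown in isolation: every resource created before a failure is released before the error is returned, instead of panicking and leaking the epoll fd or the wake-up eventfd. The function below is an illustration only (Linux, using plain syscall wrappers), not netpoll code.

package netpoll

import "syscall"

// openPollPair creates an epoll instance and an eventfd; if the second step
// fails, the first is closed before the error is reported.
func openPollPair() (epfd, evfd int, err error) {
	epfd, err = syscall.EpollCreate1(0)
	if err != nil {
		return 0, 0, err
	}
	r0, _, e0 := syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
	if e0 != 0 {
		_ = syscall.Close(epfd) // undo the first step before reporting failure
		return 0, 0, e0
	}
	return epfd, int(r0), nil
}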