Skip to content

Commit

Permalink
chore: release v0.5.1 (#291)
Browse files Browse the repository at this point in the history
Co-authored-by: Joway <[email protected]>
  • Loading branch information
alice-yyds and joway authored Oct 18, 2023
1 parent 0faba6e commit 4b0bb96
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 6 deletions.
3 changes: 1 addition & 2 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type connection struct {
writeTrigger chan error
inputBuffer *LinkBuffer
outputBuffer *LinkBuffer
inputBarrier *barrier
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
Expand Down Expand Up @@ -323,7 +322,7 @@ func (c *connection) init(conn Conn, opts *options) (err error) {
c.writeTrigger = make(chan error, 1)
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
c.inputBarrier, c.outputBarrier = barrierPool.Get().(*barrier), barrierPool.Get().(*barrier)
c.outputBarrier = barrierPool.Get().(*barrier)

c.initNetFD(conn) // conn must be *netFD{}
c.initFDOperator()
Expand Down
1 change: 0 additions & 1 deletion connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func (c *connection) closeBuffer() {
// so we need to check the buffer length, and if it's an "unclean" close operation, let's give up to reuse the buffer
if c.inputBuffer.Len() == 0 || onConnect != nil || onRequest != nil {
c.inputBuffer.Close()
barrierPool.Put(c.inputBarrier)
}
if c.outputBuffer.Len() == 0 || onConnect != nil || onRequest != nil {
c.outputBuffer.Close()
Expand Down
37 changes: 37 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -638,3 +639,39 @@ func TestConnectionServerClose(t *testing.T) {
//time.Sleep(time.Second)
wg.Wait()
}

func TestConnectionDailTimeoutAndClose(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
MustNil(t, err)
defer ln.Close()

el, err := NewEventLoop(
func(ctx context.Context, connection Connection) error {
_, err = connection.Reader().Next(connection.Reader().Len())
return err
},
)
defer el.Shutdown(context.Background())
go func() {
err := el.Serve(ln)
if err != nil {
t.Logf("servce end with error: %v", err)
}
}()

loops := 100
conns := 100
for l := 0; l < loops; l++ {
var wg sync.WaitGroup
wg.Add(conns)
for i := 0; i < conns; i++ {
go func() {
defer wg.Done()
conn, err := DialConnection("tcp", ":12345", time.Nanosecond)
Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout"))
_ = conn
}()
}
wg.Wait()
}
}
9 changes: 6 additions & 3 deletions net_polldesc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,17 @@ func (pd *pollDesc) WaitWrite(ctx context.Context) (err error) {
}

select {
case <-pd.closeTrigger:
case <-pd.closeTrigger: // triggered by poller
// no need to detach, since poller has done it in OnHup.
return Exception(ErrConnClosed, "by peer")
case <-pd.writeTrigger:
case <-pd.writeTrigger: // triggered by poller
err = nil
case <-ctx.Done():
case <-ctx.Done(): // triggered by ctx
// deregister from poller, upper caller function will close fd
// detach first but there's a very small possibility that operator is doing in poller,
// so need call unused() to wait operator done
pd.detach()
pd.operator.unused()
err = mapErr(ctx.Err())
}
// double check close trigger
Expand Down

0 comments on commit 4b0bb96

Please sign in to comment.