Skip to content

Commit

Permalink
Merge pull request #352 from cloudwego/release-v0.6.3
Browse files Browse the repository at this point in the history
chore: release v0.6.3
  • Loading branch information
joway authored Jul 25, 2024
2 parents fa3c9e0 + 6790949 commit 3bd1fe2
Show file tree
Hide file tree
Showing 16 changed files with 477 additions and 425 deletions.
25 changes: 9 additions & 16 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,22 @@ jobs:
compatibility-test:
strategy:
matrix:
go: [ 1.15, "1.21" ]
os: [ X64, ARM64 ]
go: [ 1.15, 1.22 ]
# - "ubuntu-latest" is for Linux with X64 CPU, hosted by GitHub,
# fewer CPUs but high speed international network
# - "ARM64" is for Linux with ARM64 CPU, hosted by bytedance,
# more CPUs but inside CN internet which may download go cache slowly.
# GitHub don't have free runner with ARM CPU.
os: [ ubuntu-latest, ARM64 ]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-
- name: Unit Test
run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./...
run: go test -timeout=2m -v -race -covermode=atomic -coverprofile=coverage.out ./...
- name: Benchmark
run: go test -bench=. -benchmem -run=none ./...
windows-test:
Expand All @@ -32,13 +31,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.20"
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-
go-version: 1.22
- name: Build Test
run: go vet -v ./...
style-test:
Expand Down
4 changes: 2 additions & 2 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,8 @@ func (c *connection) init(conn Conn, opts *options) (err error) {
// init buffer, barrier, finalizer
c.readTrigger = make(chan error, 1)
c.writeTrigger = make(chan error, 1)
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
c.bookSize, c.maxSize = defaultLinkBufferSize, defaultLinkBufferSize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(defaultLinkBufferSize), NewLinkBuffer()
c.outputBarrier = barrierPool.Get().(*barrier)
c.state = 0

Expand Down
124 changes: 67 additions & 57 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"os"
"runtime"
Expand Down Expand Up @@ -102,11 +101,13 @@ func TestConnectionLargeWrite(t *testing.T) {
func TestConnectionRead(t *testing.T) {
r, w := GetSysFdPairs()
var rconn, wconn = &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)
err := rconn.init(&netFD{fd: r}, nil)
MustNil(t, err)
err = wconn.init(&netFD{fd: w}, nil)
MustNil(t, err)

var size = 256
var cycleTime = 100000
var cycleTime = 1000
var msg = make([]byte, size)
var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -129,6 +130,13 @@ func TestConnectionRead(t *testing.T) {
}

func TestConnectionNoCopyReadString(t *testing.T) {
err := Configure(Config{Feature: Feature{AlwaysNoCopyRead: true}})
MustNil(t, err)
defer func() {
err = Configure(Config{Feature: Feature{AlwaysNoCopyRead: false}})
MustNil(t, err)
}()

r, w := GetSysFdPairs()
var rconn, wconn = &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
Expand Down Expand Up @@ -382,34 +390,28 @@ func TestConnectionLargeMemory(t *testing.T) {
rconn.init(&netFD{fd: r}, nil)

var wg sync.WaitGroup
defer wg.Wait()

var rn, wn = 1024, 1 * 1024 * 1024

wg.Add(1)
go func() {
defer wg.Done()
rconn.Reader().Next(wn)
_, err := rconn.Reader().Next(wn)
MustNil(t, err)
}()

var msg = make([]byte, rn)
for i := 0; i < wn/rn; i++ {
n, err := syscall.Write(w, msg)
if err != nil {
panic(err)
}
if n != rn {
panic(fmt.Sprintf("n[%d]!=rn[%d]", n, rn))
MustNil(t, err)
}
time.Sleep(time.Millisecond)
Equal(t, n, rn)
}

runtime.ReadMemStats(&end)
alloc := end.TotalAlloc - start.TotalAlloc
limit := uint64(4 * 1024 * 1024)
if alloc > limit {
panic(fmt.Sprintf("alloc[%d] out of memory %d", alloc, limit))
}
Assert(t, alloc <= limit, fmt.Sprintf("alloc[%d] out of memory %d", alloc, limit))
}

// TestSetTCPNoDelay is used to verify the connection initialization set the TCP_NODELAY correctly
Expand All @@ -431,7 +433,7 @@ func TestConnectionUntil(t *testing.T) {
rconn, wconn := &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)
loopSize := 100000
loopSize := 10000

msg := make([]byte, 1002)
msg[500], msg[1001] = '\n', '\n'
Expand Down Expand Up @@ -459,44 +461,57 @@ func TestConnectionUntil(t *testing.T) {

func TestBookSizeLargerThanMaxSize(t *testing.T) {
r, w := GetSysFdPairs()
var rconn, wconn = &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)
rconn, wconn := &connection{}, &connection{}
err := rconn.init(&netFD{fd: r}, nil)
MustNil(t, err)
err = wconn.init(&netFD{fd: w}, nil)
MustNil(t, err)

var length = 25
dataCollection := make([][]byte, length)
for i := 0; i < length; i++ {
dataCollection[i] = make([]byte, 2<<i)
for j := 0; j < 2<<i; j++ {
dataCollection[i][j] = byte(rand.Intn(256))
// prepare data
maxSize := 1024 * 1024 * 128
origin := make([][]byte, 0)
for size := maxSize; size > 0; size = size >> 1 {
ch := 'a' + byte(size%26)
origin = append(origin, make([]byte, size))
for i := 0; i < size; i++ {
origin[len(origin)-1][i] = ch
}
}

// read
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < length; i++ {
buf, err := rconn.Reader().Next(2 << i)
idx := 0
for size := maxSize; size > 0; size = size >> 1 {
buf, err := rconn.Reader().Next(size)
MustNil(t, err)
Equal(t, string(buf), string(dataCollection[i]))
rconn.Reader().Release()
Equal(t, string(buf), string(origin[idx]))
err = rconn.Reader().Release()
MustNil(t, err)
idx++
}
}()
for i := 0; i < length; i++ {
n, err := wconn.Write(dataCollection[i])

// write
for i := 0; i < len(origin); i++ {
n, err := wconn.Write(origin[i])
MustNil(t, err)
Equal(t, n, 2<<i)
Equal(t, n, len(origin[i]))
}
wg.Wait()
rconn.Close()
wconn.Close()
}

func TestConnDetach(t *testing.T) {
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)

// accept => read => write
var wg sync.WaitGroup
go func() {
for {
conn, err := ln.Accept()
Expand All @@ -506,35 +521,33 @@ func TestConnDetach(t *testing.T) {
if conn == nil {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
buf := make([]byte, 1024)
// slow read
for {
_, err := conn.Read(buf)
if err != nil {
return
}
time.Sleep(100 * time.Millisecond)
_, err = conn.Write(buf)
if err != nil {
return
}
_, err := conn.Read(buf)
if err != nil {
return
}
time.Sleep(10 * time.Millisecond)
_, err = conn.Write(buf)
if err != nil {
return
}
}()
}
}()

// dial => detach => write => read
c, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)

conn := c.(*TCPConnection)

err = conn.Detach()
MustNil(t, err)

f := os.NewFile(uintptr(conn.fd), "netpoll-connection")
defer f.Close()

gonetconn, err := net.FileConn(f)
MustNil(t, err)
buf := make([]byte, 1024)
Expand All @@ -545,13 +558,14 @@ func TestConnDetach(t *testing.T) {

err = gonetconn.Close()
MustNil(t, err)

err = ln.Close()
MustNil(t, err)
err = c.Close()
MustNil(t, err)
wg.Wait()
}

func TestParallelShortConnection(t *testing.T) {
t.Skip("TODO: it's not stable now, need fix CI")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)
Expand Down Expand Up @@ -592,11 +606,8 @@ func TestParallelShortConnection(t *testing.T) {
}
wg.Wait()

count := 100
for count > 0 && atomic.LoadInt64(&received) < int64(totalSize) {
t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize)
time.Sleep(time.Millisecond * 100)
count--
for atomic.LoadInt64(&received) < int64(totalSize) {
runtime.Gosched()
}
Equal(t, atomic.LoadInt64(&received), int64(totalSize))
}
Expand All @@ -619,7 +630,7 @@ func TestConnectionServerClose(t *testing.T) {
var wg sync.WaitGroup
el, err := NewEventLoop(
func(ctx context.Context, connection Connection) error {
t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr())
//t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr())
defer wg.Done()
buf, err := connection.Reader().Next(len(PONG)) // pong
Equal(t, string(buf), PONG)
Expand All @@ -642,14 +653,14 @@ func TestConnectionServerClose(t *testing.T) {
err = connection.Writer().Flush()
MustNil(t, err)
connection.AddCloseCallback(func(connection Connection) error {
t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr())
//t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr())
wg.Done()
return nil
})
return ctx
}),
WithOnPrepare(func(connection Connection) context.Context {
t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr())
//t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr())
defer wg.Done()
return context.WithValue(context.Background(), "prepare", "true")
}),
Expand Down Expand Up @@ -690,13 +701,12 @@ func TestConnectionServerClose(t *testing.T) {
err = conn.SetOnRequest(clientOnRequest)
MustNil(t, err)
conn.AddCloseCallback(func(connection Connection) error {
t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr())
//t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr())
defer wg.Done()
return nil
})
}()
}
//time.Sleep(time.Second)
wg.Wait()
}

Expand Down
Loading

0 comments on commit 3bd1fe2

Please sign in to comment.