Skip to content

Commit

Permalink
fix: unified runner config and hid gopool (#380)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaost authored Jan 13, 2025
1 parent 53e3080 commit 0207003
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 23 deletions.
17 changes: 2 additions & 15 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,9 @@ import (
"context"
"sync/atomic"

"github.com/bytedance/gopkg/util/gopool"
"github.com/cloudwego/netpoll/internal/runner"
)

var runTask = gopool.CtxGo

func setRunner(runner func(ctx context.Context, f func())) {
runTask = runner
}

func disableGopool() error {
runTask = func(ctx context.Context, f func()) {
go f()
}
return nil
}

// ------------------------------------ implement OnPrepare, OnRequest, CloseCallback ------------------------------------

type gracefulExit interface {
Expand Down Expand Up @@ -272,7 +259,7 @@ func (c *connection) onProcess(onConnect OnConnect, onRequest OnRequest) (proces
} // end of task closure func

// add new task
runTask(c.ctx, task)
runner.RunTask(c.ctx, task)
return true
}

Expand Down
45 changes: 45 additions & 0 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2025 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package runner

import (
"context"

"github.com/bytedance/gopkg/util/gopool"
)

// RunTask runs the `f` in background, and `ctx` is optional.
// `ctx` is used to pass to underlying implementation
var RunTask func(ctx context.Context, f func())

func goRunTask(ctx context.Context, f func()) {
go f()
}

func init() {
// TODO(xiaost): Disable gopool by default in the future.
// Once we move to use gopool of cloudwego/gopkg in other repos,
// there should be no reason to continue using bytedance/gopkg version,
// and for most users, using the 'go' keyword directly is more suitable.
RunTask = gopool.CtxGo
}

// UseGoRunTask updates RunTask with goRunTask which creates
// a new goroutine for the given func, basically `go f()`
func UseGoRunTask() {
RunTask = goRunTask
}
37 changes: 37 additions & 0 deletions internal/runner/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2025 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package runner

import (
"context"
"sync"
"testing"
)

func TestRunTask(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)
ctx := context.Background()
RunTask(ctx, func() {
wg.Done()
})
UseGoRunTask()
RunTask(ctx, func() {
wg.Done()
})
wg.Wait()
}
5 changes: 2 additions & 3 deletions mux/shard_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import (
"sync"
"sync/atomic"

"github.com/bytedance/gopkg/util/gopool"

"github.com/cloudwego/netpoll"
"github.com/cloudwego/netpoll/internal/runner"
)

/* DOC:
Expand Down Expand Up @@ -137,7 +136,7 @@ func (q *ShardQueue) foreach() {
if atomic.AddInt32(&q.runNum, 1) > 1 {
return
}
gopool.CtxGo(nil, func() {
runner.RunTask(nil, func() {
var negNum int32 // is negative number of triggerNum
for triggerNum := atomic.LoadInt32(&q.trigger); triggerNum > 0; {
q.r = (q.r + 1) % q.size
Expand Down
15 changes: 10 additions & 5 deletions netpoll_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"os"
"runtime"
"sync"

"github.com/cloudwego/netpoll/internal/runner"
)

var (
Expand Down Expand Up @@ -52,7 +54,7 @@ func Configure(config Config) (err error) {
}

if config.Runner != nil {
setRunner(config.Runner)
runner.RunTask = config.Runner
}
if config.LoggerOutput != nil {
logger = log.New(config.LoggerOutput, "", log.LstdFlags)
Expand Down Expand Up @@ -99,19 +101,22 @@ func SetLoggerOutput(w io.Writer) {
}

// SetRunner set the runner function for every OnRequest/OnConnect callback
// Deprecated: use Configure instead.
//
// Deprecated: use Configure and specify config.Runner instead.
func SetRunner(f func(ctx context.Context, f func())) {
setRunner(f)
runner.RunTask = f
}

// DisableGopool will remove gopool(the goroutine pool used to run OnRequest),
// which means that OnRequest will be run via `go OnRequest(...)`.
// Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine.
// But if you can confirm that the OnRequest will not cause stack expansion,
// it is recommended to use DisableGopool to reduce redundancy and improve performance.
// Deprecated: use Configure instead.
//
// Deprecated: use Configure() and specify config.Runner instead.
func DisableGopool() error {
return disableGopool()
runner.UseGoRunTask()
return nil
}

// NewEventLoop .
Expand Down

0 comments on commit 0207003

Please sign in to comment.