From 02070032983f6bd593da0f77f3e31210b147b4a3 Mon Sep 17 00:00:00 2001 From: Kyle Xiao Date: Mon, 13 Jan 2025 16:44:34 +0800 Subject: [PATCH] fix: unified runner config and hid gopool (#380) --- connection_onevent.go | 17 ++----------- internal/runner/runner.go | 45 ++++++++++++++++++++++++++++++++++ internal/runner/runner_test.go | 37 ++++++++++++++++++++++++++++ mux/shard_queue.go | 5 ++-- netpoll_unix.go | 15 ++++++++---- 5 files changed, 96 insertions(+), 23 deletions(-) create mode 100644 internal/runner/runner.go create mode 100644 internal/runner/runner_test.go diff --git a/connection_onevent.go b/connection_onevent.go index 8efcd98d..97c4aa64 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -21,22 +21,9 @@ import ( "context" "sync/atomic" - "github.com/bytedance/gopkg/util/gopool" + "github.com/cloudwego/netpoll/internal/runner" ) -var runTask = gopool.CtxGo - -func setRunner(runner func(ctx context.Context, f func())) { - runTask = runner -} - -func disableGopool() error { - runTask = func(ctx context.Context, f func()) { - go f() - } - return nil -} - // ------------------------------------ implement OnPrepare, OnRequest, CloseCallback ------------------------------------ type gracefulExit interface { @@ -272,7 +259,7 @@ func (c *connection) onProcess(onConnect OnConnect, onRequest OnRequest) (proces } // end of task closure func // add new task - runTask(c.ctx, task) + runner.RunTask(c.ctx, task) return true } diff --git a/internal/runner/runner.go b/internal/runner/runner.go new file mode 100644 index 00000000..a12ae686 --- /dev/null +++ b/internal/runner/runner.go @@ -0,0 +1,45 @@ +/* + * Copyright 2025 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package runner + +import ( + "context" + + "github.com/bytedance/gopkg/util/gopool" +) + +// RunTask runs the `f` in background, and `ctx` is optional. +// `ctx` is used to pass to underlying implementation +var RunTask func(ctx context.Context, f func()) + +func goRunTask(ctx context.Context, f func()) { + go f() +} + +func init() { + // TODO(xiaost): Disable gopool by default in the future. + // Once we move to use gopool of cloudwego/gopkg in other repos, + // there should be no reason to continue using bytedance/gopkg version, + // and for most users, using the 'go' keyword directly is more suitable. + RunTask = gopool.CtxGo +} + +// UseGoRunTask updates RunTask with goRunTask which creates +// a new goroutine for the given func, basically `go f()` +func UseGoRunTask() { + RunTask = goRunTask +} diff --git a/internal/runner/runner_test.go b/internal/runner/runner_test.go new file mode 100644 index 00000000..2f952dcb --- /dev/null +++ b/internal/runner/runner_test.go @@ -0,0 +1,37 @@ +/* + * Copyright 2025 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package runner + +import ( + "context" + "sync" + "testing" +) + +func TestRunTask(t *testing.T) { + var wg sync.WaitGroup + wg.Add(2) + ctx := context.Background() + RunTask(ctx, func() { + wg.Done() + }) + UseGoRunTask() + RunTask(ctx, func() { + wg.Done() + }) + wg.Wait() +} diff --git a/mux/shard_queue.go b/mux/shard_queue.go index 364fabae..a225ea87 100644 --- a/mux/shard_queue.go +++ b/mux/shard_queue.go @@ -20,9 +20,8 @@ import ( "sync" "sync/atomic" - "github.com/bytedance/gopkg/util/gopool" - "github.com/cloudwego/netpoll" + "github.com/cloudwego/netpoll/internal/runner" ) /* DOC: @@ -137,7 +136,7 @@ func (q *ShardQueue) foreach() { if atomic.AddInt32(&q.runNum, 1) > 1 { return } - gopool.CtxGo(nil, func() { + runner.RunTask(nil, func() { var negNum int32 // is negative number of triggerNum for triggerNum := atomic.LoadInt32(&q.trigger); triggerNum > 0; { q.r = (q.r + 1) % q.size diff --git a/netpoll_unix.go b/netpoll_unix.go index 5dd1b7fb..0d2b78d9 100644 --- a/netpoll_unix.go +++ b/netpoll_unix.go @@ -25,6 +25,8 @@ import ( "os" "runtime" "sync" + + "github.com/cloudwego/netpoll/internal/runner" ) var ( @@ -52,7 +54,7 @@ func Configure(config Config) (err error) { } if config.Runner != nil { - setRunner(config.Runner) + runner.RunTask = config.Runner } if config.LoggerOutput != nil { logger = log.New(config.LoggerOutput, "", log.LstdFlags) @@ -99,9 +101,10 @@ func SetLoggerOutput(w io.Writer) { } // SetRunner set the runner function for every OnRequest/OnConnect callback -// Deprecated: use Configure instead. +// +// Deprecated: use Configure and specify config.Runner instead. func SetRunner(f func(ctx context.Context, f func())) { - setRunner(f) + runner.RunTask = f } // DisableGopool will remove gopool(the goroutine pool used to run OnRequest), @@ -109,9 +112,11 @@ func SetRunner(f func(ctx context.Context, f func())) { // Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine. // But if you can confirm that the OnRequest will not cause stack expansion, // it is recommended to use DisableGopool to reduce redundancy and improve performance. -// Deprecated: use Configure instead. +// +// Deprecated: use Configure() and specify config.Runner instead. func DisableGopool() error { - return disableGopool() + runner.UseGoRunTask() + return nil } // NewEventLoop .