Skip to content

Commit

Permalink
(update): Use sync.atomic to replace some sync.RWMutex and `sync.…
Browse files Browse the repository at this point in the history
…Mutex`.
  • Loading branch information
zishang520 committed May 29, 2024
1 parent f195a05 commit ccdbfbe
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 320 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
client-dist/* linguist-generated
22 changes: 11 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.22.2
require (
github.com/andybalholm/brotli v1.1.0
github.com/mitchellh/mapstructure v1.5.0
github.com/zishang520/engine.io-go-parser v1.2.4
github.com/zishang520/engine.io/v2 v2.0.8
github.com/zishang520/socket.io-go-parser/v2 v2.0.7
github.com/zishang520/engine.io-go-parser v1.2.5
github.com/zishang520/engine.io/v2 v2.1.0
github.com/zishang520/socket.io-go-parser/v2 v2.1.0
)

require (
Expand All @@ -17,17 +17,17 @@ require (
github.com/gorilla/websocket v1.5.1 // indirect
github.com/onsi/ginkgo/v2 v2.12.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.43.0 // indirect
github.com/quic-go/quic-go v0.44.0 // indirect
github.com/quic-go/webtransport-go v0.8.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect
go.uber.org/mock v0.4.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/tools v0.21.0 // indirect
)
50 changes: 26 additions & 24 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEe
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20230821062121-407c9e7a662f h1:pDhu5sgp8yJlEF/g6osliIIpF9K4F5jvkULXa4daRDQ=
github.com/google/pprof v0.0.0-20230821062121-407c9e7a662f/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0=
Expand All @@ -29,8 +29,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/quic-go v0.43.0 h1:sjtsTKWX0dsHpuMJvLxGqoQdtgJnbAPWY+W+5vjYW/g=
github.com/quic-go/quic-go v0.43.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M=
github.com/quic-go/quic-go v0.44.0 h1:So5wOr7jyO4vzL2sd8/pD9Kesciv91zSk8BoFngItQ0=
github.com/quic-go/quic-go v0.44.0/go.mod h1:z4cx/9Ny9UtGITIPzmPTXh1ULfOyWh4qGQlpnPcWmek=
github.com/quic-go/webtransport-go v0.8.0 h1:HxSrwun11U+LlmwpgM1kEqIqH90IT4N8auv/cD7QFJg=
github.com/quic-go/webtransport-go v0.8.0/go.mod h1:N99tjprW432Ut5ONql/aUhSLT0YVSlwHohQsuac9WaM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand All @@ -43,30 +43,32 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 h1:QldyIu/L63oPpyvQmHgvgickp1Yw510KJOqX7H24mg8=
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs=
github.com/zishang520/engine.io-go-parser v1.2.4 h1:37h7Mt3Msc3aqub6hl+mYlG6wB81O4zcynrZQIjG41s=
github.com/zishang520/engine.io-go-parser v1.2.4/go.mod h1:G1DciRIGH4/S7x01DIdZQaXrk09ZeRgEw5e/Z9ms4Is=
github.com/zishang520/engine.io/v2 v2.0.8 h1:84rkbpWPzblAMj62uYsaD+XuZQTJTempSTCaxzemNSA=
github.com/zishang520/engine.io/v2 v2.0.8/go.mod h1:z9wFZLzqW1ykzWA84jt//1x0dQjMSim1G3SzIPovdHw=
github.com/zishang520/socket.io-go-parser/v2 v2.0.7 h1:Pcv668c8PYhyeQpaw5/MqV+D9x4p01p5K9ygSYOnYp8=
github.com/zishang520/socket.io-go-parser/v2 v2.0.7/go.mod h1:O/6sR1SjIm8bZvMS3GqwT29TvxdxGYvugvBbRA+a/Zg=
github.com/zishang520/engine.io-go-parser v1.2.5 h1:Disf4rvNQzDsgoC+3yuwuFx5A7JNWlPp+QLUW32WDtc=
github.com/zishang520/engine.io-go-parser v1.2.5/go.mod h1:G1DciRIGH4/S7x01DIdZQaXrk09ZeRgEw5e/Z9ms4Is=
github.com/zishang520/engine.io/v2 v2.1.0 h1:dh3O7OcAfqfhg7AhqlqPRM/6pfdAcoRlEmNbe2wv8qE=
github.com/zishang520/engine.io/v2 v2.1.0/go.mod h1:FnXtT+k/6g2uOb9MpqY71DhV7COwlCH5DCbczn6Q3K8=
github.com/zishang520/socket.io-go-parser/v2 v2.1.0 h1:YaTul861UxdTtq/v7XKmF52gWmDOqwugKBlFyiifKCE=
github.com/zishang520/socket.io-go-parser/v2 v2.1.0/go.mod h1:zmToGML+lCjSjyGZMuVtnvgnFOnDuAxJZKwfDDDHiqI=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 h1:Vve/L0v7CXXuxUmaMGIEK/dEeq7uiqb5qBgQrZzIE7E=
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM=
golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw=
golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
6 changes: 3 additions & 3 deletions socket/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,18 +187,18 @@ func (a *adapter) BroadcastWithAck(packet *parser.Packet, opts *BroadcastOptions
id := a.nsp.Ids()
packet.Id = &id
encodedPackets := a._encode(packet, packetOpts)
clientCount := uint64(0)
var clientCount atomic.Uint64
a.apply(opts, func(socket *Socket) {
// track the total number of acknowledgements that are expected
atomic.AddUint64(&clientCount, 1)
clientCount.Add(1)
// call the ack callback for each client response
socket.Acks().Store(*packet.Id, ack)
if notifyOutgoingListeners := socket.NotifyOutgoingListeners(); notifyOutgoingListeners != nil {
notifyOutgoingListeners(packet)
}
socket.Client().WriteToEngine(encodedPackets, packetOpts)
})
clientCountCallback(atomic.LoadUint64(&clientCount))
clientCountCallback(clientCount.Load())
}

func (a *adapter) _encode(packet *parser.Packet, packetOpts *WriteOptions) []_types.BufferInterface {
Expand Down
35 changes: 13 additions & 22 deletions socket/broadcast-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package socket
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -200,41 +199,35 @@ func (b *BroadcastOperator) Emit(ev string, args ...any) error {

packet.Data = data[:data_len-1]

timedOut := uint32(0)
responses := []any{}
var responsesMu sync.RWMutex
var timedOut atomic.Bool
responses := types.NewSlice[any]()
var timeout time.Duration

if time := b.flags.Timeout; time != nil {
timeout = *time
}

timer := utils.SetTimeout(func() {
atomic.StoreUint32(&timedOut, 1)
timedOut.Store(true)
if b.flags.ExpectSingleResponse {
ack(nil, errors.New("operation has timed out"))
} else {
responsesMu.RLock()
defer responsesMu.RUnlock()

ack(responses, errors.New("operation has timed out"))
ack(responses.All(), errors.New("operation has timed out"))
}
}, timeout)

expectedServerCount := int64(-1)
actualServerCount := int64(0)
expectedClientCount := uint64(0)
var actualServerCount atomic.Int64
var expectedClientCount atomic.Uint64

checkCompleteness := func() {
responsesMu.RLock()
defer responsesMu.RUnlock()

if 0 == atomic.LoadUint32(&timedOut) && expectedServerCount == atomic.LoadInt64(&actualServerCount) && uint64(len(responses)) == atomic.LoadUint64(&expectedClientCount) {
if !timedOut.Load() && expectedServerCount == actualServerCount.Load() && uint64(responses.Len()) == expectedClientCount.Load() {
utils.ClearTimeout(timer)
if b.flags.ExpectSingleResponse {
ack(responses[0].([]any), nil)
data, _ := responses.Get(0)
ack(data.([]any), nil)
} else {
ack(responses, nil)
ack(responses.All(), nil)
}
}
}
Expand All @@ -245,14 +238,12 @@ func (b *BroadcastOperator) Emit(ev string, args ...any) error {
Flags: b.flags,
}, func(clientCount uint64) {
// each Socket.IO server in the cluster sends the number of clients that were notified
atomic.AddUint64(&expectedClientCount, clientCount)
atomic.AddInt64(&actualServerCount, 1)
expectedClientCount.Add(clientCount)
actualServerCount.Add(1)
checkCompleteness()
}, func(clientResponse []any, _ error) {
// each client sends an acknowledgement
responsesMu.Lock()
responses = append(responses, clientResponse...)
responsesMu.Unlock()
responses.Push(clientResponse...)
checkCompleteness()
})
expectedServerCount = b.adapter.ServerCount()
Expand Down
40 changes: 16 additions & 24 deletions socket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package socket

import (
"net/url"
"sync"
"sync/atomic"

_types "github.com/zishang520/engine.io-go-parser/types"
"github.com/zishang520/engine.io/v2/engine"
Expand All @@ -17,14 +17,13 @@ var client_log = log.NewLog("socket.io:client")
type Client struct {
conn engine.Socket

id string
server *Server
encoder parser.Encoder
decoder parser.Decoder
sockets *types.Map[SocketId, *Socket]
nsps *types.Map[string, *Socket]
connectTimeout *utils.Timer
connectTimeout_mu sync.Mutex
id string
server *Server
encoder parser.Encoder
decoder parser.Decoder
sockets *types.Map[SocketId, *Socket]
nsps *types.Map[string, *Socket]
connectTimeout atomic.Pointer[utils.Timer]
}

func MakeClient() *Client {
Expand Down Expand Up @@ -74,17 +73,14 @@ func (c *Client) setup() {
c.conn.On("error", c.onerror)
c.conn.On("close", c.onclose)

c.connectTimeout_mu.Lock()
defer c.connectTimeout_mu.Unlock()

c.connectTimeout = utils.SetTimeout(func() {
c.connectTimeout.Store(utils.SetTimeout(func() {
if c.nsps.Len() == 0 {
client_log.Debug("no namespace joined yet, close the client")
c.close()
} else {
client_log.Debug("the client has already joined a namespace, nothing to do")
}
}, c.server._connectTimeout)
}, c.server._connectTimeout))
}

// Connects a client to a namespace.
Expand Down Expand Up @@ -124,11 +120,9 @@ func (c *Client) doConnect(name string, auth any) {
nsp.Add(c, auth, func(socket *Socket) {
c.sockets.Store(socket.Id(), socket)
c.nsps.Store(nsp.Name(), socket)
c.connectTimeout_mu.Lock()
defer c.connectTimeout_mu.Unlock()
if c.connectTimeout != nil {
utils.ClearTimeout(c.connectTimeout)
c.connectTimeout = nil
if connectTimeout := c.connectTimeout.Load(); connectTimeout != nil {
utils.ClearTimeout(connectTimeout)
c.connectTimeout.Store(nil)
}
})
}
Expand Down Expand Up @@ -259,10 +253,8 @@ func (c *Client) destroy() {
c.conn.RemoveListener("close", c.onclose)
c.decoder.RemoveListener("decoded", c.ondecoded)

c.connectTimeout_mu.Lock()
defer c.connectTimeout_mu.Unlock()
if c.connectTimeout != nil {
utils.ClearTimeout(c.connectTimeout)
c.connectTimeout = nil
if connectTimeout := c.connectTimeout.Load(); connectTimeout != nil {
utils.ClearTimeout(connectTimeout)
c.connectTimeout.Store(nil)
}
}
36 changes: 6 additions & 30 deletions socket/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package socket
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -68,9 +67,7 @@ var (
// // ensure the socket has access to the "users" namespace
// })
type Namespace struct {
// _ids has to be first in the struct to guarantee alignment for atomic
// operations. http://golang.org/pkg/sync/atomic/#pkg-note-BUG
_ids uint64
_ids atomic.Uint64

*StrictEventEmitter

Expand All @@ -89,8 +86,7 @@ type Namespace struct {

server *Server

_fns []func(*Socket, func(*ExtendedError))
_fns_mu sync.RWMutex
_fns *types.Slice[func(*Socket, func(*ExtendedError))]

_remove func(socket *Socket)
}
Expand All @@ -100,8 +96,7 @@ func MakeNamespace() *Namespace {
StrictEventEmitter: NewStrictEventEmitter(),

sockets: &types.Map[SocketId, *Socket]{},
_fns: []func(*Socket, func(*ExtendedError)){},
_ids: 0,
_fns: types.NewSlice[func(*Socket, func(*ExtendedError))](),
}

n._remove = n.namespace_remove
Expand Down Expand Up @@ -147,20 +142,7 @@ func (n *Namespace) Name() string {
}

func (n *Namespace) Ids() uint64 {
return atomic.AddUint64(&n._ids, 1)
}

func (n *Namespace) fns() []func(*Socket, func(*ExtendedError)) {
n._fns_mu.RLock()
defer n._fns_mu.RUnlock()

return n._fns
}
func (n *Namespace) useFns(_fns []func(*Socket, func(*ExtendedError))) {
n._fns_mu.Lock()
defer n._fns_mu.Unlock()

n._fns = _fns
return n._ids.Add(1)
}

func (n *Namespace) Construct(server *Server, name string) {
Expand Down Expand Up @@ -189,10 +171,7 @@ func (n *Namespace) InitAdapter() {
//
// Param: func(*ExtendedError) - the middleware function
func (n *Namespace) Use(fn func(*Socket, func(*ExtendedError))) NamespaceInterface {
n._fns_mu.Lock()
defer n._fns_mu.Unlock()

n._fns = append(n._fns, fn)
n._fns.Push(fn)
return n
}

Expand All @@ -202,10 +181,7 @@ func (n *Namespace) Use(fn func(*Socket, func(*ExtendedError))) NamespaceInterfa
//
// Param: fn - last fn call in the middleware
func (n *Namespace) run(socket *Socket, fn func(err *ExtendedError)) {
n._fns_mu.RLock()
fns := make([]func(*Socket, func(*ExtendedError)), len(n._fns))
copy(fns, n._fns)
n._fns_mu.RUnlock()
fns := n._fns.All()
if length := len(fns); length > 0 {
var run func(i int)
run = func(i int) {
Expand Down
Loading

0 comments on commit ccdbfbe

Please sign in to comment.