Skip to content

Commit

Permalink
Fix pc.GracefulClose concurrency > 2
Browse files Browse the repository at this point in the history
  • Loading branch information
edaniels committed Aug 24, 2024
1 parent 64a837f commit 6353fee
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 44 deletions.
113 changes: 70 additions & 43 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ type PeerConnection struct {
idpLoginURL *string

isClosed *atomicBool
isGracefulClosed *atomicBool
isGracefulClosedDone chan struct{}
isGracefullyClosingOrClosed bool
isCloseDone chan struct{}
isGracefulCloseDone chan struct{}
isNegotiationNeeded *atomicBool
updateNegotiationNeededFlagOnEmptyChain *atomicBool

Expand Down Expand Up @@ -119,8 +120,8 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
ICECandidatePoolSize: 0,
},
isClosed: &atomicBool{},
isGracefulClosed: &atomicBool{},
isGracefulClosedDone: make(chan struct{}),
isCloseDone: make(chan struct{}),
isGracefulCloseDone: make(chan struct{}),
isNegotiationNeeded: &atomicBool{},
updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
lastOffer: "",
Expand Down Expand Up @@ -2111,22 +2112,41 @@ func (pc *PeerConnection) GracefulClose() error {
func (pc *PeerConnection) close(shouldGracefullyClose bool) error {

Check failure on line 2112 in peerconnection.go

View workflow job for this annotation

GitHub Actions / lint / Go

cognitive complexity 32 of func `(*PeerConnection).close` is high (> 30) (gocognit)
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
alreadyGracefullyClosed := shouldGracefullyClose && pc.isGracefulClosed.swap(true)
if pc.isClosed.swap(true) {
if alreadyGracefullyClosed {
// similar but distinct condition where we may be waiting for some
// other graceful close to finish. Incorrectly using isClosed may
// leak a goroutine.
<-pc.isGracefulClosedDone
}
return nil

pc.mu.Lock()
// A lock in this critical section is needed because pc.isClosed and
// pc.isGracefullyClosingOrClosed are related to each other in that we
// want to make graceful and normal closure one time operations to avoid
// any double closure errors from cropping up.
isAlreadyClosingOrClosed := pc.isClosed.swap(true)
isAlreadyGracefullyClosingOrClosed := pc.isGracefullyClosingOrClosed
if shouldGracefullyClose && !isAlreadyGracefullyClosingOrClosed {
pc.isGracefullyClosingOrClosed = true
}
if shouldGracefullyClose && !alreadyGracefullyClosed {
defer close(pc.isGracefulClosedDone)
pc.mu.Unlock()

if isAlreadyClosingOrClosed {
if !shouldGracefullyClose {
return nil
}
// even if we're already closing, it may not be graceful:
// if we are not closing the graceful channel, we just wait for the close
// to happen and return.
if isAlreadyGracefullyClosingOrClosed {
<-pc.isGracefulCloseDone
return nil
}
// Otherwise we need to go through the graceful flow once the current normal closer
// is done since there are extra steps to take with a graceful close.
<-pc.isCloseDone
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
pc.signalingState.Set(SignalingStateClosed)
if !isAlreadyClosingOrClosed {
defer close(pc.isCloseDone)
}
if shouldGracefullyClose && !isAlreadyGracefullyClosingOrClosed {
defer close(pc.isGracefulCloseDone)
}

// Try closing everything and collect the errors
// Shutdown strategy:
Expand All @@ -2136,47 +2156,54 @@ func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
// continue the chain the Mux has to be closed.
closeErrs := make([]error, 4)

closeErrs = append(closeErrs, pc.api.interceptor.Close())
if !isAlreadyClosingOrClosed {
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
pc.signalingState.Set(SignalingStateClosed)

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4)
pc.mu.Lock()
for _, t := range pc.rtpTransceivers {
if !t.stopped {
closeErrs = append(closeErrs, t.Stop())
closeErrs = append(closeErrs, pc.api.interceptor.Close())

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4)
pc.mu.Lock()
for _, t := range pc.rtpTransceivers {
if !t.stopped {
closeErrs = append(closeErrs, t.Stop())
}
}
}
if nonMediaBandwidthProbe, ok := pc.nonMediaBandwidthProbe.Load().(*RTPReceiver); ok {
closeErrs = append(closeErrs, nonMediaBandwidthProbe.Stop())
}
pc.mu.Unlock()
if nonMediaBandwidthProbe, ok := pc.nonMediaBandwidthProbe.Load().(*RTPReceiver); ok {
closeErrs = append(closeErrs, nonMediaBandwidthProbe.Stop())
}
pc.mu.Unlock()

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #5)
pc.sctpTransport.lock.Lock()
for _, d := range pc.sctpTransport.dataChannels {
d.setReadyState(DataChannelStateClosed)
}
pc.sctpTransport.lock.Unlock()
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #5)
pc.sctpTransport.lock.Lock()
for _, d := range pc.sctpTransport.dataChannels {
d.setReadyState(DataChannelStateClosed)
}
pc.sctpTransport.lock.Unlock()

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #6)
if pc.sctpTransport != nil {
closeErrs = append(closeErrs, pc.sctpTransport.Stop())
}
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #6)
if pc.sctpTransport != nil {
closeErrs = append(closeErrs, pc.sctpTransport.Stop())
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #7)
closeErrs = append(closeErrs, pc.dtlsTransport.Stop())
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #7)
closeErrs = append(closeErrs, pc.dtlsTransport.Stop())
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
if pc.iceTransport != nil {
if shouldGracefullyClose {
// note that it isn't canon to stop gracefully
closeErrs = append(closeErrs, pc.iceTransport.GracefulStop())
} else {
} else if !isAlreadyClosingOrClosed {
closeErrs = append(closeErrs, pc.iceTransport.Stop())
}
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())
if !isAlreadyClosingOrClosed {
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())
}

if shouldGracefullyClose {
pc.ops.GracefulClose()
Expand Down
42 changes: 41 additions & 1 deletion peerconnection_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestPeerConnection_Close_DuringICE(t *testing.T) {
}
}

func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) {
func TestPeerConnection_GracefulCloseWithIncomingMessages(t *testing.T) {
// Limit runtime in case of deadlocks
lim := test.TimeOut(time.Second * 20)
defer lim.Stop()
Expand Down Expand Up @@ -287,3 +287,43 @@ func TestPeerConnection_GracefulCloseWhileOpening(t *testing.T) {
t.Fatal(err)
}
}

func TestPeerConnection_GracefulCloseConcurrent(t *testing.T) {
// Limit runtime in case of deadlocks
lim := test.TimeOut(time.Second * 5)
defer lim.Stop()

report := test.CheckRoutinesStrict(t)
defer report()

pc, err := NewPeerConnection(Configuration{})
if err != nil {
t.Fatal(err)
}

go pc.GracefulClose()

Check failure on line 304 in peerconnection_close_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

Error return value of `pc.GracefulClose` is not checked (errcheck)
go pc.GracefulClose()

Check failure on line 305 in peerconnection_close_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

Error return value of `pc.GracefulClose` is not checked (errcheck)
if err := pc.GracefulClose(); err != nil {
t.Fatal(err)
}
}

func TestPeerConnection_GracefulAndNormalCloseConcurrent(t *testing.T) {
// Limit runtime in case of deadlocks
lim := test.TimeOut(time.Second * 5)
defer lim.Stop()

report := test.CheckRoutinesStrict(t)
defer report()

pc, err := NewPeerConnection(Configuration{})
if err != nil {
t.Fatal(err)
}

go pc.GracefulClose()

Check failure on line 324 in peerconnection_close_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

Error return value of `pc.GracefulClose` is not checked (errcheck)
go pc.GracefulClose()
if err := pc.Close(); err != nil {
t.Fatal(err)
}
}

0 comments on commit 6353fee

Please sign in to comment.