From 2e5e1bf75503046ddba301fafde2bb74b4c679d5 Mon Sep 17 00:00:00 2001
From: Eric Daniels
Date: Sat, 24 Aug 2024 02:22:19 -0400
Subject: [PATCH] Fix pc.GracefulClose concurrency

---
 peerconnection.go            | 102 +++++++++++++++++++++++------------
 peerconnection_close_test.go |  59 +++++++++++++++++++-
 2 files changed, 126 insertions(+), 35 deletions(-)

diff --git a/peerconnection.go b/peerconnection.go
index 60edfa2ea88..dffe76fad30 100644
--- a/peerconnection.go
+++ b/peerconnection.go
@@ -56,8 +56,9 @@ type PeerConnection struct {
 	idpLoginURL *string
 
 	isClosed                                *atomicBool
-	isGracefulClosed                        *atomicBool
-	isGracefulClosedDone                    chan struct{}
+	isGracefullyClosingOrClosed             bool
+	isCloseDone                             chan struct{}
+	isGracefulCloseDone                     chan struct{}
 	isNegotiationNeeded                     *atomicBool
 	updateNegotiationNeededFlagOnEmptyChain *atomicBool
 
@@ -119,8 +120,8 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
 			ICECandidatePoolSize: 0,
 		},
 		isClosed:                                &atomicBool{},
-		isGracefulClosed:                        &atomicBool{},
-		isGracefulClosedDone:                    make(chan struct{}),
+		isCloseDone:                             make(chan struct{}),
+		isGracefulCloseDone:                     make(chan struct{}),
 		isNegotiationNeeded:                     &atomicBool{},
 		updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
 		lastOffer:                               "",
@@ -2111,22 +2112,40 @@ func (pc *PeerConnection) GracefulClose() error {
 func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
-	alreadyGracefullyClosed := shouldGracefullyClose && pc.isGracefulClosed.swap(true)
-	if pc.isClosed.swap(true) {
-		if alreadyGracefullyClosed {
-			// similar but distinct condition where we may be waiting for some
-			// other graceful close to finish. Incorrectly using isClosed may
-			// leak a goroutine.
-			<-pc.isGracefulClosedDone
-		}
-		return nil
+
+	pc.mu.Lock()
+	// A lock in this critical section is needed because pc.isClosed and
+	// pc.isGracefullyClosingOrClosed are related to each other in that we
+	// want to make graceful and normal closure one time operations to avoid
+	// any double closure errors from cropping up.
+	isAlreadyClosingOrClosed := pc.isClosed.swap(true)
+	isAlreadyGracefullyClosingOrClosed := pc.isGracefullyClosingOrClosed
+	if shouldGracefullyClose && !isAlreadyGracefullyClosingOrClosed {
+		pc.isGracefullyClosingOrClosed = true
 	}
-	if shouldGracefullyClose && !alreadyGracefullyClosed {
-		defer close(pc.isGracefulClosedDone)
+	pc.mu.Unlock()
+
+	if isAlreadyClosingOrClosed {
+		if !shouldGracefullyClose {
+			return nil
+		}
+		// even if we're already closing, it may not be graceful:
+		// if we are not closing the graceful channel, we just wait for the close
+		// to happen and return.
+		if isAlreadyGracefullyClosingOrClosed {
+			<-pc.isGracefulCloseDone
+			return nil
+		}
+		// Otherwise we need to go through the graceful flow once the current normal closer
+		// is done since there are extra steps to take with a graceful close.
+		<-pc.isCloseDone
+	} else {
+		defer close(pc.isCloseDone)
 	}
 
-	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
-	pc.signalingState.Set(SignalingStateClosed)
+	if shouldGracefullyClose && !isAlreadyGracefullyClosingOrClosed {
+		defer close(pc.isGracefulCloseDone)
+	}
 
 	// Try closing everything and collect the errors
 	// Shutdown strategy:
@@ -2136,6 +2155,34 @@ func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
 	//    continue the chain the Mux has to be closed.
 	closeErrs := make([]error, 4)
 
+	doGracefulCloseOps := func() []error {
+		if !shouldGracefullyClose {
+			return nil
+		}
+
+		// these are all non-canon steps
+		var gracefulCloseErrors []error
+		if pc.iceTransport != nil {
+			gracefulCloseErrors = append(gracefulCloseErrors, pc.iceTransport.GracefulStop())
+		}
+
+		pc.ops.GracefulClose()
+
+		pc.sctpTransport.lock.Lock()
+		for _, d := range pc.sctpTransport.dataChannels {
+			gracefulCloseErrors = append(gracefulCloseErrors, d.GracefulClose())
+		}
+		pc.sctpTransport.lock.Unlock()
+		return gracefulCloseErrors
+	}
+
+	if isAlreadyClosingOrClosed {
+		return util.FlattenErrs(doGracefulCloseOps())
+	}
+
+	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
+	pc.signalingState.Set(SignalingStateClosed)
+
 	closeErrs = append(closeErrs, pc.api.interceptor.Close())
 
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4)
@@ -2166,28 +2213,15 @@ func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
 	closeErrs = append(closeErrs, pc.dtlsTransport.Stop())
 
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
-	if pc.iceTransport != nil {
-		if shouldGracefullyClose {
-			// note that it isn't canon to stop gracefully
-			closeErrs = append(closeErrs, pc.iceTransport.GracefulStop())
-		} else {
-			closeErrs = append(closeErrs, pc.iceTransport.Stop())
-		}
+	if pc.iceTransport != nil && !shouldGracefullyClose {
+		// we will stop gracefully in doGracefulCloseOps
+		closeErrs = append(closeErrs, pc.iceTransport.Stop())
 	}
 
 	// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
 	pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())
 
-	if shouldGracefullyClose {
-		pc.ops.GracefulClose()
-
-		// note that it isn't canon to stop gracefully
-		pc.sctpTransport.lock.Lock()
-		for _, d := range pc.sctpTransport.dataChannels {
-			closeErrs = append(closeErrs, d.GracefulClose())
-		}
-		pc.sctpTransport.lock.Unlock()
-	}
+	closeErrs = append(closeErrs, doGracefulCloseOps()...)
 
 	return util.FlattenErrs(closeErrs)
 }
diff --git a/peerconnection_close_test.go b/peerconnection_close_test.go
index 100fa85a91f..4db68bb6224 100644
--- a/peerconnection_close_test.go
+++ b/peerconnection_close_test.go
@@ -7,6 +7,7 @@
 package webrtc
 
 import (
+	"sync"
 	"testing"
 	"time"
 
@@ -180,7 +181,7 @@ func TestPeerConnection_Close_DuringICE(t *testing.T) {
 	}
 }
 
-func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) {
+func TestPeerConnection_GracefulCloseWithIncomingMessages(t *testing.T) {
 	// Limit runtime in case of deadlocks
 	lim := test.TimeOut(time.Second * 20)
 	defer lim.Stop()
@@ -287,3 +288,59 @@ func TestPeerConnection_GracefulCloseWhileOpening(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+const gracefulCloseConcurrency = 50
+
+func TestPeerConnection_GracefulCloseConcurrent(t *testing.T) {
+	// Limit runtime in case of deadlocks
+	lim := test.TimeOut(time.Second * 5)
+	defer lim.Stop()
+
+	report := test.CheckRoutinesStrict(t)
+	defer report()
+
+	pc, err := NewPeerConnection(Configuration{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(gracefulCloseConcurrency)
+	for i := 0; i < gracefulCloseConcurrency; i++ {
+		go func() {
+			defer wg.Done()
+			assert.NoError(t, pc.GracefulClose())
+		}()
+	}
+	if err := pc.GracefulClose(); err != nil {
+		t.Fatal(err)
+	}
+	wg.Wait()
+}
+
+func TestPeerConnection_GracefulAndNormalCloseConcurrent(t *testing.T) {
+	// Limit runtime in case of deadlocks
+	lim := test.TimeOut(time.Second * 5)
+	defer lim.Stop()
+
+	report := test.CheckRoutinesStrict(t)
+	defer report()
+
+	pc, err := NewPeerConnection(Configuration{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(gracefulCloseConcurrency)
+	for i := 0; i < gracefulCloseConcurrency; i++ {
+		go func() {
+			defer wg.Done()
+			assert.NoError(t, pc.GracefulClose())
+		}()
+	}
+	if err := pc.Close(); err != nil {
+		t.Fatal(err)
+	}
+	wg.Wait()
+}
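
Editor's note (not part of the patch): the rewritten close() combines a mutex-guarded "first caller wins" flag with done channels, so later callers either return immediately (plain Close) or wait for the close already in flight (GracefulClose). Below is a minimal, self-contained sketch of that coordination pattern using hypothetical names (closer, close, done); it deliberately simplifies the patch by keeping a single done channel and by not re-running the graceful steps when a graceful caller loses the race to a plain close, so it illustrates the scheme rather than reproducing pion's implementation.

package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical distillation of the pattern used by the new
// PeerConnection.close: the first caller of any kind performs the shutdown
// work exactly once; later callers either return immediately or wait on a
// done channel instead of racing the winner.
type closer struct {
	mu      sync.Mutex
	closing bool          // set once, by whichever caller arrives first
	done    chan struct{} // closed when the winning close has finished
}

func newCloser() *closer {
	return &closer{done: make(chan struct{})}
}

func (c *closer) close(graceful bool) {
	c.mu.Lock()
	alreadyClosing := c.closing
	c.closing = true
	c.mu.Unlock()

	if alreadyClosing {
		if graceful {
			// Graceful callers must not return before shutdown has
			// actually completed, so they wait on the done channel.
			<-c.done
		}
		return
	}
	defer close(c.done)

	fmt.Println("shutdown work runs exactly once")
}

func main() {
	c := newCloser()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.close(true) // concurrent graceful closes all return only after shutdown
		}()
	}
	c.close(false) // a plain close may win the race; graceful callers still wait
	wg.Wait()
}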