Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent pc.GracefulClose #2887

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 72 additions & 34 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ type PeerConnection struct {
idpLoginURL *string

isClosed *atomicBool
isGracefulClosed *atomicBool
isGracefulClosedDone chan struct{}
isGracefullyClosingOrClosed bool
isCloseDone chan struct{}
isGracefulCloseDone chan struct{}
isNegotiationNeeded *atomicBool
updateNegotiationNeededFlagOnEmptyChain *atomicBool

Expand Down Expand Up @@ -119,8 +120,8 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
ICECandidatePoolSize: 0,
},
isClosed: &atomicBool{},
isGracefulClosed: &atomicBool{},
isGracefulClosedDone: make(chan struct{}),
isCloseDone: make(chan struct{}),
isGracefulCloseDone: make(chan struct{}),
isNegotiationNeeded: &atomicBool{},
updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
lastOffer: "",
Expand Down Expand Up @@ -2111,22 +2112,44 @@ func (pc *PeerConnection) GracefulClose() error {
func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
alreadyGracefullyClosed := shouldGracefullyClose && pc.isGracefulClosed.swap(true)
if pc.isClosed.swap(true) {
if alreadyGracefullyClosed {
// similar but distinct condition where we may be waiting for some
// other graceful close to finish. Incorrectly using isClosed may
// leak a goroutine.
<-pc.isGracefulClosedDone
}
return nil

pc.mu.Lock()
// A lock in this critical section is needed because pc.isClosed and
// pc.isGracefullyClosingOrClosed are related to each other in that we
// want to make graceful and normal closure one time operations in order
// to avoid any double closure errors from cropping up. However, there are
// some overlapping close cases when both normal and graceful close are used
// that should be idempotent, but be cautioned when writing new close behavior

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I came into this function thinking that it wouldn't make sense for applications to make concurrent graceful + non-graceful close calls. I think it would be fine to declare that usage undefined.

But allowing for both is obviously better if the idempotency requirement is not burdensome.

// to preserve this property.
isAlreadyClosingOrClosed := pc.isClosed.swap(true)
isAlreadyGracefullyClosingOrClosed := pc.isGracefullyClosingOrClosed
if shouldGracefullyClose && !isAlreadyGracefullyClosingOrClosed {
pc.isGracefullyClosingOrClosed = true
}
if shouldGracefullyClose && !alreadyGracefullyClosed {
defer close(pc.isGracefulClosedDone)
pc.mu.Unlock()

if isAlreadyClosingOrClosed {
if !shouldGracefullyClose {
return nil
}
// Even if we're already closing, it may not be graceful:
// If we are not the ones doing the closing, we just wait for the graceful close
// to happen and then return.
if isAlreadyGracefullyClosingOrClosed {
<-pc.isGracefulCloseDone
return nil
}
// Otherwise we need to go through the graceful closure flow once the
// normal closure is done since there are extra steps to take with a
// graceful close.
<-pc.isCloseDone
} else {
defer close(pc.isCloseDone)
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
pc.signalingState.Set(SignalingStateClosed)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've double checked that the place this line moved to was equivalent.

I suppose it must be the case that once someone calls close and moves the signaling state into SignalingStateClosed, it's impossible for the signaling state to leave SignalingStateClosed?

Given the old + new code for Close do nothing on subsequent calls.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So nothing prevents that state from moving out of Closed, but yeah, the logic in here doesn't move it to anything other than Closed.

if shouldGracefullyClose {
defer close(pc.isGracefulCloseDone)
}

// Try closing everything and collect the errors
// Shutdown strategy:
Expand All @@ -2136,6 +2159,34 @@ func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
// continue the chain the Mux has to be closed.
closeErrs := make([]error, 4)

doGracefulCloseOps := func() []error {
if !shouldGracefullyClose {
return nil
}

// these are all non-canon steps
var gracefulCloseErrors []error
if pc.iceTransport != nil {
gracefulCloseErrors = append(gracefulCloseErrors, pc.iceTransport.GracefulStop())
}

pc.ops.GracefulClose()

pc.sctpTransport.lock.Lock()
for _, d := range pc.sctpTransport.dataChannels {
gracefulCloseErrors = append(gracefulCloseErrors, d.GracefulClose())
}
pc.sctpTransport.lock.Unlock()
return gracefulCloseErrors
}

if isAlreadyClosingOrClosed {
return util.FlattenErrs(doGracefulCloseOps())
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
pc.signalingState.Set(SignalingStateClosed)

closeErrs = append(closeErrs, pc.api.interceptor.Close())

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4)
Expand Down Expand Up @@ -2166,28 +2217,15 @@ func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
closeErrs = append(closeErrs, pc.dtlsTransport.Stop())

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
if pc.iceTransport != nil {
if shouldGracefullyClose {
// note that it isn't canon to stop gracefully
closeErrs = append(closeErrs, pc.iceTransport.GracefulStop())
} else {
closeErrs = append(closeErrs, pc.iceTransport.Stop())
}
if pc.iceTransport != nil && !shouldGracefullyClose {
// we will stop gracefully in doGracefulCloseOps
closeErrs = append(closeErrs, pc.iceTransport.Stop())
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())

if shouldGracefullyClose {
pc.ops.GracefulClose()

// note that it isn't canon to stop gracefully
pc.sctpTransport.lock.Lock()
for _, d := range pc.sctpTransport.dataChannels {
closeErrs = append(closeErrs, d.GracefulClose())
}
pc.sctpTransport.lock.Unlock()
}
closeErrs = append(closeErrs, doGracefulCloseOps()...)

return util.FlattenErrs(closeErrs)
}
Expand Down
42 changes: 41 additions & 1 deletion peerconnection_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package webrtc

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -180,7 +182,7 @@ func TestPeerConnection_Close_DuringICE(t *testing.T) {
}
}

func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) {
func TestPeerConnection_GracefulCloseWithIncomingMessages(t *testing.T) {
// Limit runtime in case of deadlocks
lim := test.TimeOut(time.Second * 20)
defer lim.Stop()
Expand Down Expand Up @@ -287,3 +289,41 @@ func TestPeerConnection_GracefulCloseWhileOpening(t *testing.T) {
t.Fatal(err)
}
}

// TestPeerConnection_GracefulCloseConcurrent spawns many concurrent
// GracefulClose calls (optionally racing them against a plain Close) and
// verifies that every call returns without error, without deadlocking, and
// without leaking goroutines.
func TestPeerConnection_GracefulCloseConcurrent(t *testing.T) {
	// Limit runtime in case of deadlocks
	timeout := test.TimeOut(time.Second * 10)
	defer timeout.Stop()

	for _, mixed := range []bool{false, true} {
		t.Run(fmt.Sprintf("mixed_graceful=%t", mixed), func(t *testing.T) {
			checkRoutines := test.CheckRoutinesStrict(t)
			defer checkRoutines()

			peer, err := NewPeerConnection(Configuration{})
			if err != nil {
				t.Fatal(err)
			}

			const concurrency = 50
			var closers sync.WaitGroup
			closers.Add(concurrency)
			for n := 0; n < concurrency; n++ {
				go func() {
					defer closers.Done()
					assert.NoError(t, peer.GracefulClose())
				}()
			}

			// Race one more closure against the goroutines above: a plain
			// Close when mixed is false, another GracefulClose otherwise.
			closeFn := peer.Close
			if mixed {
				closeFn = peer.GracefulClose
			}
			if err := closeFn(); err != nil {
				t.Fatal(err)
			}
			closers.Wait()
		})
	}
}
Loading