Skip to content

Commit

Permalink
Merge pull request #860 from strukturag/unpublish-remote
Browse files Browse the repository at this point in the history
Notify remote to stop publishing when last local subscriber is closed.
  • Loading branch information
fancycode authored Nov 11, 2024
2 parents b7c40d1 + 71ceadb commit 8a53bab
Show file tree
Hide file tree
Showing 12 changed files with 1,624 additions and 20 deletions.
25 changes: 20 additions & 5 deletions janus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ type dummyGatewayListener struct {
func (l *dummyGatewayListener) ConnectionInterrupted() {
}

type JanusGatewayInterface interface {
Info(context.Context) (*InfoMsg, error)
Create(context.Context) (*JanusSession, error)
Close() error

send(map[string]interface{}, *transaction) (uint64, error)
removeTransaction(uint64)

removeSession(*JanusSession)
}

// Gateway represents a connection to an instance of the Janus Gateway.
type JanusGateway struct {
listener GatewayListener
Expand Down Expand Up @@ -560,12 +571,18 @@ func (gateway *JanusGateway) Create(ctx context.Context) (*JanusSession, error)

// Store this session
gateway.Lock()
defer gateway.Unlock()
gateway.Sessions[session.Id] = session
gateway.Unlock()

return session, nil
}

func (gateway *JanusGateway) removeSession(session *JanusSession) {
gateway.Lock()
defer gateway.Unlock()
delete(gateway.Sessions, session.Id)
}

// Session represents a session instance on the Janus Gateway.
type JanusSession struct {
// Id is the session_id of this session
Expand All @@ -578,7 +595,7 @@ type JanusSession struct {
// and Session.Unlock() methods provided by the embedded sync.Mutex.
sync.Mutex

gateway *JanusGateway
gateway JanusGatewayInterface
}

func (session *JanusSession) send(msg map[string]interface{}, t *transaction) (uint64, error) {
Expand Down Expand Up @@ -670,9 +687,7 @@ func (session *JanusSession) Destroy(ctx context.Context) (*janus.AckMsg, error)
}

// Remove this session from the gateway
session.gateway.Lock()
delete(session.gateway.Sessions, session.Id)
session.gateway.Unlock()
session.gateway.removeSession(session)

return ack, nil
}
Expand Down
3 changes: 2 additions & 1 deletion mcu_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ type RemotePublisherController interface {
PublisherId() string

StartPublishing(ctx context.Context, publisher McuRemotePublisherProperties) error
StopPublishing(ctx context.Context, publisher McuRemotePublisherProperties) error
GetStreams(ctx context.Context) ([]PublisherStream, error)
}

Expand Down Expand Up @@ -214,7 +215,7 @@ type McuPublisher interface {

GetStreams(ctx context.Context) ([]PublisherStream, error)
PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error
UnpublishRemote(ctx context.Context, remoteId string) error
UnpublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error
}

type McuSubscriber interface {
Expand Down
29 changes: 24 additions & 5 deletions mcu_janus.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func convertIntValue(value interface{}) (uint64, error) {
return uint64(t), nil
case uint64:
return t, nil
case int:
if t < 0 {
return 0, fmt.Errorf("Unsupported int number: %+v", t)
}
return uint64(t), nil
case int64:
if t < 0 {
return 0, fmt.Errorf("Unsupported int64 number: %+v", t)
Expand All @@ -92,7 +97,7 @@ func convertIntValue(value interface{}) (uint64, error) {
}
return uint64(r), nil
default:
return 0, fmt.Errorf("Unknown number type: %+v", t)
return 0, fmt.Errorf("Unknown number type: %+v (%T)", t, t)
}
}

Expand Down Expand Up @@ -170,7 +175,9 @@ type mcuJanus struct {

settings McuSettings

gw *JanusGateway
createJanusGateway func(ctx context.Context, wsURL string, listener GatewayListener) (JanusGatewayInterface, error)

gw JanusGatewayInterface
session *JanusSession
handle *JanusHandle

Expand Down Expand Up @@ -213,6 +220,9 @@ func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mc
publishers: make(map[string]*mcuJanusPublisher),
remotePublishers: make(map[string]*mcuJanusRemotePublisher),

createJanusGateway: func(ctx context.Context, wsURL string, listener GatewayListener) (JanusGatewayInterface, error) {
return NewJanusGateway(ctx, wsURL, listener)
},
reconnectInterval: initialReconnectInterval,
}
mcu.onConnected.Store(emptyOnConnected)
Expand All @@ -222,8 +232,10 @@ func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mc
mcu.doReconnect(context.Background())
})
mcu.reconnectTimer.Stop()
if err := mcu.reconnect(ctx); err != nil {
return nil, err
if mcu.url != "" {
if err := mcu.reconnect(ctx); err != nil {
return nil, err
}
}
return mcu, nil
}
Expand Down Expand Up @@ -252,7 +264,7 @@ func (m *mcuJanus) disconnect() {

func (m *mcuJanus) reconnect(ctx context.Context) error {
m.disconnect()
gw, err := NewJanusGateway(ctx, m.url, m)
gw, err := m.createJanusGateway(ctx, m.url, m)
if err != nil {
return err
}
Expand Down Expand Up @@ -317,6 +329,11 @@ func (m *mcuJanus) hasRemotePublisher() bool {
}

func (m *mcuJanus) Start(ctx context.Context) error {
if m.url == "" {
if err := m.reconnect(ctx); err != nil {
return err
}
}
info, err := m.gw.Info(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -785,6 +802,8 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re
settings: settings,
},

controller: controller,

port: int(port),
rtcpPort: int(rtcp_port),
}
Expand Down
10 changes: 5 additions & 5 deletions mcu_janus_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,16 +380,16 @@ func (p *mcuJanusPublisher) GetStreams(ctx context.Context) ([]PublisherStream,
return streams, nil
}

func getPublisherRemoteId(id string, remoteId string) string {
return fmt.Sprintf("%s@%s", id, remoteId)
func getPublisherRemoteId(id string, remoteId string, hostname string, port int, rtcpPort int) string {
return fmt.Sprintf("%s-%s@%s:%d:%d", id, remoteId, hostname, port, rtcpPort)
}

func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
msg := map[string]interface{}{
"request": "publish_remotely",
"room": p.roomId,
"publisher_id": streamTypeUserIds[p.streamType],
"remote_id": getPublisherRemoteId(p.id, remoteId),
"remote_id": getPublisherRemoteId(p.id, remoteId, hostname, port, rtcpPort),
"host": hostname,
"port": port,
"rtcp_port": rtcpPort,
Expand Down Expand Up @@ -421,12 +421,12 @@ func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, remoteId string,
return nil
}

func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, remoteId string) error {
func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
msg := map[string]interface{}{
"request": "unpublish_remotely",
"room": p.roomId,
"publisher_id": streamTypeUserIds[p.streamType],
"remote_id": getPublisherRemoteId(p.id, remoteId),
"remote_id": getPublisherRemoteId(p.id, remoteId, hostname, port, rtcpPort),
}
response, err := p.handle.Request(ctx, msg)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions mcu_janus_remote_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type mcuJanusRemotePublisher struct {

ref atomic.Int64

controller RemotePublisherController

port int
rtcpPort int
}
Expand Down Expand Up @@ -116,6 +118,10 @@ func (p *mcuJanusRemotePublisher) Close(ctx context.Context) {
return
}

if err := p.controller.StopPublishing(ctx, p); err != nil {
log.Printf("Error stopping remote publisher %s in room %d: %s", p.id, p.roomId, err)
}

p.mu.Lock()
if handle := p.handle; handle != nil {
response, err := p.handle.Request(ctx, map[string]interface{}{
Expand Down
Loading

0 comments on commit 8a53bab

Please sign in to comment.