Skip to content

Commit

Permalink
Add basic stats API.
Browse files Browse the repository at this point in the history
Can be used to query number of sessions, rooms and (if Janus is configured),
overall MCU clients and publishers.
  • Loading branch information
fancycode committed May 28, 2020
1 parent 563658b commit 6455e70
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 0 deletions.
5 changes: 5 additions & 0 deletions server.conf.in
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,8 @@ url =
# information.
# Leave empty to disable GeoIP lookups.
#license =

[stats]
# Comma-separated list of IP addresses that are allowed to access the stats
# endpoint. Leave empty (or commented) to only allow access from "127.0.0.1".
#allowed_ips =
58 changes: 58 additions & 0 deletions src/signaling/backend_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"io"
"io/ioutil"
"log"
"net"
"net/http"
"reflect"
"strings"
Expand All @@ -49,6 +50,7 @@ const (
)

type BackendServer struct {
hub *Hub
nats NatsClient
roomSessions RoomSessions

Expand All @@ -61,6 +63,8 @@ type BackendServer struct {
turnsecret []byte
turnvalid time.Duration
turnservers []string

statsAllowedIps map[string]bool
}

func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*BackendServer, error) {
Expand Down Expand Up @@ -95,7 +99,26 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac
}
}

statsAllowed, _ := config.GetString("stats", "allowed_ips")
var statsAllowedIps map[string]bool
if statsAllowed == "" {
log.Printf("No IPs configured for the stats endpoint, only allowing access from 127.0.0.1")
statsAllowedIps = map[string]bool{
"127.0.0.1": true,
}
} else {
log.Printf("Only allowing access to the stats endpoing from %s", statsAllowed)
statsAllowedIps = make(map[string]bool)
for _, ip := range strings.Split(statsAllowed, ",") {
ip = strings.TrimSpace(ip)
if ip != "" {
statsAllowedIps[ip] = true
}
}
}

return &BackendServer{
hub: hub,
nats: hub.nats,
roomSessions: hub.roomSessions,
version: version,
Expand All @@ -107,6 +130,8 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac
turnsecret: []byte(turnsecret),
turnvalid: turnvalid,
turnservers: turnserverslist,

statsAllowedIps: statsAllowedIps,
}, nil
}

Expand All @@ -124,6 +149,7 @@ func (b *BackendServer) Start(r *mux.Router) error {
s := r.PathPrefix("/api/v1").Subrouter()
s.HandleFunc("/welcome", b.setComonHeaders(b.welcomeFunc)).Methods("GET")
s.HandleFunc("/room/{roomid}", b.setComonHeaders(b.validateBackendRequest(b.roomHandler))).Methods("POST")
s.HandleFunc("/stats", b.setComonHeaders(b.validateStatsRequest(b.statsHandler))).Methods("GET")

// Provide a REST service to get TURN credentials.
// See https://tools.ietf.org/html/draft-uberti-behave-turn-rest-00
Expand Down Expand Up @@ -524,3 +550,35 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
// TODO(jojo): Return better response struct.
w.Write([]byte("{}"))
}

func (b *BackendServer) validateStatsRequest(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
addr := getRealUserIP(r)
if strings.Contains(addr, ":") {
if host, _, err := net.SplitHostPort(addr); err == nil {
addr = host
}
}
if !b.statsAllowedIps[addr] {
http.Error(w, "Authentication check failed", http.StatusForbidden)
return
}

f(w, r)
}
}

func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) {
stats := b.hub.GetStats()
statsData, err := json.MarshalIndent(stats, "", " ")
if err != nil {
log.Printf("Could not serialize stats %+v: %s", stats, err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(http.StatusOK)
w.Write(statsData)
}
16 changes: 16 additions & 0 deletions src/signaling/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,22 @@ func (h *Hub) processRoomParticipants(message *BackendServerRoomRequest) {
room.PublishUsersChanged(message.Participants.Changed, message.Participants.Users)
}

func (h *Hub) GetStats() map[string]interface{} {
result := make(map[string]interface{})
h.ru.RLock()
result["rooms"] = len(h.rooms)
h.ru.RUnlock()
h.mu.Lock()
result["sessions"] = len(h.sessions)
h.mu.Unlock()
if h.mcu != nil {
if stats := h.mcu.GetStats(); stats != nil {
result["mcu"] = stats
}
}
return result
}

func getRealUserIP(r *http.Request) string {
// Note this function assumes it is running behind a trusted proxy, so
// the headers can be trusted.
Expand Down
2 changes: 2 additions & 0 deletions src/signaling/mcu_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Mcu interface {
Start() error
Stop()

GetStats() interface{}

NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error)
NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error)
}
Expand Down
28 changes: 28 additions & 0 deletions src/signaling/mcu_janus.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ type mcuJanus struct {

reconnectTimer *time.Timer
reconnectInterval time.Duration

connectedSince time.Time
}

func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, error) {
Expand Down Expand Up @@ -303,6 +305,7 @@ func (m *mcuJanus) Start() error {
return err
}
log.Println("Created Janus session", m.session.Id)
m.connectedSince = time.Now()

if m.handle, err = m.session.Attach(ctx, pluginVideoRoom); err != nil {
m.disconnect()
Expand Down Expand Up @@ -346,6 +349,31 @@ func (m *mcuJanus) Stop() {
m.reconnectTimer.Stop()
}

type mcuJanusConnectionStats struct {
Url string `json:"url"`
Connected bool `json:"connected"`
Publishers int64 `json:"publishers"`
Clients int64 `json:"clients"`
Uptime *time.Time `json:"uptime,omitempty"`
}

func (m *mcuJanus) GetStats() interface{} {
result := mcuJanusConnectionStats{
Url: m.url,
}
if m.session != nil {
result.Connected = true
result.Uptime = &m.connectedSince
}
m.mu.Lock()
result.Publishers = int64(len(m.publisherRoomIds))
m.mu.Unlock()
m.muClients.Lock()
result.Clients = int64(len(m.clients))
m.muClients.Unlock()
return result
}

func (m *mcuJanus) sendKeepalive() {
ctx := context.TODO()
if _, err := m.session.KeepAlive(ctx); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions src/signaling/mcu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (m *TestMCU) Start() error {
func (m *TestMCU) Stop() {
}

func (m *TestMCU) GetStats() interface{} {
return nil
}

func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) {
return nil, fmt.Errorf("Not implemented")
}
Expand Down

0 comments on commit 6455e70

Please sign in to comment.