Skip to content

Commit

Permalink
Merge pull request #16 from strukturag/stats-api
Browse files Browse the repository at this point in the history
Add basic stats API.
  • Loading branch information
fancycode authored Jun 30, 2020
2 parents 3e11bf7 + 6455e70 commit 3d73ab4
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 0 deletions.
5 changes: 5 additions & 0 deletions server.conf.in
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,8 @@ url =
# information.
# Leave empty to disable GeoIP lookups.
#license =

[stats]
# Comma-separated list of IP addresses that are allowed to access the stats
# endpoint. Leave empty (or commented) to only allow access from "127.0.0.1".
#allowed_ips =
58 changes: 58 additions & 0 deletions src/signaling/backend_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"io"
"io/ioutil"
"log"
"net"
"net/http"
"reflect"
"strings"
Expand All @@ -49,6 +50,7 @@ const (
)

type BackendServer struct {
hub *Hub
nats NatsClient
roomSessions RoomSessions

Expand All @@ -61,6 +63,8 @@ type BackendServer struct {
turnsecret []byte
turnvalid time.Duration
turnservers []string

statsAllowedIps map[string]bool
}

func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*BackendServer, error) {
Expand Down Expand Up @@ -95,7 +99,26 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac
}
}

statsAllowed, _ := config.GetString("stats", "allowed_ips")
var statsAllowedIps map[string]bool
if statsAllowed == "" {
log.Printf("No IPs configured for the stats endpoint, only allowing access from 127.0.0.1")
statsAllowedIps = map[string]bool{
"127.0.0.1": true,
}
} else {
log.Printf("Only allowing access to the stats endpoing from %s", statsAllowed)
statsAllowedIps = make(map[string]bool)
for _, ip := range strings.Split(statsAllowed, ",") {
ip = strings.TrimSpace(ip)
if ip != "" {
statsAllowedIps[ip] = true
}
}
}

return &BackendServer{
hub: hub,
nats: hub.nats,
roomSessions: hub.roomSessions,
version: version,
Expand All @@ -107,6 +130,8 @@ func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*Bac
turnsecret: []byte(turnsecret),
turnvalid: turnvalid,
turnservers: turnserverslist,

statsAllowedIps: statsAllowedIps,
}, nil
}

Expand All @@ -124,6 +149,7 @@ func (b *BackendServer) Start(r *mux.Router) error {
s := r.PathPrefix("/api/v1").Subrouter()
s.HandleFunc("/welcome", b.setComonHeaders(b.welcomeFunc)).Methods("GET")
s.HandleFunc("/room/{roomid}", b.setComonHeaders(b.validateBackendRequest(b.roomHandler))).Methods("POST")
s.HandleFunc("/stats", b.setComonHeaders(b.validateStatsRequest(b.statsHandler))).Methods("GET")

// Provide a REST service to get TURN credentials.
// See https://tools.ietf.org/html/draft-uberti-behave-turn-rest-00
Expand Down Expand Up @@ -524,3 +550,35 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
// TODO(jojo): Return better response struct.
w.Write([]byte("{}"))
}

func (b *BackendServer) validateStatsRequest(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
addr := getRealUserIP(r)
if strings.Contains(addr, ":") {
if host, _, err := net.SplitHostPort(addr); err == nil {
addr = host
}
}
if !b.statsAllowedIps[addr] {
http.Error(w, "Authentication check failed", http.StatusForbidden)
return
}

f(w, r)
}
}

func (b *BackendServer) statsHandler(w http.ResponseWriter, r *http.Request) {
stats := b.hub.GetStats()
statsData, err := json.MarshalIndent(stats, "", " ")
if err != nil {
log.Printf("Could not serialize stats %+v: %s", stats, err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(http.StatusOK)
w.Write(statsData)
}
16 changes: 16 additions & 0 deletions src/signaling/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,22 @@ func (h *Hub) processRoomParticipants(message *BackendServerRoomRequest) {
room.PublishUsersChanged(message.Participants.Changed, message.Participants.Users)
}

func (h *Hub) GetStats() map[string]interface{} {
result := make(map[string]interface{})
h.ru.RLock()
result["rooms"] = len(h.rooms)
h.ru.RUnlock()
h.mu.Lock()
result["sessions"] = len(h.sessions)
h.mu.Unlock()
if h.mcu != nil {
if stats := h.mcu.GetStats(); stats != nil {
result["mcu"] = stats
}
}
return result
}

func getRealUserIP(r *http.Request) string {
// Note this function assumes it is running behind a trusted proxy, so
// the headers can be trusted.
Expand Down
2 changes: 2 additions & 0 deletions src/signaling/mcu_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Mcu interface {
Start() error
Stop()

GetStats() interface{}

NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error)
NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error)
}
Expand Down
28 changes: 28 additions & 0 deletions src/signaling/mcu_janus.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ type mcuJanus struct {

reconnectTimer *time.Timer
reconnectInterval time.Duration

connectedSince time.Time
}

func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, error) {
Expand Down Expand Up @@ -303,6 +305,7 @@ func (m *mcuJanus) Start() error {
return err
}
log.Println("Created Janus session", m.session.Id)
m.connectedSince = time.Now()

if m.handle, err = m.session.Attach(ctx, pluginVideoRoom); err != nil {
m.disconnect()
Expand Down Expand Up @@ -346,6 +349,31 @@ func (m *mcuJanus) Stop() {
m.reconnectTimer.Stop()
}

type mcuJanusConnectionStats struct {
Url string `json:"url"`
Connected bool `json:"connected"`
Publishers int64 `json:"publishers"`
Clients int64 `json:"clients"`
Uptime *time.Time `json:"uptime,omitempty"`
}

func (m *mcuJanus) GetStats() interface{} {
result := mcuJanusConnectionStats{
Url: m.url,
}
if m.session != nil {
result.Connected = true
result.Uptime = &m.connectedSince
}
m.mu.Lock()
result.Publishers = int64(len(m.publisherRoomIds))
m.mu.Unlock()
m.muClients.Lock()
result.Clients = int64(len(m.clients))
m.muClients.Unlock()
return result
}

func (m *mcuJanus) sendKeepalive() {
ctx := context.TODO()
if _, err := m.session.KeepAlive(ctx); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions src/signaling/mcu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (m *TestMCU) Start() error {
func (m *TestMCU) Stop() {
}

func (m *TestMCU) GetStats() interface{} {
return nil
}

func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) {
return nil, fmt.Errorf("Not implemented")
}
Expand Down

0 comments on commit 3d73ab4

Please sign in to comment.