Skip to content

Commit

Permalink
server: Switch to using "go.uber.org/zap" for logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
fancycode committed Sep 15, 2024
1 parent 5975d6e commit 8037124
Show file tree
Hide file tree
Showing 73 changed files with 2,750 additions and 1,101 deletions.
9 changes: 7 additions & 2 deletions api_signaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"net/url"
"sort"
"strings"
"time"

"github.com/golang-jwt/jwt/v4"
"github.com/pion/sdp/v3"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -265,7 +265,12 @@ func NewErrorDetail(code string, message string, details interface{}) *Error {
if details != nil {
var err error
if rawDetails, err = json.Marshal(details); err != nil {
log.Printf("Could not marshal details %+v for error %s with %s: %s", details, code, message, err)
zap.L().Error("Could not marshal error details",
zap.String("code", code),
zap.String("message", message),
zap.Any("details", details),
zap.Error(err),
)
return NewError("internal_error", "Could not marshal error details")
}
}
Expand Down
12 changes: 8 additions & 4 deletions async_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
*/
package signaling

import "sync"
import (
"sync"

"go.uber.org/zap"
)

type AsyncBackendRoomEventListener interface {
ProcessBackendRoomRequest(message *AsyncMessage)
Expand Down Expand Up @@ -60,13 +64,13 @@ type AsyncEvents interface {
PublishSessionMessage(sessionId string, backend *Backend, message *AsyncMessage) error
}

func NewAsyncEvents(url string) (AsyncEvents, error) {
client, err := NewNatsClient(url)
func NewAsyncEvents(log *zap.Logger, url string) (AsyncEvents, error) {
client, err := NewNatsClient(log, url)
if err != nil {
return nil, err
}

return NewAsyncEventsNats(client)
return NewAsyncEventsNats(log, client)
}

type asyncBackendRoomSubscriber struct {
Expand Down
60 changes: 40 additions & 20 deletions async_events_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
package signaling

import (
"log"
"sync"
"time"

"github.com/nats-io/nats.go"
"go.uber.org/zap"
)

func GetSubjectForBackendRoomId(roomId string, backend *Backend) string {
Expand Down Expand Up @@ -58,6 +58,7 @@ func GetSubjectForSessionId(sessionId string, backend *Backend) string {
}

type asyncSubscriberNats struct {
log *zap.Logger
key string
client NatsClient

Expand All @@ -68,14 +69,17 @@ type asyncSubscriberNats struct {
processMessage func(*nats.Msg)
}

func newAsyncSubscriberNats(key string, client NatsClient) (*asyncSubscriberNats, error) {
func newAsyncSubscriberNats(log *zap.Logger, key string, client NatsClient) (*asyncSubscriberNats, error) {
receiver := make(chan *nats.Msg, 64)
sub, err := client.Subscribe(key, receiver)
if err != nil {
return nil, err
}

result := &asyncSubscriberNats{
log: log.With(
zap.String("key", key),
),
key: key,
client: client,

Expand All @@ -89,7 +93,9 @@ func newAsyncSubscriberNats(key string, client NatsClient) (*asyncSubscriberNats
func (s *asyncSubscriberNats) run() {
defer func() {
if err := s.subscription.Unsubscribe(); err != nil {
log.Printf("Error unsubscribing %s: %s", s.key, err)
s.log.Error("Error unsubscribing",
zap.Error(err),
)
}
}()

Expand All @@ -115,8 +121,8 @@ type asyncBackendRoomSubscriberNats struct {
asyncBackendRoomSubscriber
}

func newAsyncBackendRoomSubscriberNats(key string, client NatsClient) (*asyncBackendRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
func newAsyncBackendRoomSubscriberNats(log *zap.Logger, key string, client NatsClient) (*asyncBackendRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(log, key, client)
if err != nil {
return nil, err
}
Expand All @@ -132,7 +138,10 @@ func newAsyncBackendRoomSubscriberNats(key string, client NatsClient) (*asyncBac
func (s *asyncBackendRoomSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode NATS message %+v, %s", msg, err)
s.log.Error("Could not decode NATS message",
zap.Any("message", msg),
zap.Error(err),
)
return
}

Expand All @@ -144,8 +153,8 @@ type asyncRoomSubscriberNats struct {
*asyncSubscriberNats
}

func newAsyncRoomSubscriberNats(key string, client NatsClient) (*asyncRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
func newAsyncRoomSubscriberNats(log *zap.Logger, key string, client NatsClient) (*asyncRoomSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(log, key, client)
if err != nil {
return nil, err
}
Expand All @@ -161,7 +170,10 @@ func newAsyncRoomSubscriberNats(key string, client NatsClient) (*asyncRoomSubscr
func (s *asyncRoomSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode nats message %+v, %s", msg, err)
s.log.Error("Could not decode NATS message",
zap.Any("message", msg),
zap.Error(err),
)
return
}

Expand All @@ -173,8 +185,8 @@ type asyncUserSubscriberNats struct {
asyncUserSubscriber
}

func newAsyncUserSubscriberNats(key string, client NatsClient) (*asyncUserSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
func newAsyncUserSubscriberNats(log *zap.Logger, key string, client NatsClient) (*asyncUserSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(log, key, client)
if err != nil {
return nil, err
}
Expand All @@ -190,7 +202,10 @@ func newAsyncUserSubscriberNats(key string, client NatsClient) (*asyncUserSubscr
func (s *asyncUserSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode nats message %+v, %s", msg, err)
s.log.Error("Could not decode NATS message",
zap.Any("message", msg),
zap.Error(err),
)
return
}

Expand All @@ -202,8 +217,8 @@ type asyncSessionSubscriberNats struct {
asyncSessionSubscriber
}

func newAsyncSessionSubscriberNats(key string, client NatsClient) (*asyncSessionSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(key, client)
func newAsyncSessionSubscriberNats(log *zap.Logger, key string, client NatsClient) (*asyncSessionSubscriberNats, error) {
sub, err := newAsyncSubscriberNats(log, key, client)
if err != nil {
return nil, err
}
Expand All @@ -219,14 +234,18 @@ func newAsyncSessionSubscriberNats(key string, client NatsClient) (*asyncSession
func (s *asyncSessionSubscriberNats) doProcessMessage(msg *nats.Msg) {
var message AsyncMessage
if err := s.client.Decode(msg, &message); err != nil {
log.Printf("Could not decode nats message %+v, %s", msg, err)
s.log.Error("Could not decode NATS message",
zap.Any("message", msg),
zap.Error(err),
)
return
}

s.processAsyncSessionMessage(&message)
}

type asyncEventsNats struct {
log *zap.Logger
mu sync.Mutex
client NatsClient

Expand All @@ -236,8 +255,9 @@ type asyncEventsNats struct {
sessionSubscriptions map[string]*asyncSessionSubscriberNats
}

func NewAsyncEventsNats(client NatsClient) (AsyncEvents, error) {
func NewAsyncEventsNats(log *zap.Logger, client NatsClient) (AsyncEvents, error) {
events := &asyncEventsNats{
log: log,
client: client,

backendRoomSubscriptions: make(map[string]*asyncBackendRoomSubscriberNats),
Expand Down Expand Up @@ -298,7 +318,7 @@ func (e *asyncEventsNats) RegisterBackendRoomListener(roomId string, backend *Ba
sub, found := e.backendRoomSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncBackendRoomSubscriberNats(key, e.client); err != nil {
if sub, err = newAsyncBackendRoomSubscriberNats(e.log, key, e.client); err != nil {
return err
}

Expand Down Expand Up @@ -332,7 +352,7 @@ func (e *asyncEventsNats) RegisterRoomListener(roomId string, backend *Backend,
sub, found := e.roomSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncRoomSubscriberNats(key, e.client); err != nil {
if sub, err = newAsyncRoomSubscriberNats(e.log, key, e.client); err != nil {
return err
}

Expand Down Expand Up @@ -366,7 +386,7 @@ func (e *asyncEventsNats) RegisterUserListener(roomId string, backend *Backend,
sub, found := e.userSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncUserSubscriberNats(key, e.client); err != nil {
if sub, err = newAsyncUserSubscriberNats(e.log, key, e.client); err != nil {
return err
}

Expand Down Expand Up @@ -400,7 +420,7 @@ func (e *asyncEventsNats) RegisterSessionListener(sessionId string, backend *Bac
sub, found := e.sessionSubscriptions[key]
if !found {
var err error
if sub, err = newAsyncSessionSubscriberNats(key, e.client); err != nil {
if sub, err = newAsyncSessionSubscriberNats(e.log, key, e.client); err != nil {
return err
}

Expand Down
6 changes: 4 additions & 2 deletions async_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,17 @@ func getAsyncEventsForTest(t *testing.T) AsyncEvents {

func getRealAsyncEventsForTest(t *testing.T) AsyncEvents {
url := startLocalNatsServer(t)
events, err := NewAsyncEvents(url)
log := GetLoggerForTest(t)
events, err := NewAsyncEvents(log, url)
if err != nil {
require.NoError(t, err)
}
return events
}

func getLoopbackAsyncEventsForTest(t *testing.T) AsyncEvents {
events, err := NewAsyncEvents(NatsLoopbackUrl)
log := GetLoggerForTest(t)
events, err := NewAsyncEvents(log, NatsLoopbackUrl)
if err != nil {
require.NoError(t, err)
}
Expand Down
Loading

0 comments on commit 8037124

Please sign in to comment.