From 1b285a1f897ff6f471230454af9441f7cdd2e3b5 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Sun, 15 Sep 2024 19:22:58 +0200 Subject: [PATCH] client: Switch to using go.uber.org/zap for logging. --- client/main.go | 154 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 109 insertions(+), 45 deletions(-) diff --git a/client/main.go b/client/main.go index 674fc9e9..c537bf7d 100644 --- a/client/main.go +++ b/client/main.go @@ -28,7 +28,6 @@ import ( "flag" "fmt" "io" - "log" pseudorand "math/rand" "net" "net/http" @@ -46,6 +45,7 @@ import ( "github.com/gorilla/securecookie" "github.com/gorilla/websocket" "github.com/mailru/easyjson" + "go.uber.org/zap" signaling "github.com/strukturag/nextcloud-spreed-signaling" ) @@ -81,6 +81,8 @@ const ( ) type Stats struct { + log *zap.Logger + numRecvMessages atomic.Uint64 numSentMessages atomic.Uint64 resetRecvMessages uint64 @@ -107,10 +109,13 @@ func (s *Stats) Log() { sentMessages := totalSentMessages - s.resetSentMessages totalRecvMessages := s.numRecvMessages.Load() recvMessages := totalRecvMessages - s.resetRecvMessages - log.Printf("Stats: sent=%d (%d/sec), recv=%d (%d/sec), delta=%d", - totalSentMessages, sentMessages/perSec, - totalRecvMessages, recvMessages/perSec, - totalSentMessages-totalRecvMessages) + s.log.Info("Stats updated", + zap.Uint64("sent", totalSentMessages), + zap.Uint64("sentspeed", sentMessages/perSec), + zap.Uint64("recv", totalRecvMessages), + zap.Uint64("recvspeed", recvMessages/perSec), + zap.Uint64("delta", totalSentMessages-totalRecvMessages), + ) s.reset(now) } @@ -119,6 +124,7 @@ type MessagePayload struct { } type SignalingClient struct { + log *zap.Logger readyWg *sync.WaitGroup cookie *securecookie.SecureCookie @@ -135,13 +141,14 @@ type SignalingClient struct { userId string } -func NewSignalingClient(cookie *securecookie.SecureCookie, url string, stats *Stats, readyWg *sync.WaitGroup, doneWg *sync.WaitGroup) (*SignalingClient, error) { +func NewSignalingClient(log *zap.Logger, cookie *securecookie.SecureCookie, url string, stats *Stats, readyWg *sync.WaitGroup, doneWg *sync.WaitGroup) (*SignalingClient, error) { conn, _, err := websocket.DefaultDialer.Dial(url, nil) if err != nil { return nil, err } client := &SignalingClient{ + log: log, readyWg: readyWg, cookie: cookie, @@ -204,13 +211,19 @@ func (c *SignalingClient) processMessage(message *signaling.ServerMessage) { case "message": c.processMessageMessage(message) case "bye": - log.Printf("Received bye: %+v", message.Bye) + c.log.Error("Received bye", + zap.Any("bye", message.Bye), + ) c.Close() case "error": - log.Printf("Received error: %+v", message.Error) + c.log.Error("Received error", + zap.Any("error", message.Error), + ) c.Close() default: - log.Printf("Unsupported message type: %+v", *message) + c.log.Warn("Unsupported message type", + zap.Stringer("message", message), + ) } } @@ -236,7 +249,10 @@ func (c *SignalingClient) processHelloMessage(message *signaling.ServerMessage) c.privateSessionId = message.Hello.ResumeId c.publicSessionId = c.privateToPublicSessionId(c.privateSessionId) c.userId = message.Hello.UserId - log.Printf("Registered as %s (userid %s)", c.privateSessionId, c.userId) + c.log.Info("Registered", + zap.String("privateid", c.privateSessionId), + zap.String("userid", c.userId), + ) c.readyWg.Done() } @@ -249,14 +265,18 @@ func (c *SignalingClient) PublicSessionId() string { func (c *SignalingClient) processMessageMessage(message *signaling.ServerMessage) { var msg MessagePayload if err := json.Unmarshal(message.Message.Data, &msg); err != nil { - log.Println("Error in unmarshal", err) + c.log.Error("Error in unmarshal", + zap.Error(err), + ) return } now := time.Now() duration := now.Sub(msg.Now) if duration > messageReportDuration { - log.Printf("Message took %s", duration) + c.log.Warn("Message took too long", + zap.Duration("duration", duration), + ) } } @@ -283,13 +303,17 @@ func (c *SignalingClient) readPump() { websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) { - log.Printf("Error: %v", err) + c.log.Error("Error reading", + zap.Error(err), + ) } break } if messageType != websocket.TextMessage { - log.Println("Unsupported message type", messageType) + c.log.Error("Unsupported message type", + zap.Int("type", messageType), + ) break } @@ -297,7 +321,9 @@ func (c *SignalingClient) readPump() { if _, err := decodeBuffer.ReadFrom(reader); err != nil { c.lock.Lock() if c.conn != nil { - log.Println("Error reading message", err) + c.log.Error("Error reading message", + zap.Error(err), + ) } c.lock.Unlock() break @@ -305,7 +331,9 @@ func (c *SignalingClient) readPump() { var message signaling.ServerMessage if err := message.UnmarshalJSON(decodeBuffer.Bytes()); err != nil { - log.Printf("Error: %v", err) + c.log.Error("Error unmarshalling", + zap.Error(err), + ) break } @@ -327,7 +355,10 @@ func (c *SignalingClient) writeInternal(message *signaling.ClientMessage) bool { return false } - log.Println("Could not send message", message, err) + c.log.Error("Could not send message", + zap.Stringer("message", message), + zap.Error(err), + ) // TODO(jojo): Differentiate between JSON encode errors and websocket errors. closeData = websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "") goto close @@ -413,29 +444,33 @@ func (c *SignalingClient) SendMessages(clients []*SignalingClient) { } } -func registerAuthHandler(router *mux.Router) { +func registerAuthHandler(log *zap.Logger, router *mux.Router) { router.HandleFunc("/auth", func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { - log.Println("Error reading body:", err) + log.Error("Error reading body", + zap.Error(err), + ) return } rnd := r.Header.Get(signaling.HeaderBackendSignalingRandom) checksum := r.Header.Get(signaling.HeaderBackendSignalingChecksum) if rnd == "" || checksum == "" { - log.Println("No checksum headers found") + log.Error("No checksum headers found") return } if verify := signaling.CalculateBackendChecksum(rnd, body, backendSecret); verify != checksum { - log.Println("Backend checksum verification failed") + log.Error("Backend checksum verification failed") return } var request signaling.BackendClientRequest if err := request.UnmarshalJSON(body); err != nil { - log.Println(err) + log.Error("Error unmarshalling", + zap.Error(err), + ) return } @@ -449,7 +484,9 @@ func registerAuthHandler(router *mux.Router) { data, err := response.MarshalJSON() if err != nil { - log.Println(err) + log.Error("Error marshalling response message", + zap.Error(err), + ) return } @@ -467,7 +504,9 @@ func registerAuthHandler(router *mux.Router) { jsonpayload, err := payload.MarshalJSON() if err != nil { - log.Println(err) + log.Error("Error marshalling payload", + zap.Error(err), + ) return } @@ -477,10 +516,12 @@ func registerAuthHandler(router *mux.Router) { }) } -func getLocalIP() string { +func getLocalIP(log *zap.Logger) string { interfaces, err := net.InterfaceAddrs() if err != nil { - log.Fatal(err) + log.Fatal("Error getting interfaces", + zap.Error(err), + ) } for _, intf := range interfaces { switch t := intf.(type) { @@ -508,11 +549,14 @@ func reverseSessionId(s string) (string, error) { func main() { flag.Parse() - log.SetFlags(0) + + log := zap.Must(zap.NewDevelopment()) config, err := goconf.ReadConfigFile(*config) if err != nil { - log.Fatal("Could not read configuration: ", err) + log.Fatal("Could not read configuration", + zap.Error(err), + ) } secret, _ := config.GetString("backend", "secret") @@ -523,7 +567,9 @@ func main() { case 32: case 64: default: - log.Printf("WARNING: The sessions hash key should be 32 or 64 bytes but is %d bytes", len(hashKey)) + log.Warn("The sessions hash key should be 32 or 64 bytes", + zap.Int("len", len(hashKey)), + ) } blockKey, _ := config.GetString("sessions", "blockkey") @@ -535,24 +581,30 @@ func main() { case 24: case 32: default: - log.Fatalf("The sessions block key must be 16, 24 or 32 bytes but is %d bytes", len(blockKey)) + log.Fatal("The sessions block key must be 16, 24 or 32 bytes", + zap.Int("len", len(blockKey)), + ) } cookie := securecookie.New([]byte(hashKey), blockBytes).MaxAge(0) cpus := runtime.NumCPU() runtime.GOMAXPROCS(cpus) - log.Printf("Using a maximum of %d CPUs", cpus) + log.Debug("Using number of CPUs", + zap.Int("cpus", cpus), + ) interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) r := mux.NewRouter() - registerAuthHandler(r) + registerAuthHandler(log, r) - localIP := getLocalIP() + localIP := getLocalIP(log) listener, err := net.Listen("tcp", localIP+":0") if err != nil { - log.Fatal(err) + log.Fatal("Error starting listener", + zap.Error(err), + ) } server := http.Server{ @@ -562,7 +614,9 @@ func main() { server.Serve(listener) // nolint }() backendUrl := "http://" + listener.Addr().String() - log.Println("Backend server running on", backendUrl) + log.Info("Backend server running", + zap.String("url", backendUrl), + ) urls := make([]url.URL, 0) urlstrings := make([]string, 0) @@ -575,24 +629,34 @@ func main() { urls = append(urls, u) urlstrings = append(urlstrings, u.String()) } - log.Printf("Connecting to %s", urlstrings) + log.Info("Connecting", + zap.Strings("urls", urlstrings), + ) clients := make([]*SignalingClient, 0) - stats := &Stats{} + stats := &Stats{ + log: log, + } if *maxClients < 2 { - log.Fatalf("Need at least 2 clients, got %d", *maxClients) + log.Fatal("Need at least 2 clients", + zap.Int("count", *maxClients), + ) } - log.Printf("Starting %d clients", *maxClients) + log.Info("Starting clients", + zap.Int("count", *maxClients), + ) var doneWg sync.WaitGroup var readyWg sync.WaitGroup for i := 0; i < *maxClients; i++ { - client, err := NewSignalingClient(cookie, urls[i%len(urls)].String(), stats, &readyWg, &doneWg) + client, err := NewSignalingClient(log, cookie, urls[i%len(urls)].String(), stats, &readyWg, &doneWg) if err != nil { - log.Fatal(err) + log.Fatal("Error creating signaling client", + zap.Error(err), + ) } defer client.Close() readyWg.Add(1) @@ -612,10 +676,10 @@ func main() { clients = append(clients, client) } - log.Println("Clients created") + log.Info("Clients created") readyWg.Wait() - log.Println("All connections established") + log.Info("All connections established") for _, c := range clients { doneWg.Add(1) @@ -632,14 +696,14 @@ loop: for { select { case <-interrupt: - log.Println("Interrupted") + log.Info("Interrupted") break loop case <-report.C: stats.Log() } } - log.Println("Waiting for clients to terminate ...") + log.Info("Waiting for clients to terminate ...") for _, c := range clients { c.Close() }