Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ws reconnection bug + clean up logging #1058

Merged
merged 2 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion emain/docsite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ export async function initDocsite() {
console.log("Embedded docsite is running, using embedded version for help view");
docsiteUrl = docsiteEmbeddedUrl;
} else {
console.log("Embedded docsite is not running, using web version for help view", response);
console.log(
"Embedded docsite is not running, using web version for help view",
"status: " + response?.status
);
docsiteUrl = docsiteWebUrl;
}
} catch (error) {
Expand Down
2 changes: 1 addition & 1 deletion emain/emain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ async function relaunchBrowserWindows(): Promise<void> {
}
for (const win of wins) {
await win.readyPromise;
console.log("show", win.waveWindowId);
console.log("show window", win.waveWindowId);
win.show();
}
}
Expand Down
32 changes: 21 additions & 11 deletions frontend/app/store/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const dlog = debug("wave:ws");
const WarnWebSocketSendSize = 1024 * 1024; // 1MB
const MaxWebSocketSendSize = 5 * 1024 * 1024; // 5MB
const reconnectHandlers: (() => void)[] = [];
const StableConnTime = 2000;

function addWSReconnectHandler(handler: () => void) {
reconnectHandlers.push(handler);
Expand Down Expand Up @@ -45,6 +46,7 @@ class WSControl {
lastReconnectTime: number = 0;
eoOpts: ElectronOverrideOpts;
noReconnect: boolean = false;
onOpenTimeoutId: NodeJS.Timeout = null;

constructor(
baseHostPort: string,
Expand Down Expand Up @@ -80,9 +82,15 @@ class WSControl {
}
: null
);
this.wsConn.onopen = this.onopen.bind(this);
this.wsConn.onmessage = this.onmessage.bind(this);
this.wsConn.onclose = this.onclose.bind(this);
this.wsConn.onopen = (e: Event) => {
this.onopen(e);
};
this.wsConn.onmessage = (e: MessageEvent) => {
this.onmessage(e);
};
this.wsConn.onclose = (e: CloseEvent) => {
this.onclose(e);
};
// turns out onerror is not necessary (onclose always follows onerror)
// this.wsConn.onerror = this.onerror;
}
Expand Down Expand Up @@ -118,8 +126,11 @@ class WSControl {
}, timeout * 1000);
}

onclose(event: any) {
onclose(event: CloseEvent) {
// console.log("close", event);
if (this.onOpenTimeoutId) {
clearTimeout(this.onOpenTimeoutId);
}
if (event.wasClean) {
dlog("connection closed");
} else {
Expand All @@ -132,15 +143,18 @@ class WSControl {
}
}

onopen() {
onopen(e: Event) {
dlog("connection open");
this.open = true;
this.opening = false;
this.onOpenTimeoutId = setTimeout(() => {
this.reconnectTimes = 0;
dlog("clear reconnect times");
}, StableConnTime);
for (let handler of reconnectHandlers) {
handler();
}
this.runMsgQueue();
// reconnectTimes is reset in onmessage:hello
}

runMsgQueue() {
Expand All @@ -157,7 +171,7 @@ class WSControl {
}, 100);
}

onmessage(event: any) {
onmessage(event: MessageEvent) {
let eventData = null;
if (event.data != null) {
eventData = JSON.parse(event.data);
Expand All @@ -173,10 +187,6 @@ class WSControl {
// nothing
return;
}
if (eventData.type == "hello") {
this.reconnectTimes = 0;
return;
}
if (this.messageCallback) {
try {
this.messageCallback(eventData);
Expand Down
1 change: 0 additions & 1 deletion frontend/app/view/webview/webview.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ export class WebViewModel implements ViewModel {
this.homepageUrl = atom((get) => {
const defaultUrl = get(defaultUrlAtom);
const pinnedUrl = get(this.blockAtom).meta.pinnedurl;
console.log("homepageUrl", pinnedUrl, defaultUrl);
return pinnedUrl ?? defaultUrl;
});
this.urlWrapperClassName = atom("");
Expand Down
1 change: 0 additions & 1 deletion pkg/blockcontroller/blockcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj
bc.ShellProc.Cmd.Write(ic.InputData)
}
if ic.TermSize != nil {
log.Printf("SETTERMSIZE: %dx%d\n", ic.TermSize.Rows, ic.TermSize.Cols)
err = setTermSize(ctx, bc.BlockId, *ic.TermSize)
if err != nil {
log.Printf("error setting pty size: %v\n", err)
Expand Down
31 changes: 16 additions & 15 deletions pkg/web/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func RunWebSocketServer(listener net.Listener) {
Handler: gr,
}
server.SetKeepAlivesEnabled(false)
log.Printf("Running websocket server on %s\n", listener.Addr())
log.Printf("[websocket] running websocket server on %s\n", listener.Addr())
err := server.Serve(listener)
if err != nil {
log.Printf("[error] trying to run websocket server: %v\n", err)
log.Printf("[websocket] error trying to run websocket server: %v\n", err)
}
}

Expand Down Expand Up @@ -81,7 +81,7 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan []
r := recover()
if r != nil {
rtnErr = fmt.Errorf("panic: %v", r)
log.Printf("panic in processMessage: %v\n", r)
log.Printf("[websocket] panic in processMessage: %v\n", r)
debug.PrintStack()
}
if rtnErr == nil {
Expand All @@ -108,7 +108,7 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan []
msgBytes, err := json.Marshal(rpcMsg)
if err != nil {
// this really should never fail since we just unmarshalled this value
log.Printf("error marshalling rpc message: %v\n", err)
log.Printf("[websocket] error marshalling rpc message: %v\n", err)
return
}
rpcInputCh <- msgBytes
Expand All @@ -125,7 +125,7 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan []
msgBytes, err := json.Marshal(rpcMsg)
if err != nil {
// this really should never fail since we just unmarshalled this value
log.Printf("error marshalling rpc message: %v\n", err)
log.Printf("[websocket] error marshalling rpc message: %v\n", err)
return
}
rpcInputCh <- msgBytes
Expand All @@ -152,21 +152,21 @@ func processMessage(jmsg map[string]any, outputCh chan any, rpcInputCh chan []by
processWSCommand(jmsg, outputCh, rpcInputCh)
}

func ReadLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, rpcInputCh chan []byte) {
func ReadLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, rpcInputCh chan []byte, routeId string) {
readWait := wsReadWaitTimeout
conn.SetReadLimit(64 * 1024)
conn.SetReadDeadline(time.Now().Add(readWait))
defer close(closeCh)
for {
_, message, err := conn.ReadMessage()
if err != nil {
log.Printf("ReadPump error: %v\n", err)
log.Printf("[websocket] ReadPump error (%s): %v\n", routeId, err)
break
}
jmsg := map[string]any{}
err = json.Unmarshal(message, &jmsg)
if err != nil {
log.Printf("Error unmarshalling json: %v\n", err)
log.Printf("[websocket] error unmarshalling json: %v\n", err)
break
}
conn.SetReadDeadline(time.Now().Add(readWait))
Expand Down Expand Up @@ -197,7 +197,7 @@ func WritePing(conn *websocket.Conn) error {
return nil
}

func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any) {
func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, routeId string) {
ticker := time.NewTicker(wsInitialPingTime)
defer ticker.Stop()
initialPing := true
Expand All @@ -211,22 +211,22 @@ func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any) {
} else {
barr, err = json.Marshal(msg)
if err != nil {
log.Printf("cannot marshal websocket message: %v\n", err)
log.Printf("[websocket] cannot marshal websocket message: %v\n", err)
// just loop again
break
}
}
err = conn.WriteMessage(websocket.TextMessage, barr)
if err != nil {
conn.Close()
log.Printf("WritePump error: %v\n", err)
log.Printf("[websocket] WritePump error (%s): %v\n", routeId, err)
return
}

case <-ticker.C:
err := WritePing(conn)
if err != nil {
log.Printf("WritePump error: %v\n", err)
log.Printf("[websocket] WritePump error (%s): %v\n", routeId, err)
return
}
if initialPing {
Expand All @@ -250,6 +250,7 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
if err != nil {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(fmt.Sprintf("error validating authkey: %v", err)))
log.Printf("[websocket] error validating authkey: %v\n", err)
return err
}
conn, err := WebSocketUpgrader.Upgrade(w, r, nil)
Expand All @@ -258,7 +259,6 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
}
defer conn.Close()
wsConnId := uuid.New().String()
log.Printf("New websocket connection: windowid:%s connid:%s\n", windowId, wsConnId)
outputCh := make(chan any, 100)
closeCh := make(chan any)
eventbus.RegisterWSChannel(wsConnId, windowId, outputCh)
Expand All @@ -269,6 +269,7 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
routeId = wshutil.MakeWindowRouteId(windowId)
}
defer eventbus.UnregisterWSChannel(wsConnId)
log.Printf("[websocket] new connection: windowid:%s connid:%s routeid:%s\n", windowId, wsConnId, routeId)
// we create a wshproxy to handle rpc messages to/from the window
wproxy := wshutil.MakeRpcProxy()
wshutil.DefaultRouter.RegisterRoute(routeId, wproxy)
Expand All @@ -293,12 +294,12 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
go func() {
// read loop
defer wg.Done()
ReadLoop(conn, outputCh, closeCh, wproxy.FromRemoteCh)
ReadLoop(conn, outputCh, closeCh, wproxy.FromRemoteCh, routeId)
}()
go func() {
// write loop
defer wg.Done()
WriteLoop(conn, outputCh, closeCh)
WriteLoop(conn, outputCh, closeCh, routeId)
}()
wg.Wait()
close(wproxy.FromRemoteCh)
Expand Down
1 change: 0 additions & 1 deletion pkg/wshrpc/wshserver/wshserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,6 @@ func (ws *WshServer) EventPublishCommand(ctx context.Context, data wps.WaveEvent
}

func (ws *WshServer) EventSubCommand(ctx context.Context, data wps.SubscriptionRequest) error {
log.Printf("EventSubCommand: %v\n", data)
rpcSource := wshutil.GetRpcSourceFromContext(ctx)
if rpcSource == "" {
return fmt.Errorf("no rpc source set")
Expand Down
Loading