From f229dc010f009d9fd307d34d4aa09b4a58e80bb3 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 19 Nov 2024 17:58:33 -0800 Subject: [PATCH 1/2] tweaks to compare-streams debugger --- cmd/gosky/debug.go | 61 +++++++++++++--------------------------------- 1 file changed, 17 insertions(+), 44 deletions(-) diff --git a/cmd/gosky/debug.go b/cmd/gosky/debug.go index 75d231cfc..c29ca404d 100644 --- a/cmd/gosky/debug.go +++ b/cmd/gosky/debug.go @@ -320,59 +320,32 @@ var compareStreamsCmd = &cli.Command{ make(chan *comatproto.SyncSubscribeRepos_Commit, 2), } - buffers := []map[string][]*comatproto.SyncSubscribeRepos_Commit{ - make(map[string][]*comatproto.SyncSubscribeRepos_Commit), - make(map[string][]*comatproto.SyncSubscribeRepos_Commit), + buffers := []map[string]string{ + make(map[string]string), + make(map[string]string), } addToBuffer := func(n int, event *comatproto.SyncSubscribeRepos_Commit) { - buffers[n][event.Repo] = append(buffers[n][event.Repo], event) + buffers[n][event.Repo] = event.Rev } - pll := func(ll *lexutil.LexLink) string { - if ll == nil { - return "" - } - return ll.String() - } - - findMatchAndRemove := func(n int, event *comatproto.SyncSubscribeRepos_Commit) (*comatproto.SyncSubscribeRepos_Commit, error) { + findMatchAndRemove := func(n int, event *comatproto.SyncSubscribeRepos_Commit) (int, error) { buf := buffers[n] - slice, ok := buf[event.Repo] - if !ok || len(slice) == 0 { - return nil, nil + lastRevSeen, ok := buf[event.Repo] + if !ok || lastRevSeen == "" { + return 0, nil } - for i, ev := range slice { - if ev.Commit == event.Commit { - if pll(ev.Prev) != pll(event.Prev) { - // same commit different prev?? - return nil, fmt.Errorf("matched event with same commit but different prev: (%d) %d - %d", n, ev.Seq, event.Seq) - } - } - - if i != 0 { - fmt.Printf("detected skipped event: %d (%d)\n", slice[0].Seq, i) - } - - slice = slice[i+1:] - buf[event.Repo] = slice - return ev, nil + if event.Rev >= lastRevSeen { + delete(buf, event.Repo) + return 1, nil } - return nil, fmt.Errorf("did not find matching event despite having events in buffer") + return 0, nil } printCurrentDelta := func() { - var a, b int - for _, sl := range buffers[0] { - a += len(sl) - } - for _, sl := range buffers[1] { - b += len(sl) - } - - fmt.Printf("%d %d\n", a, b) + fmt.Printf("%d %d\n", len(buffers[1]), len(buffers[0])) } printDetailedDelta := func() { @@ -418,12 +391,12 @@ var compareStreamsCmd = &cli.Command{ for { select { case event := <-eventChans[0]: - partner, err := findMatchAndRemove(1, event) + found, err := findMatchAndRemove(1, event) if err != nil { fmt.Println("checking for match failed: ", err) continue } - if partner == nil { + if found == 0 { addToBuffer(0, event) } else { // the good case @@ -431,12 +404,12 @@ var compareStreamsCmd = &cli.Command{ } case event := <-eventChans[1]: - partner, err := findMatchAndRemove(0, event) + found, err := findMatchAndRemove(0, event) if err != nil { fmt.Println("checking for match failed: ", err) continue } - if partner == nil { + if found == 0 { addToBuffer(1, event) } else { // the good case From 0976265fae45ea1408c727f25cf8bf09b6c9e560 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 20 Nov 2024 09:19:33 -0800 Subject: [PATCH 2/2] maybe better compare streams command --- cmd/gosky/debug.go | 93 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/cmd/gosky/debug.go b/cmd/gosky/debug.go index c29ca404d..b4dd1f597 100644 --- a/cmd/gosky/debug.go +++ b/cmd/gosky/debug.go @@ -48,6 +48,7 @@ var debugCmd = &cli.Command{ compareStreamsCmd, debugGetRepoCmd, debugCompareReposCmd, + compareStreams2Cmd, }, } @@ -293,6 +294,98 @@ var debugStreamCmd = &cli.Command{ }, } +var compareStreams2Cmd = &cli.Command{ + Name: "compare-streams2", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "host1", + Required: true, + }, + &cli.StringFlag{ + Name: "host2", + Required: true, + }, + }, + ArgsUsage: ``, + Action: func(cctx *cli.Context) error { + h1 := cctx.String("host1") + h2 := cctx.String("host2") + + url1 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h1) + url2 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h2) + + d := websocket.DefaultDialer + + var buflk sync.Mutex + buffers := []map[string]string{ + make(map[string]string), + make(map[string]string), + } + + // Create two goroutines for reading events from two URLs + for i, url := range []string{url1, url2} { + go func(i int, url string) { + + oi := (i + 1) % 2 + con, _, err := d.Dial(url, http.Header{}) + if err != nil { + log.Fatalf("Dial failure on url%d: %s", i+1, err) + } + + ctx := context.TODO() + rsc := &events.RepoStreamCallbacks{ + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { + buflk.Lock() + if buffers[oi][evt.Repo] == evt.Rev { + delete(buffers[oi], evt.Repo) + delete(buffers[i], evt.Repo) + } else { + buffers[i][evt.Repo] = evt.Rev + } + buflk.Unlock() + return nil + }, + // TODO: all the other Repo* event types + Error: func(evt *events.ErrorFrame) error { + return fmt.Errorf("%s: %s", evt.Error, evt.Message) + }, + } + seqScheduler := sequential.NewScheduler(fmt.Sprintf("debug-stream-%d", i+1), rsc.EventHandler) + if err := events.HandleRepoStream(ctx, con, seqScheduler); err != nil { + log.Fatalf("HandleRepoStream failure on url%d: %s", i+1, err) + } + }(i, url) + } + + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) + + // Compare events from the two URLs + for { + select { + case <-time.Tick(time.Second / 2): + buflk.Lock() + a := len(buffers[0]) + b := len(buffers[1]) + buflk.Unlock() + + fmt.Println(a, b) + case <-ch: + //printDetailedDelta() + /* + b, err := json.Marshal(buffers) + if err != nil { + return err + } + + fmt.Println(string(b)) + */ + return nil + } + } + }, +} + var compareStreamsCmd = &cli.Command{ Name: "compare-streams", Flags: []cli.Flag{