Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tweaks to compare-streams debugger #831

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 110 additions & 44 deletions cmd/gosky/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var debugCmd = &cli.Command{
compareStreamsCmd,
debugGetRepoCmd,
debugCompareReposCmd,
compareStreams2Cmd,
},
}

Expand Down Expand Up @@ -293,6 +294,98 @@ var debugStreamCmd = &cli.Command{
},
}

var compareStreams2Cmd = &cli.Command{
Name: "compare-streams2",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "host1",
Required: true,
},
&cli.StringFlag{
Name: "host2",
Required: true,
},
},
ArgsUsage: `<cursor>`,
Action: func(cctx *cli.Context) error {
h1 := cctx.String("host1")
h2 := cctx.String("host2")

url1 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h1)
url2 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h2)

d := websocket.DefaultDialer

var buflk sync.Mutex
buffers := []map[string]string{
make(map[string]string),
make(map[string]string),
}

// Create two goroutines for reading events from two URLs
for i, url := range []string{url1, url2} {
go func(i int, url string) {

oi := (i + 1) % 2
con, _, err := d.Dial(url, http.Header{})
if err != nil {
log.Fatalf("Dial failure on url%d: %s", i+1, err)
}

ctx := context.TODO()
rsc := &events.RepoStreamCallbacks{
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
buflk.Lock()
if buffers[oi][evt.Repo] == evt.Rev {
delete(buffers[oi], evt.Repo)
delete(buffers[i], evt.Repo)
} else {
buffers[i][evt.Repo] = evt.Rev
}
buflk.Unlock()
return nil
},
// TODO: all the other Repo* event types
Error: func(evt *events.ErrorFrame) error {
return fmt.Errorf("%s: %s", evt.Error, evt.Message)
},
}
seqScheduler := sequential.NewScheduler(fmt.Sprintf("debug-stream-%d", i+1), rsc.EventHandler)
if err := events.HandleRepoStream(ctx, con, seqScheduler); err != nil {
log.Fatalf("HandleRepoStream failure on url%d: %s", i+1, err)
}
}(i, url)
}

ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)

// Compare events from the two URLs
for {
select {
case <-time.Tick(time.Second / 2):
buflk.Lock()
a := len(buffers[0])
b := len(buffers[1])
buflk.Unlock()

fmt.Println(a, b)
case <-ch:
//printDetailedDelta()
/*
b, err := json.Marshal(buffers)
if err != nil {
return err
}

fmt.Println(string(b))
*/
return nil
}
}
},
}

var compareStreamsCmd = &cli.Command{
Name: "compare-streams",
Flags: []cli.Flag{
Expand Down Expand Up @@ -320,59 +413,32 @@ var compareStreamsCmd = &cli.Command{
make(chan *comatproto.SyncSubscribeRepos_Commit, 2),
}

buffers := []map[string][]*comatproto.SyncSubscribeRepos_Commit{
make(map[string][]*comatproto.SyncSubscribeRepos_Commit),
make(map[string][]*comatproto.SyncSubscribeRepos_Commit),
buffers := []map[string]string{
make(map[string]string),
make(map[string]string),
}

addToBuffer := func(n int, event *comatproto.SyncSubscribeRepos_Commit) {
buffers[n][event.Repo] = append(buffers[n][event.Repo], event)
}

pll := func(ll *lexutil.LexLink) string {
if ll == nil {
return "<nil>"
}
return ll.String()
buffers[n][event.Repo] = event.Rev
}

findMatchAndRemove := func(n int, event *comatproto.SyncSubscribeRepos_Commit) (*comatproto.SyncSubscribeRepos_Commit, error) {
findMatchAndRemove := func(n int, event *comatproto.SyncSubscribeRepos_Commit) (int, error) {
buf := buffers[n]
slice, ok := buf[event.Repo]
if !ok || len(slice) == 0 {
return nil, nil
lastRevSeen, ok := buf[event.Repo]
if !ok || lastRevSeen == "" {
return 0, nil
}

for i, ev := range slice {
if ev.Commit == event.Commit {
if pll(ev.Prev) != pll(event.Prev) {
// same commit different prev??
return nil, fmt.Errorf("matched event with same commit but different prev: (%d) %d - %d", n, ev.Seq, event.Seq)
}
}

if i != 0 {
fmt.Printf("detected skipped event: %d (%d)\n", slice[0].Seq, i)
}

slice = slice[i+1:]
buf[event.Repo] = slice
return ev, nil
if event.Rev >= lastRevSeen {
delete(buf, event.Repo)
return 1, nil
}

return nil, fmt.Errorf("did not find matching event despite having events in buffer")
return 0, nil
}

printCurrentDelta := func() {
var a, b int
for _, sl := range buffers[0] {
a += len(sl)
}
for _, sl := range buffers[1] {
b += len(sl)
}

fmt.Printf("%d %d\n", a, b)
fmt.Printf("%d %d\n", len(buffers[1]), len(buffers[0]))
}

printDetailedDelta := func() {
Expand Down Expand Up @@ -418,25 +484,25 @@ var compareStreamsCmd = &cli.Command{
for {
select {
case event := <-eventChans[0]:
partner, err := findMatchAndRemove(1, event)
found, err := findMatchAndRemove(1, event)
if err != nil {
fmt.Println("checking for match failed: ", err)
continue
}
if partner == nil {
if found == 0 {
addToBuffer(0, event)
} else {
// the good case
fmt.Println("Match found")
}

case event := <-eventChans[1]:
partner, err := findMatchAndRemove(0, event)
found, err := findMatchAndRemove(0, event)
if err != nil {
fmt.Println("checking for match failed: ", err)
continue
}
if partner == nil {
if found == 0 {
addToBuffer(1, event)
} else {
// the good case
Expand Down
Loading