Improve watch latency benchmark #17562

Merged
1 commit merged on Mar 24, 2024
122 changes: 82 additions & 40 deletions tools/benchmark/cmd/watch_latency.go
@@ -18,7 +18,6 @@ import (
     "context"
     "fmt"
     "os"
-    "sync"
     "time"
 
     "github.com/cheggaaa/pb/v3"
@@ -40,15 +39,22 @@ var watchLatencyCmd = &cobra.Command{
 }
 
 var (
-    watchLTotal     int
-    watchLPutRate   int
-    watchLKeySize   int
-    watchLValueSize int
+    watchLPutTotal          int
+    watchLPutRate           int
+    watchLKeySize           int
+    watchLValueSize         int
+    watchLStreams           int
+    watchLWatchersPerStream int
+    watchLPrevKV            bool
 )
 
 func init() {
     RootCmd.AddCommand(watchLatencyCmd)
-    watchLatencyCmd.Flags().IntVar(&watchLTotal, "total", 10000, "Total number of put requests")
+    watchLatencyCmd.Flags().IntVar(&watchLStreams, "streams", 10, "Total watch streams")
+    watchLatencyCmd.Flags().IntVar(&watchLWatchersPerStream, "watchers-per-stream", 10, "Total watchers per stream")
+    watchLatencyCmd.Flags().BoolVar(&watchLPrevKV, "prevkv", false, "PrevKV enabled on watch requests")
+
+    watchLatencyCmd.Flags().IntVar(&watchLPutTotal, "put-total", 1000, "Total number of put requests")

Member commented:

Should we be trying to align the defaults across the board at 10,000?

 ➜ grep -Ri "1000" | grep -i intvar
cmd/put.go:     putCmd.Flags().IntVar(&putTotal, "total", 10000, "Total number of put requests")
cmd/stm.go:     stmCmd.Flags().IntVar(&stmTotal, "total", 10000, "Total number of completed STM transactions")
cmd/txn_put.go: txnPutCmd.Flags().IntVar(&txnPutTotal, "total", 10000, "Total number of txn requests")
cmd/watch.go:   watchCmd.Flags().IntVar(&watchPutTotal, "put-total", 1000, "Number of put requests")
cmd/range.go:   rangeCmd.Flags().IntVar(&rangeTotal, "total", 10000, "Total number of range requests")
cmd/watch_get.go:       watchGetCmd.Flags().IntVar(&watchGetTotalWatchers, "watchers", 10000, "Total number of watchers")
cmd/lease.go:   leaseKeepaliveCmd.Flags().IntVar(&leaseKeepaliveTotal, "total", 10000, "Total number of lease keepalive requests")
cmd/watch_latency.go:   watchLatencyCmd.Flags().IntVar(&watchLPutTotal, "put-total", 1000, "Total number of put requests")
cmd/mvcc.go:    mvccCmd.PersistentFlags().IntVar(&batchLimit, "batch-limit", 10000, "A limit of batched transaction")
cmd/txn_mixed.go:       mixedTxnCmd.Flags().IntVar(&mixedTxnTotal, "total", 10000, "Total number of txn requests")

Member Author (serathius) replied:

Benchmark code is not very organized. I planned to make a direct improvement that we require for testing prevKV, and leave cleanups for later.

     watchLatencyCmd.Flags().IntVar(&watchLPutRate, "put-rate", 100, "Number of keys to put per second")
     watchLatencyCmd.Flags().IntVar(&watchLKeySize, "key-size", 32, "Key size of watch response")
     watchLatencyCmd.Flags().IntVar(&watchLValueSize, "val-size", 32, "Value size of watch response")
@@ -57,54 +63,90 @@ func init() {
 func watchLatencyFunc(_ *cobra.Command, _ []string) {
     key := string(mustRandBytes(watchLKeySize))
     value := string(mustRandBytes(watchLValueSize))
-
-    clients := mustCreateClients(totalClients, totalConns)
+    wchs := setupWatchChannels(key)
     putClient := mustCreateConn()
 
-    wchs := make([]clientv3.WatchChan, len(clients))
-    for i := range wchs {
-        wchs[i] = clients[i].Watch(context.TODO(), key)
-    }
-
-    bar = pb.New(watchLTotal)
+    bar = pb.New(watchLPutTotal * len(wchs))
     bar.Start()
 
     limiter := rate.NewLimiter(rate.Limit(watchLPutRate), watchLPutRate)
-    r := newReport()
-    rc := r.Run()
-
-    for i := 0; i < watchLTotal; i++ {
+    putTimes := make([]time.Time, watchLPutTotal)
+    eventTimes := make([][]time.Time, len(wchs))
+
+    for i, wch := range wchs {
+        wch := wch
+        i := i
+        eventTimes[i] = make([]time.Time, watchLPutTotal)
+        wg.Add(1)

Member commented:

Where is the wg defined?

Member Author (serathius) replied:

He he, didn't notice that. It's a global wg shared by all benchmarks. I would prefer to leave it as it is, to at least be consistent with the other benchmarks.
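
For reference, a minimal sketch of what that shared package-level state might look like; the exact file it lives in and the grouping are assumptions, not something shown in this PR:

package cmd

import (
    "sync"

    "github.com/cheggaaa/pb/v3"
)

// Package-level helpers shared by the benchmark subcommands (assumed layout).
var (
    bar *pb.ProgressBar // progress bar reused by the individual benchmarks
    wg  sync.WaitGroup  // tracks goroutines started by a benchmark run
)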

+        go func() {
+            defer wg.Done()
+            eventCount := 0
+            for eventCount < watchLPutTotal {
+                resp := <-wch
+                for range resp.Events {
+                    eventTimes[i][eventCount] = time.Now()
+                    eventCount++
+                    bar.Increment()
+                }
+            }
+        }()
+    }
+
+    putReport := newReport()
+    putReportResults := putReport.Run()
+    watchReport := newReport()
+    watchReportResults := watchReport.Run()
+    for i := 0; i < watchLPutTotal; i++ {
         // limit key put as per reqRate
         if err := limiter.Wait(context.TODO()); err != nil {
             break
         }
 
-        var st time.Time
-        var wg sync.WaitGroup
-        wg.Add(len(clients))
-        barrierc := make(chan struct{})
-        for _, wch := range wchs {
-            ch := wch
-            go func() {
-                <-barrierc
-                <-ch
-                r.Results() <- report.Result{Start: st, End: time.Now()}
-                wg.Done()
-            }()
-        }
-
+        start := time.Now()
         if _, err := putClient.Put(context.TODO(), key, value); err != nil {
             fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err)
             os.Exit(1)
         }
 
-        st = time.Now()
-        close(barrierc)
-        wg.Wait()
-        bar.Increment()
+        end := time.Now()
+        putReport.Results() <- report.Result{Start: start, End: end}
+        putTimes[i] = end
     }
 
-    close(r.Results())
+    wg.Wait()
+    close(putReport.Results())
     bar.Finish()
-    fmt.Printf("%s", <-rc)
+    fmt.Printf("\nPut summary:\n%s", <-putReportResults)
+
+    for i := 0; i < len(wchs); i++ {
+        for j := 0; j < watchLPutTotal; j++ {
+            start := putTimes[j]
+            end := eventTimes[i][j]
+            if end.Before(start) {
+                start = end
+            }
+            watchReport.Results() <- report.Result{Start: start, End: end}

Comment on lines +121 to +126

Member (ahrtr) commented:

The logic is too fragile. The startTime is recorded in the main goroutine when putting the key, while the endTime is recorded in the watch goroutines, and you correlate the startTime with the endTime using an index.

I suggest using WithPrefix when watching, and putting a different key each time, e.g. "foo1", "foo2", etc. Then we can use the exact key to correlate the startTime with the endTime.
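
For illustration only, a rough sketch of that key-based correlation; it is not part of this PR, it reuses identifiers from the diff (putClient, value, watchLPutTotal, watchReport, report), and keyPrefix plus the mutex are assumptions:

// Correlate by exact key instead of by index (sketch, not the PR's code).
keyPrefix := "watch-latency-"               // assumed prefix
putTimesByKey := make(map[string]time.Time) // start time recorded just before each put
var mu sync.Mutex                           // protects putTimesByKey across goroutines

// One watcher over the whole prefix; every distinct key shows up on this channel.
wch := putClient.Watch(context.TODO(), keyPrefix, clientv3.WithPrefix())
go func() {
    for resp := range wch {
        for _, ev := range resp.Events {
            end := time.Now()
            mu.Lock()
            start, ok := putTimesByKey[string(ev.Kv.Key)]
            mu.Unlock()
            if ok {
                watchReport.Results() <- report.Result{Start: start, End: end}
            }
        }
    }
}()

for i := 0; i < watchLPutTotal; i++ {
    k := fmt.Sprintf("%s%d", keyPrefix, i) // "watch-latency-0", "watch-latency-1", ...
    mu.Lock()
    putTimesByKey[k] = time.Now() // record the start before issuing the put
    mu.Unlock()
    if _, err := putClient.Put(context.TODO(), k, value); err != nil {
        continue // a failed put simply never produces a matching event
    }
}

Because the start time is recorded before the put is issued, the end.Before(start) guard used in the diff also becomes unnecessary in this variant.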

Member Author (serathius) replied:

Fragile? Puts are done sequentially, and etcd guarantees that responses are sequential to writes. I'm not sure I would call it fragile, or at least it comes down to etcd's correctness; however, I don't think we should care about this in benchmarks.

Member (@ahrtr, Mar 13, 2024) replied:

The "fragile" isn't about etcd's correctness; it's about the readability, about the code itself. There may be network jitter, and the put may fail. Currently it looks OK because the tool just exits directly when a put fails. I think we shouldn't exit in such a case: we are running a benchmark test, so why do we care about occasional failures due to environment problems (e.g. network jitter)? Some time later, other contributors may even change the implementation to not exit in such a case.

Also, depending on an index (correlating the times by index) reduces readability and increases the review burden. We need to be very careful to ensure there is no off-by-one problem.

Anyway, we should make the implementation more robust and less error-prone by itself.

    if _, err := putClient.Put(context.TODO(), key, value); err != nil {
        fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err)
        os.Exit(1)
    }
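
For illustration, a rough sketch of the more tolerant handling proposed above; it is not part of this PR, and the failedPuts counter is an assumption:

failedPuts := 0 // assumed counter, not in the PR
if _, err := putClient.Put(context.TODO(), key, value); err != nil {
    // Tolerate occasional failures (e.g. network jitter) instead of exiting the tool.
    // Note: with the PR's index-based correlation, skipping an iteration leaves a zero
    // putTimes entry, which is part of why key-based correlation would be safer here.
    failedPuts++
    fmt.Fprintf(os.Stderr, "put #%d failed, skipping: %v\n", i, err)
    continue
}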

Member (ahrtr) replied:

I won't insist on this for now, because it works for now, as mentioned above ("Currently it looks OK because the tool just exits directly when a put fails").

Please let me know if you want to keep it as it is, or if you plan to change it based on what was proposed above.

Member Author (@serathius, Mar 24, 2024) replied:

I don't plan to change it for now. I think it could be improved; however, I'm not as concerned with making the benchmark code super readable, as it is not changed often nor critical for etcd functionality (at least until we start to really depend on automatic performance regression detection by @jmhbnz). I think we could just merge it to make sure the improvements are available for everyone to use.

@ahrtr do you see any blockers?

+        }
+    }
+
+    close(watchReport.Results())
+    fmt.Printf("\nWatch events summary:\n%s", <-watchReportResults)
 }

+func setupWatchChannels(key string) []clientv3.WatchChan {
+    clients := mustCreateClients(totalClients, totalConns)
+
+    streams := make([]clientv3.Watcher, watchLStreams)

Member commented:

It seems that it doesn't really create watchLStreams gRPC streams. Instead it just creates totalConns gRPC streams?

Member Author (@serathius, Mar 11, 2024) replied:

By streams here I mean watch streams, aka the Watch RPC. I based the code on tools/benchmark/cmd/watch.go, where I discovered that you can separate watch requests into different streams by creating a new watcher. This reuses the same gRPC connection, as you pass it the same etcd client, but it creates a separate stream, since streams are stored on the watcher. It seems like a better way to separate streams than the approach I proposed in Kubernetes via gRPC context metadata.

Member replied:

> I discovered that you can separate watch requests into different streams by creating a new watcher.

Yes, it's true.

Each gRPC connection may be reused by multiple gRPC streams, and each gRPC stream can be reused by multiple etcd watchers.

+    for i := range streams {
+        streams[i] = clientv3.NewWatcher(clients[i%len(clients)])
+    }
+    opts := []clientv3.OpOption{}
+    if watchLPrevKV {
+        opts = append(opts, clientv3.WithPrevKV())
+    }
+    wchs := make([]clientv3.WatchChan, len(streams)*watchLWatchersPerStream)
+    for i := 0; i < len(streams); i++ {
+        for j := 0; j < watchLWatchersPerStream; j++ {
+            wchs[i*len(streams)+j] = streams[i].Watch(context.TODO(), key, opts...)
+        }
+    }
+    return wchs
+}
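
To make the hierarchy described above concrete (one gRPC connection carrying several watch streams, and each stream multiplexing several watchers), here is a small standalone sketch; the endpoint, key, and counts are illustrative assumptions, not values from this PR:

package main

import (
    "context"
    "fmt"

    clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
    // One client == one gRPC connection (endpoint is an assumption).
    cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
    if err != nil {
        panic(err)
    }
    defer cli.Close()

    const streams = 2           // separate Watch RPC streams over the same connection
    const watchersPerStream = 3 // watchers multiplexed onto each stream

    var wchs []clientv3.WatchChan
    for i := 0; i < streams; i++ {
        // Each Watcher created from the same client opens its own watch stream.
        w := clientv3.NewWatcher(cli)
        for j := 0; j < watchersPerStream; j++ {
            // Watch calls on the same Watcher share that stream.
            wchs = append(wchs, w.Watch(context.TODO(), "foo", clientv3.WithPrevKV()))
        }
    }
    fmt.Printf("opened %d watch channels over %d streams on a single connection\n", len(wchs), streams)
}

This is what setupWatchChannels does in the benchmark, driven by the --streams and --watchers-per-stream flags; for example (assuming the subcommand is named watch-latency), running benchmark watch-latency --streams 10 --watchers-per-stream 10 --put-total 1000 --prevkv would open 100 watch channels spread over 10 watch streams.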