diff --git a/prombench/manifests/prombench/benchmark/6_loadgen.yaml b/prombench/manifests/prombench/benchmark/6_loadgen.yaml index 86b72d6cb..8a9aeb1f8 100644 --- a/prombench/manifests/prombench/benchmark/6_loadgen.yaml +++ b/prombench/manifests/prombench/benchmark/6_loadgen.yaml @@ -125,6 +125,7 @@ spec: labels: app: loadgen-querier spec: + initContainers: containers: - name: prom-load-generator image: docker.io/prominfra/load-generator:master @@ -138,6 +139,8 @@ spec: volumeMounts: - name: config-volume mountPath: /etc/loadgen + - name: key + mountPath: /config ports: - name: loadgen-port containerPort: 8080 @@ -145,6 +148,9 @@ spec: - name: config-volume configMap: name: prometheus-loadgen + - name: key + configMap: + name: blocksync-config nodeSelector: node-name: nodes-{{ .PR_NUMBER }} isolation: none diff --git a/tools/load-generator/main.go b/tools/load-generator/main.go index 410c4bdc2..ae45c3fa7 100644 --- a/tools/load-generator/main.go +++ b/tools/load-generator/main.go @@ -14,6 +14,7 @@ package main import ( + "flag" "fmt" "io" "log" @@ -92,6 +93,16 @@ type QueryGroup struct { Step string `yaml:"step,omitempty"` } +type BucketConfig struct { + Path string `yaml:"path"` + MinTime int64 `yaml:"minTime"` + MaxTime int64 `yaml:"maxTime"` +} + +type bucketState struct { + bucketConfig *BucketConfig +} + func NewQuerier(groupID int, target, prNumber string, qg QueryGroup) *Querier { qtype := qg.Type if qtype == "" { @@ -120,16 +131,63 @@ func NewQuerier(groupID int, target, prNumber string, qg QueryGroup) *Querier { } } -func (q *Querier) run(wg *sync.WaitGroup) { +// Function to load `minTime` and `maxTime` from bucket-config.yml +func loadBucketConfig() (*BucketConfig, error) { + filePath := flag.String("bucketconfig-file", "/config/bucket-config.yml", "Path to the bucket configuration file") + flag.Parse() + + _, err := os.Stat(*filePath) + if os.IsNotExist(err) { + return nil, fmt.Errorf("file not found: %s", *filePath) + } + + data, err := os.ReadFile(*filePath) + if err != nil { + return nil, fmt.Errorf("error reading file: %w", err) + } + + var bucketConfig BucketConfig + err = yaml.Unmarshal(data, &bucketConfig) + if err != nil { + return nil, fmt.Errorf("error parsing YAML: %w", err) + } + + return &bucketConfig, nil +} + +func setconfig(v *BucketConfig, err error) *bucketState { + // If there is an error in reading bucket-config.yml file then just return nil. + if err != nil { + return nil + } + return &bucketState{ + bucketConfig: v, + } +} + +func (q *Querier) run(wg *sync.WaitGroup, timeBound *bucketState) { defer wg.Done() fmt.Printf("Running querier %s %s for %s\n", q.target, q.name, q.url) time.Sleep(20 * time.Second) for { start := time.Now() - + // If timeBound is not nil, both the "absolute" and "current" blocks will run; + // otherwise, only the "current" block will execute. This execution pattern is used + // because if Downloaded block data is present, both the head block and downloaded block + // need to be processed. + runBlockMode := "current" for _, query := range q.queries { - q.query(query.Expr) + if runBlockMode == "current" { + q.query(query.Expr, "current", nil) + } else if runBlockMode == "absolute" { + q.query(query.Expr, "absolute", timeBound) + } + if runBlockMode == "current" && timeBound != nil { + runBlockMode = "absolute" + } else if timeBound != nil { + runBlockMode = "current" + } } wait := q.interval - time.Since(start) @@ -139,7 +197,7 @@ func (q *Querier) run(wg *sync.WaitGroup) { } } -func (q *Querier) query(expr string) { +func (q *Querier) query(expr string, timeMode string, timeBound *bucketState) { queryCount.WithLabelValues(q.target, q.name, expr, q.qtype).Inc() start := time.Now() @@ -153,9 +211,19 @@ func (q *Querier) query(expr string) { qParams := req.URL.Query() qParams.Set("query", expr) if q.qtype == "range" { - qParams.Set("start", fmt.Sprintf("%d", int64(time.Now().Add(-q.start).Unix()))) - qParams.Set("end", fmt.Sprintf("%d", int64(time.Now().Add(-q.end).Unix()))) - qParams.Set("step", q.step) + if timeMode == "current" { + qParams.Set("start", fmt.Sprintf("%d", int64(time.Now().Add(-q.start).Unix()))) + qParams.Set("end", fmt.Sprintf("%d", int64(time.Now().Add(-q.end).Unix()))) + qParams.Set("step", q.step) + } else { + endTime := time.Unix(0, timeBound.bucketConfig.MaxTime*int64(time.Millisecond)) + qParams.Set("start", fmt.Sprintf("%d", int64(endTime.Add(-q.start).Unix()))) + qParams.Set("end", fmt.Sprintf("%d", int64(endTime.Add(-q.end).Unix()))) + qParams.Set("step", q.step) + } + } else if timeMode == "absolute" { + blockinstime := time.Unix(0, timeBound.bucketConfig.MaxTime*int64(time.Millisecond)) + qParams.Set("time", fmt.Sprintf("%d", int64(blockinstime.Unix()))) } req.URL.RawQuery = qParams.Encode() @@ -203,9 +271,13 @@ func main() { } prNumber := os.Args[2] - configFile, err := os.ReadFile("/etc/loadgen/config.yaml") + configPath := flag.String("config-file", "/etc/loadgen/config.yaml", "Path to the configuration file") + flag.Parse() + + configFile, err := os.ReadFile(*configPath) if err != nil { - log.Fatalf("Failed to load config: %v", err) + fmt.Printf("Error reading config file: %v\n", err) + return } var config struct { @@ -221,11 +293,14 @@ func main() { var wg sync.WaitGroup + bucketConfig, err := loadBucketConfig() + timeBound := setconfig(bucketConfig, err) + for i, group := range config.Querier.Groups { wg.Add(1) - go NewQuerier(i, "pr", prNumber, group).run(&wg) + go NewQuerier(i, "pr", prNumber, group).run(&wg, timeBound) wg.Add(1) - go NewQuerier(i, "release", prNumber, group).run(&wg) + go NewQuerier(i, "release", prNumber, group).run(&wg, timeBound) } prometheus.MustRegister(queryDuration, queryCount, queryFailCount)