Optimize monitoring performance #484

Open · wants to merge 5 commits into base: master
4 changes: 4 additions & 0 deletions charts/hami/templates/device-plugin/daemonsetnvidia.yaml
@@ -89,6 +89,10 @@ spec:
drop: ["ALL"]
add: ["SYS_ADMIN"]
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: NVIDIA_VISIBLE_DEVICES
value: "all"
- name: NVIDIA_MIG_MONITOR_DEVICES
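Note: the new NODE_NAME entry uses the Kubernetes Downward API (fieldRef: spec.nodeName) to expose the node the daemonset pod landed on. This PR consumes it through a cache index (see metrics.go below); as a point of comparison, a node name like this can also scope the pod watch server-side with a field selector. A minimal sketch, assuming standard client-go options (the function name is hypothetical, not part of this PR):

	import (
		"os"

		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/client-go/informers"
		"k8s.io/client-go/kubernetes"
	)

	// newNodeScopedFactory limits pod list/watch traffic to pods scheduled on
	// this node, so the API server never streams pods from other nodes.
	func newNodeScopedFactory(cs kubernetes.Interface) informers.SharedInformerFactory {
		nodeName := os.Getenv("NODE_NAME") // injected via the Downward API entry above
		return informers.NewSharedInformerFactoryWithOptions(cs, 0,
			informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
				opts.FieldSelector = "spec.nodeName=" + nodeName
			}))
	}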
11 changes: 5 additions & 6 deletions cmd/vGPUmonitor/feedback.go
@@ -22,6 +22,8 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/Project-HAMi/HAMi/pkg/monitor/nvidia"

"github.com/NVIDIA/go-nvml/pkg/nvml"
@@ -260,16 +262,13 @@ func Observe(lister *nvidia.ContainerLister) {
}
}

func watchAndFeedback(lister *nvidia.ContainerLister) {
func watchAndFeedback(lister *nvidia.ContainerLister, stopChan <-chan struct{}) {
nvml.Init()
for {
time.Sleep(time.Second * 5)
wait.Until(func() {
err := lister.Update()
if err != nil {
klog.Errorf("Failed to update container list: %v", err)
continue
}
//klog.Infof("WatchAndFeedback srPodList=%v", srPodList)
Observe(lister)
}
}, time.Second*5, stopChan)
}
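Note: wait.Until (from k8s.io/apimachinery/pkg/util/wait) replaces the hand-rolled for { time.Sleep(...) } loop: it runs the function immediately, then once per period, and exits cleanly when the stop channel closes, which the old endless loop could not do. A self-contained sketch of the semantics:

	package main

	import (
		"fmt"
		"time"

		"k8s.io/apimachinery/pkg/util/wait"
	)

	func main() {
		stop := make(chan struct{})
		// f runs once right away, then every 5 seconds, until stop is closed.
		go wait.Until(func() {
			fmt.Println("tick", time.Now().Format(time.RFC3339))
		}, time.Second*5, stop)

		time.Sleep(12 * time.Second)
		close(stop) // ends the loop; no sleep/continue bookkeeping needed
	}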
48 changes: 40 additions & 8 deletions cmd/vGPUmonitor/main.go
@@ -17,6 +17,17 @@ limitations under the License.
package main

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"

"github.com/Project-HAMi/HAMi/pkg/lister"
"github.com/Project-HAMi/HAMi/pkg/monitor/nvidia"

"k8s.io/klog/v2"
@@ -31,17 +42,38 @@ func main() {
if err := ValidateEnvVars(); err != nil {
klog.Fatalf("Failed to validate environment variables: %v", err)
}
containerLister, err := nvidia.NewContainerLister()
config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
if err != nil {
klog.Fatalf("Failed to create container lister: %v", err)
klog.Fatalf("Failed to build kubeconfig: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to build clientset: %v", err)
}

cgroupDriver = 0
errchannel := make(chan error)
stopChan := make(chan struct{})
//go serveInfo(errchannel)
go initMetrics(containerLister)
go watchAndFeedback(containerLister)
for {
err := <-errchannel
klog.Errorf("failed to serve: %v", err)

podInformer := lister.NewPodInformer(clientset)
go podInformer.Run(stopChan)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
if !cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced) {
klog.Fatalf("Timed out waiting for caches to sync.")
}
podLister := lister.NewPodLister(podInformer.GetIndexer())
containerLister, err := nvidia.NewContainerLister(podLister)
if err != nil {
klog.Fatalf("Failed to create container lister: %v", err)
}
go watchAndFeedback(containerLister, stopChan)
go initMetrics(podLister, containerLister, stopChan)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
select {
case <-stopChan:
klog.Fatalf("Service terminated abnormally")
case s := <-sigChan:
klog.Infof("Received signal %v, shutting down.", s)
}
}
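Note: pkg/lister is new in this PR and its implementation is not part of this diff. A plausible sketch of what NewPodInformer and NewPodLister might look like, assuming standard client-go cache APIs (the index name and wrapper shape are guesses inferred from how main.go and metrics.go use them):

	package lister

	import (
		corev1 "k8s.io/api/core/v1"
		"k8s.io/client-go/informers"
		"k8s.io/client-go/kubernetes"
		"k8s.io/client-go/tools/cache"
	)

	// PodIndexerKey names a cache index that groups pods by the node running them.
	const PodIndexerKey = "spec.nodeName"

	// NewPodInformer returns a shared pod informer carrying that index, so
	// per-node lookups become O(pods on node) instead of a full cache scan.
	func NewPodInformer(cs kubernetes.Interface) cache.SharedIndexInformer {
		inf := informers.NewSharedInformerFactory(cs, 0).Core().V1().Pods().Informer()
		_ = inf.AddIndexers(cache.Indexers{
			PodIndexerKey: func(obj interface{}) ([]string, error) {
				if pod, ok := obj.(*corev1.Pod); ok {
					return []string{pod.Spec.NodeName}, nil
				}
				return nil, nil
			},
		})
		return inf
	}

	// PodLister is a thin wrapper over the informer's indexer.
	type PodLister struct{ indexer cache.Indexer }

	func NewPodLister(indexer cache.Indexer) PodLister { return PodLister{indexer: indexer} }

	// GetByIndex returns the pods stored under one index value, e.g. one node.
	func (l PodLister) GetByIndex(indexName, value string) ([]*corev1.Pod, error) {
		objs, err := l.indexer.ByIndex(indexName, value)
		if err != nil {
			return nil, err
		}
		pods := make([]*corev1.Pod, 0, len(objs))
		for _, obj := range objs {
			if pod, ok := obj.(*corev1.Pod); ok {
				pods = append(pods, pod)
			}
		}
		return pods, nil
	}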
177 changes: 81 additions & 96 deletions cmd/vGPUmonitor/metrics.go
@@ -18,20 +18,17 @@ package main

import (
"fmt"
"log"
"net/http"
"strings"
"os"
"time"

"github.com/Project-HAMi/HAMi/pkg/lister"
"github.com/Project-HAMi/HAMi/pkg/monitor/nvidia"

"github.com/NVIDIA/go-nvml/pkg/nvml"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
)

@@ -49,7 +46,8 @@ import (
type ClusterManager struct {
Zone string
// Contains many more fields not listed in this example.
PodLister listerscorev1.PodLister
nodeName string
podLister lister.PodLister
containerLister *nvidia.ContainerLister
}

@@ -179,9 +177,11 @@ func (cc ClusterManagerCollector) Describe(ch chan<- *prometheus.Desc) {
func (cc ClusterManagerCollector) Collect(ch chan<- prometheus.Metric) {
klog.Info("Starting to collect metrics for vGPUMonitor")
containerLister := cc.ClusterManager.containerLister
if err := containerLister.Update(); err != nil {
klog.Error("Update container error: %s", err.Error())
}

// Why do we still need to update manually after starting another goroutine?
//if err := containerLister.Update(); err != nil {
// klog.Error("Update container error: %s", err.Error())
//}

nvret := nvml.Init()
if nvret != nvml.SUCCESS {
@@ -229,96 +229,81 @@ func (cc ClusterManagerCollector) Collect(ch chan<- prometheus.Metric) {

}
}

pods, err := cc.ClusterManager.PodLister.List(labels.Everything())
pods, err := cc.ClusterManager.podLister.GetByIndex(lister.PodIndexerKey, cc.ClusterManager.nodeName)
if err != nil {
klog.Error("failed to list pods with err=", err.Error())
return
}
nowSec := time.Now().Unix()

containers := containerLister.ListContainers()
for _, pod := range pods {
for _, c := range containers {
//for sridx := range srPodList {
// if srPodList[sridx].sr == nil {
// continue
// }
if c.Info == nil {
continue
}
//podUID := strings.Split(srPodList[sridx].idstr, "_")[0]
//ctrName := strings.Split(srPodList[sridx].idstr, "_")[1]
podUID := c.PodUID
ctrName := c.ContainerName
if strings.Compare(string(pod.UID), podUID) != 0 {

for _, ctr := range pod.Spec.Containers {
key := fmt.Sprintf("%s_%s", pod.UID, ctr.Name)
c, ok := containerLister.GetUsage(key)
if !ok {
continue
}
fmt.Println("Pod matched!", pod.Name, pod.Namespace, pod.Labels)
for _, ctr := range pod.Spec.Containers {
if strings.Compare(ctr.Name, ctrName) != 0 {
continue
}
fmt.Println("container matched", ctr.Name)
//err := setHostPid(pod, pod.Status.ContainerStatuses[ctridx], &srPodList[sridx])
//if err != nil {
// fmt.Println("setHostPid filed", err.Error())
//}
//fmt.Println("sr.list=", srPodList[sridx].sr)
podlabels := make(map[string]string)
for idx, val := range pod.Labels {
idxfix := strings.ReplaceAll(idx, "-", "_")
valfix := strings.ReplaceAll(val, "-", "_")
podlabels[idxfix] = valfix
}
for i := 0; i < c.Info.DeviceNum(); i++ {
uuid := c.Info.DeviceUUID(i)[0:40]
memoryTotal := c.Info.DeviceMemoryTotal(i)
memoryLimit := c.Info.DeviceMemoryLimit(i)
memoryContextSize := c.Info.DeviceMemoryContextSize(i)
memoryModuleSize := c.Info.DeviceMemoryModuleSize(i)
memoryBufferSize := c.Info.DeviceMemoryBufferSize(i)
memoryOffset := c.Info.DeviceMemoryOffset(i)
smUtil := c.Info.DeviceSmUtil(i)
lastKernelTime := c.Info.LastKernelTime()
fmt.Println("container matched", ctr.Name)
//err := setHostPid(pod, pod.Status.ContainerStatuses[ctridx], &srPodList[sridx])
//if err != nil {
// fmt.Println("setHostPid filed", err.Error())
//}
//fmt.Println("sr.list=", srPodList[sridx].sr)
//podlabels := make(map[string]string)
//for idx, val := range pod.Labels {
// idxfix := strings.ReplaceAll(idx, "-", "_")
// valfix := strings.ReplaceAll(val, "-", "_")
// podlabels[idxfix] = valfix
//}
for i := 0; i < c.Info.DeviceNum(); i++ {
uuid := c.Info.DeviceUUID(i)[0:40]
memoryTotal := c.Info.DeviceMemoryTotal(i)
memoryLimit := c.Info.DeviceMemoryLimit(i)
memoryContextSize := c.Info.DeviceMemoryContextSize(i)
memoryModuleSize := c.Info.DeviceMemoryModuleSize(i)
memoryBufferSize := c.Info.DeviceMemoryBufferSize(i)
memoryOffset := c.Info.DeviceMemoryOffset(i)
smUtil := c.Info.DeviceSmUtil(i)
lastKernelTime := c.Info.LastKernelTime()

//fmt.Println("uuid=", uuid, "length=", len(uuid))
ch <- prometheus.MustNewConstMetric(
ctrvGPUdesc,
prometheus.GaugeValue,
float64(memoryTotal),
pod.Namespace, pod.Name, ctrName, fmt.Sprint(i), uuid, /*,string(sr.sr.uuids[i].uuid[:])*/
)
ch <- prometheus.MustNewConstMetric(
ctrvGPUlimitdesc,
prometheus.GaugeValue,
float64(memoryLimit),
pod.Namespace, pod.Name, ctrName, fmt.Sprint(i), uuid, /*,string(sr.sr.uuids[i].uuid[:])*/
)
ch <- prometheus.MustNewConstMetric(
ctrDeviceMemorydesc,
prometheus.CounterValue,
float64(memoryTotal),
pod.Namespace, pod.Name, ctrName, fmt.Sprint(i), uuid,
fmt.Sprint(memoryContextSize), fmt.Sprint(memoryModuleSize), fmt.Sprint(memoryBufferSize), fmt.Sprint(memoryOffset),
)
//fmt.Println("uuid=", uuid, "length=", len(uuid))
ch <- prometheus.MustNewConstMetric(
ctrvGPUdesc,
prometheus.GaugeValue,
float64(memoryTotal),
pod.Namespace, pod.Name, ctr.Name, fmt.Sprint(i), uuid, /*,string(sr.sr.uuids[i].uuid[:])*/
)
ch <- prometheus.MustNewConstMetric(
ctrvGPUlimitdesc,
prometheus.GaugeValue,
float64(memoryLimit),
pod.Namespace, pod.Name, ctr.Name, fmt.Sprint(i), uuid, /*,string(sr.sr.uuids[i].uuid[:])*/
)
ch <- prometheus.MustNewConstMetric(
ctrDeviceMemorydesc,
prometheus.CounterValue,
float64(memoryTotal),
pod.Namespace, pod.Name, ctr.Name, fmt.Sprint(i), uuid,
fmt.Sprint(memoryContextSize), fmt.Sprint(memoryModuleSize), fmt.Sprint(memoryBufferSize), fmt.Sprint(memoryOffset),
)
ch <- prometheus.MustNewConstMetric(
ctrDeviceUtilizationdesc,
prometheus.GaugeValue,
float64(smUtil),
pod.Namespace, pod.Name, ctr.Name, fmt.Sprint(i), uuid,
)
if lastKernelTime > 0 {
lastSec := nowSec - lastKernelTime
if lastSec < 0 {
lastSec = 0
}
ch <- prometheus.MustNewConstMetric(
ctrDeviceUtilizationdesc,
ctrDeviceLastKernelDesc,
prometheus.GaugeValue,
float64(smUtil),
pod.Namespace, pod.Name, ctrName, fmt.Sprint(i), uuid,
float64(lastSec),
pod.Namespace, pod.Name, ctr.Name, fmt.Sprint(i), uuid,
)
if lastKernelTime > 0 {
lastSec := nowSec - lastKernelTime
if lastSec < 0 {
lastSec = 0
}
ch <- prometheus.MustNewConstMetric(
ctrDeviceLastKernelDesc,
prometheus.GaugeValue,
float64(lastSec),
pod.Namespace, pod.Name, ctrName, fmt.Sprint(i), uuid,
)
}
}
}
}
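Note: the rewritten loop inverts the old matching strategy. Previously every pod on the node was compared against every usage record with string compares; now each container builds a "<podUID>_<containerName>" key and does a single lookup, so the cost drops from O(pods × records) to O(containers). GetUsage is defined in pkg/monitor/nvidia and not shown in this diff; a hypothetical sketch of the shape the loop relies on:

	// ContainerUsage stands in for the per-container record (device memory,
	// SM utilisation, ...); the real type lives in pkg/monitor/nvidia.
	type ContainerUsage struct{ /* ... */ }

	type containerUsages map[string]*ContainerUsage // key: "<podUID>_<containerName>"

	// GetUsage is a plain map lookup; the second result reports whether the
	// monitor has seen this container at all.
	func (m containerUsages) GetUsage(key string) (*ContainerUsage, bool) {
		u, ok := m[key]
		return u, ok
	}

	// Caller side, mirroring Collect:
	//	key := fmt.Sprintf("%s_%s", pod.UID, ctr.Name)
	//	if c, ok := containerLister.GetUsage(key); ok { /* emit metrics */ }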
@@ -330,23 +315,20 @@ func (cc ClusterManagerCollector) Collect(ch chan<- prometheus.Metric) {
// ClusterManager. Finally, it registers the ClusterManagerCollector with a
// wrapping Registerer that adds the zone as a label. In this way, the metrics
// collected by different ClusterManagerCollectors do not collide.
func NewClusterManager(zone string, reg prometheus.Registerer, containerLister *nvidia.ContainerLister) *ClusterManager {
func NewClusterManager(zone string, reg prometheus.Registerer, podLister lister.PodLister, containerLister *nvidia.ContainerLister) *ClusterManager {
c := &ClusterManager{
Zone: zone,
containerLister: containerLister,
nodeName: os.Getenv("NODE_NAME"),
podLister: podLister,
}

informerFactory := informers.NewSharedInformerFactoryWithOptions(containerLister.Clientset(), time.Hour*1)
c.PodLister = informerFactory.Core().V1().Pods().Lister()
stopCh := make(chan struct{})
informerFactory.Start(stopCh)

cc := ClusterManagerCollector{ClusterManager: c}
prometheus.WrapRegistererWith(prometheus.Labels{"zone": zone}, reg).MustRegister(cc)
return c
}

func initMetrics(containerLister *nvidia.ContainerLister) {
func initMetrics(podLister lister.PodLister, containerLister *nvidia.ContainerLister, stopChan chan<- struct{}) {
// Since we are dealing with custom Collector implementations, it might
// be a good idea to try it out with a pedantic registry.
klog.Info("Initializing metrics for vGPUmonitor")
@@ -355,7 +337,7 @@ func initMetrics(containerLister *nvidia.ContainerLister) {

// Construct cluster managers. In real code, we would assign them to
// variables to then do something with them.
NewClusterManager("vGPU", reg, containerLister)
NewClusterManager("vGPU", reg, podLister, containerLister)
//NewClusterManager("ca", reg)

// Add the standard process and Go metrics to the custom registry.
@@ -365,5 +347,8 @@ func initMetrics(containerLister *nvidia.ContainerLister) {
//)

http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
log.Fatal(http.ListenAndServe(":9394", nil))
if err := http.ListenAndServe(":9394", nil); err != nil {
klog.Errorf("failed to serve: %v", err)
close(stopChan)
}
}
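Note: the old log.Fatal(http.ListenAndServe(...)) killed the process outright; the new version logs the error and closes stopChan, which simultaneously stops the informer and feedback goroutines and unblocks the select in main. A minimal, self-contained sketch of that broadcast pattern (names are illustrative):

	package main

	import (
		"fmt"
		"time"
	)

	// A closed channel is readable by any number of goroutines, so one
	// close(stop) broadcasts shutdown to every worker selecting on it.
	func worker(id int, stop <-chan struct{}) {
		for {
			select {
			case <-stop:
				fmt.Printf("worker %d: stopping\n", id)
				return
			case <-time.After(500 * time.Millisecond):
				fmt.Printf("worker %d: tick\n", id)
			}
		}
	}

	func main() {
		stop := make(chan struct{})
		for i := 0; i < 3; i++ {
			go worker(i, stop)
		}
		time.Sleep(2 * time.Second)
		close(stop) // single point of shutdown
		time.Sleep(time.Second)
	}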
1 change: 1 addition & 0 deletions cmd/vGPUmonitor/validation.go
@@ -22,6 +22,7 @@ import (
)

var requiredEnvVars = map[string]bool{
"NODE_NAME": true,
"HOOK_PATH": true,
"OTHER_ENV_VAR": false,
}
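Note: ValidateEnvVars itself is unchanged and not shown in this diff. A hypothetical, self-contained sketch consistent with the map above (true marks a variable as required), so the new NODE_NAME entry makes startup fail fast when the Downward API wiring is missing; the real implementation may differ:

	package main

	import (
		"fmt"
		"os"
	)

	var requiredEnvVars = map[string]bool{
		"NODE_NAME": true,
		"HOOK_PATH": true,
	}

	// ValidateEnvVars rejects a missing required variable so the monitor
	// fails fast instead of running without its node identity.
	func ValidateEnvVars() error {
		for name, required := range requiredEnvVars {
			if _, ok := os.LookupEnv(name); !ok && required {
				return fmt.Errorf("required environment variable %q is not set", name)
			}
		}
		return nil
	}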