Implement volume health monitoring feature to detect abnormal volumes
Praveenrajmani committed Jul 24, 2023
1 parent 136b8ac commit 68ce3d2
Showing 23 changed files with 1,041 additions and 63 deletions.
18 changes: 17 additions & 1 deletion cmd/directpv/node-server.go
@@ -20,6 +20,7 @@ import (
"context"
"errors"
"os"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/minio/directpv/pkg/consts"
@@ -33,7 +34,11 @@ import (
"k8s.io/klog/v2"
)

var metricsPort = consts.MetricsPort
var (
metricsPort = consts.MetricsPort
volumeHealthMonitorInterval = 10 * time.Minute
enableVolumeHealthMonitor bool
)

var nodeServerCmd = &cobra.Command{
Use: consts.NodeServerName,
@@ -53,6 +58,8 @@ var nodeServerCmd = &cobra.Command{

func init() {
nodeServerCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", metricsPort, "Metrics port at "+consts.AppPrettyName+" exports metrics data")
nodeServerCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitor", enableVolumeHealthMonitor, "Enable volume health monitoring")
nodeServerCmd.PersistentFlags().DurationVar(&volumeHealthMonitorInterval, "volume-health-monitor-interval", volumeHealthMonitorInterval, "interval for volume health monitoring in duration. Example: '20m','1h'")
}

func startNodeServer(ctx context.Context) error {
@@ -111,5 +118,14 @@ func startNodeServer(ctx context.Context) error {
}
}()

if enableVolumeHealthMonitor {
go func() {
if err := volume.StartHealthMonitor(ctx, nodeID, volumeHealthMonitorInterval); err != nil {
klog.ErrorS(err, "unable to start volume health monitor")
errCh <- err
}
}()
}

return <-errCh
}
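The `volume.StartHealthMonitor` call above refers to the monitor loop added elsewhere in this commit and not shown on this page. A minimal sketch of a loop with this shape, purely illustrative — the `checkVolumes` helper and the plain `string` node ID type are assumptions, not the shipped implementation:

```go
// Illustrative sketch only; not the implementation added by this commit.
package volume

import (
	"context"
	"time"

	"k8s.io/klog/v2"
)

// StartHealthMonitor could run a ticker at the configured interval and, on
// every tick, verify that the volumes on this node still have their mounts
// present on the host, flagging missing mounts as error conditions.
func StartHealthMonitor(ctx context.Context, nodeID string, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := checkVolumes(ctx, nodeID); err != nil {
				klog.ErrorS(err, "volume health check failed")
			}
		}
	}
}

// checkVolumes is a hypothetical placeholder: the real monitor would list
// DirectPVVolume objects scheduled on nodeID and set or reset their Error
// condition based on staging/target mount checks.
func checkVolumes(ctx context.Context, nodeID string) error {
	return nil
}
```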
39 changes: 22 additions & 17 deletions cmd/kubectl-directpv/install.go
@@ -40,21 +40,22 @@ import (
)

var (
image = consts.AppName + ":" + Version
registry = "quay.io"
org = "minio"
nodeSelectorArgs = []string{}
tolerationArgs = []string{}
seccompProfile = ""
apparmorProfile = ""
imagePullSecrets = []string{}
nodeSelector map[string]string
tolerations []corev1.Toleration
k8sVersion = "1.27.0"
kubeVersion *version.Version
legacyFlag bool
declarativeFlag bool
openshiftFlag bool
image = consts.AppName + ":" + Version
registry = "quay.io"
org = "minio"
nodeSelectorArgs = []string{}
tolerationArgs = []string{}
seccompProfile = ""
apparmorProfile = ""
imagePullSecrets = []string{}
nodeSelector map[string]string
tolerations []corev1.Toleration
k8sVersion = "1.27.0"
kubeVersion *version.Version
legacyFlag bool
declarativeFlag bool
openshiftFlag bool
enableVolumeHealthMonitor bool
)

var installCmd = &cobra.Command{
Expand Down Expand Up @@ -82,7 +83,10 @@ var installCmd = &cobra.Command{
$ kubectl {PLUGIN_NAME} install --apparmor-profile directpv
7. Install DirectPV with seccomp profile
$ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json`,
$ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json
8. Install DirectPV with volume health monitoring enabled
$ kubectl {PLUGIN_NAME} install --enable-volume-health-monitoring`,
`{PLUGIN_NAME}`,
consts.AppName,
),
@@ -128,6 +132,7 @@ func init() {
installCmd.PersistentFlags().BoolVar(&declarativeFlag, "declarative", declarativeFlag, "Output YAML for declarative installation")
installCmd.PersistentFlags().MarkHidden("declarative")
installCmd.PersistentFlags().BoolVar(&openshiftFlag, "openshift", openshiftFlag, "Use OpenShift specific installation")
installCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitoring", enableVolumeHealthMonitor, "Enable volume health monitoring")
}

func validateNodeSelectorArgs() error {
@@ -311,7 +316,7 @@ func installMain(ctx context.Context) {
}
}
args.Declarative = declarativeFlag
args.Openshift = openshiftFlag
args.EnableVolumeHealthMonitor = enableVolumeHealthMonitor

var failed bool
var installedComponents []installer.Component
2 changes: 2 additions & 0 deletions cmd/kubectl-directpv/list_volumes.go
@@ -246,6 +246,8 @@ func listVolumesMain(ctx context.Context) {
status = "Released"
case volume.IsDriveLost():
status = "Lost"
case volume.HasError():
status = "Error"
case volume.IsPublished():
status = "Bounded"
}
24 changes: 21 additions & 3 deletions docs/metrics.md
@@ -1,9 +1,9 @@
---
title: Metrics
title: Metrics and Monitoring
---

Monitoring guidelines
----------------------
### Enable Prometheus metrics
-------------------------

DirectPV nodes export Prometheus compatible metrics data by exposing a metrics endpoint at /directpv/metrics. Users looking to monitor their tenants can point Prometheus configuration to scrape data from this endpoint.

@@ -81,3 +81,21 @@ directpv_stats_bytes_total{node="node-3"}
```
directpv_stats_bytes_used{tenant="tenant-1", node="node-5"}
```

### Volume health monitoring

Volume health monitoring is a [CSI feature](https://kubernetes.io/docs/concepts/storage/volume-health-monitoring/) introduced as an Alpha feature in Kubernetes v1.19, with a second Alpha in v1.21. It detects "abnormal" volume conditions and reports them as events on PVCs and Pods. A DirectPV volume is considered "abnormal" if its volume mounts are not present on the host.

For node-side monitoring, the `CSIVolumeHealth` feature gate needs to be enabled. In addition, DirectPV installs the external health monitor controller, which monitors volumes and reports health events on their PVCs.

To enable volume health monitoring, install DirectPV with the `--enable-volume-health-monitoring` flag:

```sh
kubectl directpv install --enable-volume-health-monitoring
```

For private registries, note that the following image is required to enable volume health monitoring:

```
quay.io/minio/csi-external-health-monitor-controller:v0.9.0
```
11 changes: 8 additions & 3 deletions pkg/apis/directpv.min.io/types/types.go
@@ -120,23 +120,28 @@ type VolumeConditionType string

// Enum value of VolumeConditionType type.
const (
VolumeConditionTypeLost VolumeConditionType = "Lost"
VolumeConditionTypeLost VolumeConditionType = "Lost"
VolumeConditionTypeError VolumeConditionType = "Error"
)

// VolumeConditionReason denotes volume reason. Allows maximum upto 1024 chars.
type VolumeConditionReason string

// Enum values of VolumeConditionReason type.
const (
VolumeConditionReasonDriveLost VolumeConditionReason = "DriveLost"
VolumeConditionReasonDriveLost VolumeConditionReason = "DriveLost"
VolumeConditionReasonNotMounted VolumeConditionReason = "NotMounted"
VolumeConditionReasonNoError VolumeConditionReason = "NoError"
)

// VolumeConditionMessage denotes drive message. Allows maximum upto 32768 chars.
type VolumeConditionMessage string

// Enum values of VolumeConditionMessage type.
const (
VolumeConditionMessageDriveLost VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
VolumeConditionMessageDriveLost VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
VolumeConditionMessageStagingPathNotMounted VolumeConditionMessage = "Staging path is umounted from the host. Please restart the workload"
VolumeConditionMessageTargetPathNotMounted VolumeConditionMessage = "Target path is umounted from the host. Please restart the workload"
)

// DriveConditionType denotes drive condition. Allows maximum upto 316 chars.
25 changes: 25 additions & 0 deletions pkg/apis/directpv.min.io/v1beta1/volume.go
@@ -19,6 +19,7 @@ package v1beta1
import (
"github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/k8s"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -121,6 +122,12 @@ func (volume DirectPVVolume) IsDriveLost() bool {
return false
}

// HasError returns if the volume is in error state.
func (volume DirectPVVolume) HasError() bool {
condition := k8s.GetConditionByType(volume.Status.Conditions, string(types.VolumeConditionTypeError))
return condition != nil && k8s.ConditionStatusToBool(condition.Status)
}

// SetDriveLost sets associated drive is lost.
func (volume *DirectPVVolume) SetDriveLost() {
c := metav1.Condition{
@@ -286,6 +293,24 @@ func (volume DirectPVVolume) GetTenantName() string {
return string(volume.getLabel(types.LabelKey(Group + "/tenant")))
}

// ResetStageMountErrorCondition resets the stage volume mount error condition.
func (volume *DirectPVVolume) ResetStageMountErrorCondition() {
k8s.ResetConditionIfMatches(volume.Status.Conditions,
string(types.VolumeConditionTypeError),
string(types.VolumeConditionReasonNotMounted),
string(types.VolumeConditionMessageStagingPathNotMounted),
string(types.VolumeConditionReasonNoError))
}

// ResetTargetMountErrorCondition resets the target volume mount error condition.
func (volume *DirectPVVolume) ResetTargetMountErrorCondition() {
k8s.ResetConditionIfMatches(volume.Status.Conditions,
string(types.VolumeConditionTypeError),
string(types.VolumeConditionReasonNotMounted),
string(types.VolumeConditionMessageTargetPathNotMounted),
string(types.VolumeConditionReasonNoError))
}
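The `k8s.ResetConditionIfMatches` helper used by these two methods is added elsewhere in this commit and not shown here. A plausible sketch inferred only from the call sites above — the exact matching rules, reset status, and message handling are assumptions:

```go
// Illustrative sketch only; the real helper lives in pkg/k8s in this commit.
package k8s

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ResetConditionIfMatches could clear a condition back to a "no error" state
// when its type, reason and message match the supplied values.
func ResetConditionIfMatches(conditions []metav1.Condition, condType, reason, message, resetReason string) {
	for i := range conditions {
		c := &conditions[i]
		if c.Type == condType && c.Reason == reason && c.Message == message {
			c.Status = metav1.ConditionFalse
			c.Reason = resetReason
			c.Message = ""
			c.LastTransitionTime = metav1.Now()
			return
		}
	}
}
```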

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// DirectPVVolumeList denotes list of volumes.
3 changes: 3 additions & 0 deletions pkg/consts/consts.go
@@ -94,4 +94,7 @@ (

LegacyNodeServerName = "legacy-node-server"
LegacyControllerServerName = "legacy-controller"

// Volume Health Monitor
VolumeHealthMonitorIntervalInDuration = "1m"
)
3 changes: 3 additions & 0 deletions pkg/consts/consts.go.in
@@ -92,4 +92,7 @@ (

LegacyNodeServerName = "legacy-node-server"
LegacyControllerServerName = "legacy-controller"

// Volume Health Monitor
VolumeHealthMonitorIntervalInDuration = "1m"
)
64 changes: 60 additions & 4 deletions pkg/csi/controller/server.go
@@ -94,6 +94,21 @@ func (c *Server) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerG
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_EXPAND_VOLUME},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_GET_VOLUME},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_VOLUME_CONDITION},
},
},
},
}, nil
}
@@ -347,8 +362,34 @@ func (c *Server) ControllerExpandVolume(ctx context.Context, req *csi.Controller

// ListVolumes implements ListVolumes controller RPC
// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#listvolumes
func (c *Server) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "unimplemented")
func (c *Server) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
result, err := client.VolumeClient().List(ctx, metav1.ListOptions{
Limit: int64(req.GetMaxEntries()),
Continue: req.GetStartingToken(),
})
if err != nil {
if req.GetStartingToken() != "" {
return nil, status.Errorf(codes.Aborted, "unable to list volumes: %v", err)
}
return nil, status.Errorf(codes.Internal, "unable to list volumes: %v", err)
}
var volumeEntries []*csi.ListVolumesResponse_Entry
for _, volume := range result.Items {
csiVolume, err := getCSIVolume(ctx, &volume)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
volumeEntries = append(volumeEntries, &csi.ListVolumesResponse_Entry{
Volume: csiVolume,
Status: &csi.ListVolumesResponse_VolumeStatus{
VolumeCondition: getCSIVolumeCondition(&volume),
},
})
}
return &csi.ListVolumesResponse{
Entries: volumeEntries,
NextToken: result.Continue,
}, nil
}

// ControllerPublishVolume - controller RPC to publish volumes
@@ -365,8 +406,23 @@ func (c *Server) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerU

// ControllerGetVolume - controller RPC for get volume
// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#controllergetvolume
func (c *Server) ControllerGetVolume(_ context.Context, _ *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "unimplemented")
func (c *Server) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
volume, err := client.VolumeClient().Get(
ctx, req.GetVolumeId(), metav1.GetOptions{TypeMeta: types.NewVolumeTypeMeta()},
)
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}
csiVolume, err := getCSIVolume(ctx, volume)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.ControllerGetVolumeResponse{
Volume: csiVolume,
Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
VolumeCondition: getCSIVolumeCondition(volume),
},
}, nil
}
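Both ListVolumes and ControllerGetVolume above rely on `getCSIVolume` and `getCSIVolumeCondition`, which are defined elsewhere in this commit and not visible on this page. A rough sketch of the condition helper, assuming it simply maps the volume's Error condition onto the CSI volume condition — package name, messages, and behavior are assumptions:

```go
// Illustrative sketch only; not the helper shipped in this commit.
package controller

import (
	"github.com/container-storage-interface/spec/lib/go/csi"
	directpv "github.com/minio/directpv/pkg/apis/directpv.min.io/v1beta1"
)

// getCSIVolumeCondition could report the volume as abnormal whenever its
// Error condition is set, e.g. when a staging or target mount is missing.
func getCSIVolumeCondition(volume *directpv.DirectPVVolume) *csi.VolumeCondition {
	if volume.HasError() {
		return &csi.VolumeCondition{
			Abnormal: true,
			Message:  "volume has an error condition; see its status conditions for details",
		}
	}
	return &csi.VolumeCondition{
		Abnormal: false,
		Message:  "volume is healthy",
	}
}
```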

// ListSnapshots - controller RPC for listing snapshots