Skip to content

Commit

Permalink
csi-wrapper: add peerpod volume support for manually created PVs
Browse files Browse the repository at this point in the history
To support usage of manually created persistent volumes,
a peerpod volume CR needs to be created during `ControllerPublishVolume`,

Signed-off-by: Daniel Weiße <[email protected]>
  • Loading branch information
daniel-weisse authored and katexochen committed Oct 18, 2024
1 parent 488681f commit b5542aa
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 68 deletions.
158 changes: 93 additions & 65 deletions src/csi-wrapper/pkg/wrapper/controllerservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (

// Parameter key for Peer Pod from StorageClass
PeerpodParamKey = "peerpod"

// peerpodVolumeNamePlaceholder is used when creating a peerpodvolume from an existing PV.
// For existing PVs, we have to create the peerpodvolume CR without knowing the real volume name in [ControllerService.ControllerPublishVolume].
// We only later know the real volume name when we process the CR again in [NodeService.NodePublishVolume].
peerpodVolumeNamePlaceholder = "peerpod-volume-name-placeholder"
)

type ControllerService struct {
Expand Down Expand Up @@ -86,26 +91,7 @@ func (s *ControllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
if peerpod != "" {
volumeID := res.GetVolume().VolumeId
normalizedVolumeID := utils.NormalizeVolumeID(volumeID)
labels := map[string]string{
"volumeName": volumeName,
}
newPeerpodvolume := &v1alpha1.PeerpodVolume{
ObjectMeta: metav1.ObjectMeta{
Name: normalizedVolumeID,
Namespace: s.Namespace,
Labels: labels,
},
Spec: v1alpha1.PeerpodVolumeSpec{
VolumeID: normalizedVolumeID,
VolumeName: volumeName,
},
}
_, err = s.PeerpodvolumeClient.ConfidentialcontainersV1alpha1().PeerpodVolumes(s.Namespace).Create(context.Background(), newPeerpodvolume, metav1.CreateOptions{})
if err != nil {
glog.Errorf("Error happens while creating peerPodVolume with volumeID: %v, err: %v", volumeID, err.Error())
} else {
glog.Infof("Peerpodvolume object is created")
}
_, _ = s.createPeerpodVolume(normalizedVolumeID, volumeName)
}
}); e != nil {
return nil, e
Expand Down Expand Up @@ -142,57 +128,75 @@ func (s *ControllerService) ControllerPublishVolume(ctx context.Context, req *cs
savedPeerpodvolume, err := s.PeerpodvolumeClient.ConfidentialcontainersV1alpha1().PeerpodVolumes(s.Namespace).Get(context.Background(), volumeID, metav1.GetOptions{})
if err != nil {
glog.Infof("Not found PeerpodVolume with volumeID: %v, err: %v", volumeID, err.Error())
if e := s.redirect(ctx, req, func(ctx context.Context, client csi.ControllerClient) {
res, err = client.ControllerPublishVolume(ctx, req)
}); e != nil {
return nil, e
}
} else {
nodeID := req.GetNodeId()
uuid, _ := uid.NewV4() // #nosec G104: Attempt to randomly generate uuid
requestID := uuid.String()

fakePublishContext := map[string]string{
PublishInfoVolumeID: volumeID,
PublishInfoNodeID: "",
PublishInfoStatus: "attached",
PublishInfoDevicePath: "/dev/fff",
PublishInfoRequestID: requestID,
}
res = &csi.ControllerPublishVolumeResponse{PublishContext: fakePublishContext}

glog.Infof("The fake ControllerPublishVolumeResponse is :%v", res)
var reqBuf bytes.Buffer
if err := (&jsonpb.Marshaler{}).Marshal(&reqBuf, req); err != nil {
glog.Error(err, "Error happens while Marshal ControllerPublishVolumeRequest")
// ControllerPublishVolume was called without a matching peerpod volume object existing.
// This can happen when the persistent volume was manually created, e.g. to consume an existing storage backend.
// In this case, we need to create the peerpod volume object here.
peerPod := req.VolumeContext[PeerpodParamKey]
if peerPod == "" {
if e := s.redirect(ctx, req, func(ctx context.Context, client csi.ControllerClient) {
res, err = client.ControllerPublishVolume(ctx, req)
}); e != nil {
return nil, e
}
}
reqJsonString := reqBuf.String()
glog.Infof("ControllerPublishVolumeRequest JSON string: %s\n", reqJsonString)
glog.Info("PeerPod parameter found in ControllerPublishVolumeRequest. Creating a new PeerpodVolume object")

var resBuf bytes.Buffer
if err := (&jsonpb.Marshaler{}).Marshal(&resBuf, res); err != nil {
glog.Error(err, "Error happens while Marshal ControllerPublishVolumeResponse")
}
resJsonString := resBuf.String()
glog.Infof("ControllerPublishVolumeResponse JSON string: %s\n", resJsonString)

savedPeerpodvolume.Labels["nodeID"] = nodeID
savedPeerpodvolume.Spec.NodeID = nodeID
savedPeerpodvolume.Spec.WrapperControllerPublishVolumeReq = string(reqJsonString)
savedPeerpodvolume.Spec.WrapperControllerPublishVolumeRes = string(resJsonString)
savedPeerpodvolume, err = s.PeerpodvolumeClient.ConfidentialcontainersV1alpha1().PeerpodVolumes(s.Namespace).Update(context.Background(), savedPeerpodvolume, metav1.UpdateOptions{})
if err != nil {
glog.Errorf("Error happens while Update PeerpodVolume in ControllerPublishVolume, err: %v", err.Error())
return
}
savedPeerpodvolume.Status = v1alpha1.PeerpodVolumeStatus{
State: v1alpha1.ControllerPublishVolumeCached,
}
_, err = s.PeerpodvolumeClient.ConfidentialcontainersV1alpha1().PeerpodVolumes(s.Namespace).UpdateStatus(context.Background(), savedPeerpodvolume, metav1.UpdateOptions{})
// Delete peerpod key from req parameters because csi driver may check parameters strictly.
delete(req.VolumeContext, PeerpodParamKey)

volumeName := peerpodVolumeNamePlaceholder
savedPeerpodvolume, err = s.createPeerpodVolume(volumeID, volumeName)
if err != nil {
glog.Errorf("Error happens while Update PeerpodVolume status to ControllerPublishVolumeCached, err: %v", err.Error())
return nil, err
}
}

nodeID := req.GetNodeId()
uuid, _ := uid.NewV4() // #nosec G104: Attempt to randomly generate uuid
requestID := uuid.String()

fakePublishContext := map[string]string{
PublishInfoVolumeID: volumeID,
PublishInfoNodeID: "",
PublishInfoStatus: "attached",
PublishInfoDevicePath: "/dev/fff",
PublishInfoRequestID: requestID,
}
res = &csi.ControllerPublishVolumeResponse{PublishContext: fakePublishContext}

glog.Infof("The fake ControllerPublishVolumeResponse is :%v", res)
var reqBuf bytes.Buffer
if err := (&jsonpb.Marshaler{}).Marshal(&reqBuf, req); err != nil {
glog.Error(err, "Error happens while Marshal ControllerPublishVolumeRequest")
}
reqJsonString := reqBuf.String()
glog.Infof("ControllerPublishVolumeRequest JSON string: %s\n", reqJsonString)

var resBuf bytes.Buffer
if err := (&jsonpb.Marshaler{}).Marshal(&resBuf, res); err != nil {
glog.Error(err, "Error happens while Marshal ControllerPublishVolumeResponse")
}
resJsonString := resBuf.String()
glog.Infof("ControllerPublishVolumeResponse JSON string: %s\n", resJsonString)

savedPeerpodvolume.Labels["nodeID"] = nodeID
savedPeerpodvolume.Spec.NodeID = nodeID
savedPeerpodvolume.Spec.WrapperControllerPublishVolumeReq = string(reqJsonString)
savedPeerpodvolume.Spec.WrapperControllerPublishVolumeRes = string(resJsonString)
savedPeerpodvolume, err = s.PeerpodvolumeClient.ConfidentialcontainersV1alpha1().PeerpodVolumes(s.Namespace).Update(context.Background(), savedPeerpodvolume, metav1.UpdateOptions{})
if err != nil {
glog.Errorf("Error happens while Update PeerpodVolume in ControllerPublishVolume, err: %v", err.Error())
return
}
savedPeerpodvolume.Status = v1alpha1.PeerpodVolumeStatus{
State: v1alpha1.ControllerPublishVolumeCached,
}
_, err = s.PeerpodvolumeClient.ConfidentialcontainersV1alpha1().PeerpodVolumes(s.Namespace).UpdateStatus(context.Background(), savedPeerpodvolume, metav1.UpdateOptions{})
if err != nil {
glog.Errorf("Error happens while Update PeerpodVolume status to ControllerPublishVolumeCached, err: %v", err.Error())
}

return
}

Expand Down Expand Up @@ -406,3 +410,27 @@ func (s *ControllerService) SyncHandler(peerPodVolume *peerpodvolumeV1alpha1.Pee
func (s *ControllerService) DeleteFunction(peerPodVolume *peerpodvolumeV1alpha1.PeerpodVolume) {
glog.Infof("deleteFunction from controllerService: %v ", peerPodVolume)
}

func (s *ControllerService) createPeerpodVolume(volumeID, volumeName string) (*v1alpha1.PeerpodVolume, error) {
labels := map[string]string{
"volumeName": volumeName,
}
newPeerpodvolume := &v1alpha1.PeerpodVolume{
ObjectMeta: metav1.ObjectMeta{
Name: volumeID,
Namespace: s.Namespace,
Labels: labels,
},
Spec: v1alpha1.PeerpodVolumeSpec{
VolumeID: volumeID,
VolumeName: volumeName,
},
}
peerpodVolume, err := s.PeerpodvolumeClient.ConfidentialcontainersV1alpha1().PeerpodVolumes(s.Namespace).Create(context.Background(), newPeerpodvolume, metav1.CreateOptions{})
if err != nil {
glog.Errorf("Error happens while creating peerPodVolume with volumeID: %v, err: %v", volumeID, err.Error())
} else {
glog.Infof("Peerpodvolume object is created")
}
return peerpodVolume, err
}
14 changes: 11 additions & 3 deletions src/csi-wrapper/pkg/wrapper/nodeservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package wrapper
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -125,10 +126,17 @@ func (s *NodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis
podUid, volumeName := s.getPodUIDandVolumeName(targetPath)
savedPeerpodvolume.Labels["podUid"] = podUid
savedPeerpodvolume.Spec.PodUid = podUid
if volumeName != savedPeerpodvolume.Labels["volumeName"] {
glog.Error("The volume name from target path doesn't match with the CRD")
return
savedVolumeName := savedPeerpodvolume.Spec.VolumeName
if volumeName != savedVolumeName && savedVolumeName != peerpodVolumeNamePlaceholder {
glog.Error("The volume name from target path doesn't match with the CR")
return nil, errors.New("the volume name from target path doesn't match with the CR")
}
if savedVolumeName == peerpodVolumeNamePlaceholder {
glog.Info("Detected a placeholder volume name in the CR. Updating the CR with the volume name from the target path")
savedPeerpodvolume.Labels["volumeName"] = volumeName
savedPeerpodvolume.Spec.VolumeName = volumeName
}

savedPeerpodvolume.Spec.WrapperNodePublishVolumeReq = nodePublishVolumeRequest
_, err = s.PeerpodvolumeClient.ConfidentialcontainersV1alpha1().PeerpodVolumes(s.Namespace).Update(context.Background(), savedPeerpodvolume, metav1.UpdateOptions{})
if err != nil {
Expand Down

0 comments on commit b5542aa

Please sign in to comment.