Skip to content

Commit

Permalink
[build] enhance api attach volume
Browse files Browse the repository at this point in the history
  • Loading branch information
cuongpiger committed Jul 1, 2024
1 parent 99d3b68 commit c34df86
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 28 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/cuongpiger/joat v1.0.12
github.com/spf13/pflag v1.0.5
github.com/vngcloud/vngcloud-csi-volume-modifier v1.0.2
github.com/vngcloud/vngcloud-go-sdk/v2 v2.2.29
github.com/vngcloud/vngcloud-go-sdk/v2 v2.2.30
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/vngcloud/vngcloud-csi-volume-modifier v1.0.2 h1:LBSW1T0W4HxJKtpgSYbL5ZL1XiyOTF2nHIs8uqG5uoo=
github.com/vngcloud/vngcloud-csi-volume-modifier v1.0.2/go.mod h1:wkhxk+x3ILNOk0aUxGivvhdiOXxPOVNXeRDHUx/IR50=
github.com/vngcloud/vngcloud-go-sdk/v2 v2.2.29 h1:agCiZYlywX9YWCb4gbLX2/MXDgSBsmEL2CCEbSBrNEY=
github.com/vngcloud/vngcloud-go-sdk/v2 v2.2.29/go.mod h1:TN4rl3ifgmGoFCBOEdzr0lV4raBDmo13XKbRvxtlktw=
github.com/vngcloud/vngcloud-go-sdk/v2 v2.2.30 h1:Fw3/bCi5v1Txarv/aZR8OckUChLS25u4XTYf3WcJ+6A=
github.com/vngcloud/vngcloud-go-sdk/v2 v2.2.30/go.mod h1:TN4rl3ifgmGoFCBOEdzr0lV4raBDmo13XKbRvxtlktw=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 h1:A3SayB3rNyt+1S6qpI9mHPkeHTZbD7XILEqWnYZb2l0=
Expand Down
109 changes: 90 additions & 19 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *cloud) DeleteVolume(volID string) lserr.IError {
return true, nil
}

ierr = lserr.ErrVServerVolumeFailedToGet(volID, sdkErr)
ierr = lserr.ErrVolumeFailedToGet(volID, sdkErr)
return false, nil
}

Expand All @@ -171,7 +171,7 @@ func (s *cloud) DeleteVolume(volID string) lserr.IError {
return true, nil
}

ierr = lserr.ErrVServerVolumeFailedToDelete(volID, sdkErr)
ierr = lserr.ErrVolumeFailedToDelete(volID, sdkErr)
llog.ErrorS(ierr.GetError(), "[ERROR] - DeleteVolume: Failed to delete the volume", ierr.GetListParameters()...)
return false, nil
}
Expand All @@ -191,32 +191,103 @@ func (s *cloud) DeleteVolume(volID string) lserr.IError {
return ierr
}

func (s *cloud) AttachVolume(pinstanceId, pvolumeId string) (*lsentity.Volume, error) {
func (s *cloud) AttachVolume(pinstanceId, pvolumeId string) (*lsentity.Volume, lserr.IError) {
var (
svol *lsentity.Volume
err error
ierr lserr.IError
)

svol, err = s.waitVolumeAchieveStatus(pvolumeId, volumeArchivedStatus)
if err != nil {
return nil, err
}
_ = ljwait.ExponentialBackoff(ljwait.NewBackOff(5, 10, true, ljtime.Minute(10)), func() (bool, error) {
vol, sdkErr := s.client.VServerGateway().V2().VolumeService().
GetBlockVolumeById(lsdkVolumeV2.NewGetBlockVolumeByIdRequest(pvolumeId))
if sdkErr != nil {
if sdkErr.IsError(lsdkErrs.EcVServerVolumeNotFound) || vol == nil {
ierr = lserr.ErrVolumeNotFound(pvolumeId)
llog.ErrorS(ierr.GetError(), "[ERROR] - AttachVolume: Volume not found", ierr.GetListParameters()...)
return false, ierr.GetError()
}

if svol.AttachedTheInstance(pinstanceId) {
return svol, nil
}
return false, nil
}

opt := lsdkComputeV2.NewAttachBlockVolumeRequest(pinstanceId, pvolumeId)
sdkErr := s.client.VServerGateway().V2().ComputeService().AttachBlockVolume(opt)
if sdkErr != nil {
if !sdkErr.IsError(lsdkErrs.EcVServerVolumeAlreadyAttachedThisServer) {
return nil, sdkErr.GetError()
// Volume is in error state
if vol.IsError() {
ierr = lserr.ErrVolumeIsInErrorState(pvolumeId)
llog.ErrorS(ierr.GetError(), "[ERROR] - AttachVolume: The volume is in error state", ierr.GetListParameters()...)
return false, ierr.GetError()
}

if vol.AttachedTheInstance(pinstanceId) {
ierr = nil // reset
llog.InfoS("[INFO] - AttachVolume: The volume is already attached", "volumeId", pvolumeId, "instanceId", pinstanceId)
return true, nil
}

if vol.MultiAttach || vol.IsAvailable() {
sdkErr = s.client.VServerGateway().V2().ComputeService().
AttachBlockVolume(lsdkComputeV2.NewAttachBlockVolumeRequest(pinstanceId, pvolumeId))
if sdkErr != nil {
switch sdkErr.GetErrorCode() {
case lsdkErrs.EcVServerVolumeAlreadyAttachedThisServer:
ierr = nil // reset
llog.InfoS("[INFO] - AttachVolume: The volume is already attached", "volumeId", pvolumeId, "instanceId", pinstanceId)
return true, nil
case lsdkErrs.EcVServerVolumeInProcess:
llog.InfoS("[INFO] - AttachVolume: The volume is in process", "volumeId", pvolumeId, "instanceId", pinstanceId)
return false, nil
default:
ierr = lserr.ErrVolumeFailedToAttach(pinstanceId, pvolumeId, sdkErr)
llog.ErrorS(ierr.GetError(), "[ERROR] - AttachVolume: Failed to attach the volume", ierr.GetListParameters()...)
return false, ierr.GetError()
}
}

ierr = nil // reset
llog.InfoS("[INFO] - AttachVolume: Attached the volume successfully", "volumeId", pvolumeId, "instanceId", pinstanceId)
return true, nil
}

return false, nil
})

if ierr != nil {
return nil, ierr
}

err = s.waitDiskAttached(pinstanceId, pvolumeId)
return svol, err
_ = ljwait.ExponentialBackoff(ljwait.NewBackOff(5, 10, true, ljtime.Minute(10)), func() (bool, error) {
vol, sdkErr := s.client.VServerGateway().V2().VolumeService().
GetBlockVolumeById(lsdkVolumeV2.NewGetBlockVolumeByIdRequest(pvolumeId))
if sdkErr != nil {
if sdkErr.IsError(lsdkErrs.EcVServerVolumeNotFound) || vol == nil {
ierr = lserr.ErrVolumeNotFound(pvolumeId)
llog.ErrorS(ierr.GetError(), "[ERROR] - AttachVolume: Volume not found", ierr.GetListParameters()...)
return false, ierr.GetError()
}

llog.ErrorS(ierr.GetError(), "[ERROR] - AttachVolume: Failed to get the volume when waiting it become archieve status", ierr.GetListParameters()...)
return false, nil
}

if vol.IsError() {
ierr = lserr.ErrVolumeIsInErrorState(pvolumeId)
llog.ErrorS(ierr.GetError(), "[ERROR] - AttachVolume: The volume is in error state", ierr.GetListParameters()...)
return false, ierr.GetError()
}

if vol.AttachedTheInstance(pinstanceId) {
ierr = nil // reset
svol = lsentity.NewVolume(vol)
return true, nil
}

return false, nil
})

if ierr != nil {
return nil, ierr
}

return svol, nil
}

func (s *cloud) DetachVolume(pinstanceId, pvolumeId string) lserr.IError {
Expand All @@ -230,7 +301,7 @@ func (s *cloud) DetachVolume(pinstanceId, pvolumeId string) lserr.IError {
vol, sdkErr := s.client.VServerGateway().V2().VolumeService().
GetBlockVolumeById(lsdkVolumeV2.NewGetBlockVolumeByIdRequest(pvolumeId))
if sdkErr != nil {
ierr = lserr.ErrVServerVolumeFailedToGet(pvolumeId, sdkErr)
ierr = lserr.ErrVolumeFailedToGet(pvolumeId, sdkErr)
llog.ErrorS(ierr.GetError(), "[ERROR] - DetachVolume: Failed to get the volume", ierr.GetListParameters()...)
return false, ierr.GetError()
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/cloud/entity/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ const (
VolumeInUseStatus = "IN-USE"
)

func NewVolume(pvol *lsdkEntity.Volume) *Volume {
return &Volume{
Volume: pvol,
}
}

type Volume struct {
*lsdkEntity.Volume
}
Expand Down
20 changes: 18 additions & 2 deletions pkg/cloud/errors/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (
WithParameters(psdkErr.GetParameters()))
}

ErrVServerVolumeFailedToGet = func(pvolId string, psdkErr lsdkErr.ISdkError) IError {
ErrVolumeFailedToGet = func(pvolId string, psdkErr lsdkErr.ISdkError) IError {
return NewError(new(lsdkErr.SdkError).
WithErrorCode(EcVServerVolumeFailedToGet).
WithErrors(psdkErr.GetError()).
Expand All @@ -32,12 +32,28 @@ var (
WithParameters(psdkErr.GetParameters()))
}

ErrVServerVolumeFailedToDelete = func(pvolId string, psdkErr lsdkErr.ISdkError) IError {
ErrVolumeFailedToDelete = func(pvolId string, psdkErr lsdkErr.ISdkError) IError {
return NewError(new(lsdkErr.SdkError).
WithErrorCode(EcVServerVolumeFailedToDelete).
WithErrors(psdkErr.GetError()).
WithMessage(lfmt.Sprintf("Failed to delete volume %s", pvolId)).
WithKVparameters("volumeId", pvolId).
WithParameters(psdkErr.GetParameters()))
}

ErrVolumeNotFound = func(pvolId string) IError {
return NewError(new(lsdkErr.SdkError).
WithErrorCode(EcVServerVolumeNotFound).
WithMessage(lfmt.Sprintf("Volume %s not found", pvolId)).
WithKVparameters("volumeId", pvolId))
}

ErrVolumeFailedToAttach = func(pinstanceId, pvolId string, psdkErr lsdkErr.ISdkError) IError {
return NewError(new(lsdkErr.SdkError).
WithErrorCode(EcVServerVolumeFailedToAttach).
WithErrors(psdkErr.GetError()).
WithMessage(lfmt.Sprintf("Failed to attach volume %s to instance %s", pvolId, pinstanceId)).
WithKVparameters("instanceId", pinstanceId, "volumeId", pvolId).
WithParameters(psdkErr.GetParameters()))
}
)
2 changes: 2 additions & 0 deletions pkg/cloud/errors/error_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ const (
EcVServerVolumeFailedToDetach = lsdkErrs.ErrorCode("VServerVolumeFailedToDetach")
EcVServerVolumeFailedToGet = lsdkErrs.ErrorCode("VServerVolumeFailedToGet")
EcVServerVolumeFailedToDelete = lsdkErrs.ErrorCode("VServerVolumeFailedToDelete")
EcVServerVolumeNotFound = lsdkErrs.ErrorCode("VServerVolumeNotFound")
EcVServerVolumeFailedToAttach = lsdkErrs.ErrorCode("VServerVolumeFailedToAttach")
)
15 changes: 11 additions & 4 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,23 +281,30 @@ func (s *controllerService) DeleteVolume(pctx lctx.Context, preq *lcsi.DeleteVol
}

func (s *controllerService) ControllerPublishVolume(pctx lctx.Context, preq *lcsi.ControllerPublishVolumeRequest) (result *lcsi.ControllerPublishVolumeResponse, err error) {
llog.V(5).InfoS("[INFO] - ControllerPublishVolume: called with request", "preq", *preq)
llog.V(5).InfoS("[INFO] - ControllerPublishVolume: Called", "request", *preq)

if err = validateControllerPublishVolumeRequest(preq); err != nil {
llog.ErrorS(err, "[ERROR] - ControllerPublishVolume: invalid request")
llog.ErrorS(err, "[ERROR] - ControllerPublishVolume: Invalid request")
return nil, err
}

volumeID := preq.GetVolumeId() // get the cloud volume ID
nodeID := preq.GetNodeId() // get the cloud node ID
key := volumeID + nodeID

// Make sure there are no 2 operations on the same volume and node at the same time
if !s.inFlight.Insert(volumeID + nodeID) {
llog.InfoS("[INFO] - ControllerPublishVolume: Operation is already in-flight", "volumeID", volumeID, "nodeID", nodeID, "inflightKey", key)
return nil, ErrOperationAlreadyExists(volumeID)
}
defer s.inFlight.Delete(volumeID + nodeID)

llog.V(2).InfoS("[INFO] - ControllerPublishVolume: attaching volume into the instance", "volumeID", volumeID, "nodeID", nodeID)
llog.V(5).InfoS("[INFO] - ControllerPublishVolume: Insert this action to inflight cache", "volumeID", volumeID, "nodeID", nodeID, "inflightKey", key)
defer func() {
llog.InfoS("[INFO] - ControllerPublishVolume: Operation completed", "volumeID", volumeID, "nodeID", nodeID, "inflightKey", key)
s.inFlight.Delete(volumeID + nodeID)
}()

llog.InfoS("[INFO] - ControllerPublishVolume: attaching volume into the instance", "volumeID", volumeID, "nodeID", nodeID)

// Attach the volume and wait for it to be attached
_, err = s.cloud.AttachVolume(nodeID, volumeID)
Expand Down

0 comments on commit c34df86

Please sign in to comment.