Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for bulk-init based on ellipses #964

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions cmd/directpv/bulk-init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// This file is part of MinIO DirectPV
// Copyright (c) 2024 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package main

import (
"context"
"errors"
"fmt"
"os"
"strings"
"sync"

directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/consts"
devicepkg "github.com/minio/directpv/pkg/device"
"github.com/minio/directpv/pkg/ellipsis"
"github.com/minio/directpv/pkg/initrequest"
"github.com/minio/directpv/pkg/sys"
"github.com/minio/directpv/pkg/utils"
"github.com/spf13/cobra"
"k8s.io/klog/v2"
)

// Flag-backed state shared by the bulk-init command and startBulkInit.
var (
	// drivesArgs holds the raw values passed through the --drives flag.
	drivesArgs []string

	// initAll is set when the single value "*" is passed to --drives; when
	// set, every available device is selected regardless of its name.
	initAll bool
)

// bulkInitCmd is the hidden "bulk-init" command. It makes sure the mount
// root directory exists, resolves the --drives values into a list of device
// names and hands that list to startBulkInit.
//
// --drives accepts either the single value "*" (select every available
// device) or one or more names/ellipsis patterns such as sd{a...z}. Each
// value may carry surrounding whitespace and a device-path prefix; both are
// removed before the value is expanded.
var bulkInitCmd = &cobra.Command{
	Use:           "bulk-init --drives [DRIVE-ELLIPSIS...]",
	Short:         "Bulk initialize the devices",
	SilenceUsage:  true,
	SilenceErrors: true,
	Hidden:        true,
	RunE: func(c *cobra.Command, _ []string) error {
		// An already existing mount root is not an error.
		if err := sys.Mkdir(consts.MountRootDir, 0o755); err != nil && !errors.Is(err, os.ErrExist) {
			return err
		}

		if len(drivesArgs) == 0 {
			return errors.New("--drives must be provided")
		}

		// The wildcard selects every available device. startBulkInit ignores
		// the drive list when initAll is set, so there is nothing to expand.
		if len(drivesArgs) == 1 && drivesArgs[0] == "*" {
			initAll = true
			return startBulkInit(c.Context(), nil)
		}

		var drives []string
		for i, arg := range drivesArgs {
			// Trim whitespace before stripping the device prefix so that a
			// value like " /dev/sda" is still recognized as a device path;
			// trim once more afterwards to keep accepting "/dev/ sda".
			name := strings.TrimSpace(utils.TrimDevPrefix(strings.TrimSpace(arg)))
			drivesArgs[i] = name
			if name == "" {
				return errors.New("empty drive name")
			}
			expanded, err := ellipsis.Expand(name)
			if err != nil {
				return err
			}
			drives = append(drives, expanded...)
		}
		if len(drives) == 0 {
			return errors.New("invalid ellipsis input; no drives selected")
		}
		return startBulkInit(c.Context(), drives)
	},
}

// init registers the persistent flags of the bulk-init command: --dry-run
// and --drives (short form -d).
func init() {
	flags := bulkInitCmd.PersistentFlags()
	flags.BoolVar(&dryRunFlag, "dry-run", dryRunFlag, "No modify mode")
	flags.StringSliceVarP(&drivesArgs, "drives", "d", drivesArgs, "drives to be initialized; supports ellipses pattern e.g. sd{a...z}")
}

// startBulkInit probes the devices of this node and initializes every
// available device that is selected, either by name through drives or
// unconditionally when the package-level initAll flag is set.
//
// The selected devices are initialized concurrently, one goroutine per
// device. A device whose FSType() is non-empty is initialized with force set
// to true. When dryRunFlag is set, the selection is only logged and
// InitDevice is never called.
//
// An error is returned when the initrequest handler cannot be created, when
// probing fails, when no available device matches the selection, or when the
// initialization of at least one device fails.
func startBulkInit(ctx context.Context, drives []string) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	initRequestHandler, err := initrequest.NewHandler(
		ctx,
		nodeID,
		map[string]string{
			string(directpvtypes.TopologyDriverIdentity): identity,
			string(directpvtypes.TopologyDriverRack):     rack,
			string(directpvtypes.TopologyDriverZone):     zone,
			string(directpvtypes.TopologyDriverRegion):   region,
			string(directpvtypes.TopologyDriverNode):     string(nodeID),
		},
	)
	if err != nil {
		return fmt.Errorf("unable to create initrequest handler; %w", err)
	}

	devices, err := devicepkg.Probe()
	if err != nil {
		return fmt.Errorf("unable to probe devices; %w", err)
	}

	var filteredDevices []devicepkg.Device
	for _, device := range devices {
		klog.Infoln(device.Name)
		if !device.Available() {
			continue
		}
		if initAll || utils.Contains(drives, device.Name) {
			filteredDevices = append(filteredDevices, device)
		}
	}

	if len(filteredDevices) == 0 {
		return errors.New("no available drives selected to initialize")
	}

	// Every goroutine records its outcome in its own slot, so the results
	// can be written concurrently without a lock and read after wg.Wait().
	failures := make([]bool, len(filteredDevices))
	var wg sync.WaitGroup
	for i := range filteredDevices {
		wg.Add(1)
		go func(i int, device devicepkg.Device, force bool) {
			defer wg.Done()
			if dryRunFlag {
				klog.Infof("\n[DRY-RUN] initializing device %v with force: %v", device.Name, force)
				return
			}
			if err := initRequestHandler.InitDevice(device, force); err != nil {
				failures[i] = true
				// ErrorS takes a message followed by key/value pairs, not a
				// printf-style format string.
				klog.ErrorS(err, "unable to init device", "device", device.Name)
			}
		}(i, filteredDevices[i], filteredDevices[i].FSType() != "")
	}
	wg.Wait()

	for _, failed := range failures {
		if failed {
			return errors.New("failed to initialize all the drives")
		}
	}

	return nil
}
1 change: 1 addition & 0 deletions cmd/directpv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func init() {
mainCmd.AddCommand(legacyNodeServerCmd)
mainCmd.AddCommand(nodeControllerCmd)
mainCmd.AddCommand(repairCmd)
mainCmd.AddCommand(bulkInitCmd)
}

func main() {
Expand Down
5 changes: 5 additions & 0 deletions pkg/device/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ func (d Device) deniedReason() string {
return reason
}

// Available reports whether the device can be initialized, that is, whether
// deniedReason yields no reason to reject it.
func (d Device) Available() bool {
	reason := d.deniedReason()
	return reason == ""
}

// ToNodeDevice constructs the NodeDevice object from Device info.
func (d Device) ToNodeDevice(nodeID directpvtypes.NodeID) types.Device {
return types.Device{
Expand Down
26 changes: 16 additions & 10 deletions pkg/initrequest/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ const (
resyncPeriod = 5 * time.Minute
)

type initRequestEventHandler struct {
// Handler handles init request events and initializes devices on a node.
type Handler struct {
nodeID directpvtypes.NodeID
reflink bool
topology map[string]string
Expand All @@ -64,7 +65,8 @@ type initRequestEventHandler struct {
mu sync.Mutex
}

func newInitRequestEventHandler(ctx context.Context, nodeID directpvtypes.NodeID, topology map[string]string) (*initRequestEventHandler, error) {
// NewHandler creates an init request Handler for the given node ID and topology.
func NewHandler(ctx context.Context, nodeID directpvtypes.NodeID, topology map[string]string) (*Handler, error) {
reflink, err := reflinkSupported(ctx)
if err != nil {
return nil, err
Expand All @@ -76,7 +78,7 @@ func newInitRequestEventHandler(ctx context.Context, nodeID directpvtypes.NodeID
klog.V(3).Infof("XFS reflink support is disabled")
}

return &initRequestEventHandler{
return &Handler{
reflink: reflink,
nodeID: nodeID,
topology: topology,
Expand Down Expand Up @@ -129,7 +131,8 @@ func newInitRequestEventHandler(ctx context.Context, nodeID directpvtypes.NodeID
}, nil
}

func (handler *initRequestEventHandler) ListerWatcher() cache.ListerWatcher {
// ListerWatcher returns the ListerWatcher for init requests labeled with this handler's node ID.
func (handler *Handler) ListerWatcher() cache.ListerWatcher {
labelSelector := fmt.Sprintf("%s=%s", directpvtypes.NodeLabelKey, handler.nodeID)
return cache.NewFilteredListWatchFromClient(
client.RESTClient(),
Expand All @@ -141,11 +144,13 @@ func (handler *initRequestEventHandler) ListerWatcher() cache.ListerWatcher {
)
}

func (handler *initRequestEventHandler) ObjectType() runtime.Object {
// ObjectType returns a pointer to a zero-valued InitRequest.
func (handler *Handler) ObjectType() runtime.Object {
	return new(types.InitRequest)
}

func (handler *initRequestEventHandler) Handle(ctx context.Context, eventType controller.EventType, object runtime.Object) error {
// Handle handles the controller events
func (handler *Handler) Handle(ctx context.Context, eventType controller.EventType, object runtime.Object) error {
switch eventType {
case controller.UpdateEvent, controller.AddEvent:
initRequest := object.(*types.InitRequest)
Expand All @@ -157,7 +162,7 @@ func (handler *initRequestEventHandler) Handle(ctx context.Context, eventType co
return nil
}

func (handler *initRequestEventHandler) initDevices(ctx context.Context, req *types.InitRequest) error {
func (handler *Handler) initDevices(ctx context.Context, req *types.InitRequest) error {
handler.mu.Lock()
defer handler.mu.Unlock()

Expand Down Expand Up @@ -196,7 +201,7 @@ func (handler *initRequestEventHandler) initDevices(ctx context.Context, req *ty
wg.Add(1)
go func(i int, device pkgdevice.Device, force bool) {
defer wg.Done()
if err := handler.initDevice(device, force); err != nil {
if err := handler.InitDevice(device, force); err != nil {
results[i].Error = err.Error()
}
}(i, device, req.Spec.Devices[i].Force)
Expand All @@ -223,7 +228,8 @@ func updateInitRequest(ctx context.Context, name string, results []types.InitDev
return retry.RetryOnConflict(retry.DefaultRetry, updateFunc)
}

func (handler *initRequestEventHandler) initDevice(device pkgdevice.Device, force bool) error {
// InitDevice initializes the device and creates a corresponding directpvdrive.
func (handler *Handler) InitDevice(device pkgdevice.Device, force bool) error {
devPath := utils.AddDevPrefix(device.Name)

deviceMap, majorMinorMap, err := handler.getMounts()
Expand Down Expand Up @@ -295,7 +301,7 @@ func (handler *initRequestEventHandler) initDevice(device pkgdevice.Device, forc

// StartController starts initrequest controller.
func StartController(ctx context.Context, nodeID directpvtypes.NodeID, identity, rack, zone, region string) {
initRequestHandler, err := newInitRequestEventHandler(
initRequestHandler, err := NewHandler(
ctx,
nodeID,
map[string]string{
Expand Down
28 changes: 28 additions & 0 deletions resources/init/ClusterRole.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
annotations:
rbac.authorization.kubernetes.io/autoupdate: "true"
creationTimestamp: null
labels:
directpv.min.io/created-by: bulk-init
directpv.min.io/version: v1beta1
name: directpv-bulk-init
rules:
- apiGroups:
- ""
resources:
- events
verbs:
- create
- list
- patch
- update
- watch
- apiGroups:
- directpv.min.io
resources:
- directpvdrives
verbs:
- create
- get
19 changes: 19 additions & 0 deletions resources/init/ClusterRoleBinding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
annotations:
rbac.authorization.kubernetes.io/autoupdate: "true"
creationTimestamp: null
labels:
directpv.min.io/created-by: bulk-init
directpv.min.io/version: v1beta1
name: directpv-bulk-init
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: directpv-bulk-init
subjects:
- kind: ServiceAccount
name: directpv-bulk-init
namespace: directpv
71 changes: 71 additions & 0 deletions resources/init/Job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@

apiVersion: batch/v1
kind: Job
metadata:
labels:
directpv.min.io/created-by: bulk-init
directpv.min.io/version: v1beta1
name: bulk-init
namespace: directpv
spec:
ttlSecondsAfterFinished: 3600
backoffLimit: 0
template:
metadata:
name: bulk-init
namespace: directpv
spec:
containers:
- args:
- bulk-init
- -v=3
- --kube-node-name=$(KUBE_NODE_NAME)
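# NOTE(review): DRIVE_ELLIPSES is not defined in this container's env list;
# presumably it must be replaced (or added to env) before applying — confirm.
- --drives=$(DRIVE_ELLIPSES)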
env:
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
image: quay.io/minio/directpv@sha256:83fd05fe114ed15c3975333c90cbe18c782d9c4d5c7ad6fdb8cc835e380ba505
name: bulk-init
resources: {}
securityContext:
privileged: true
terminationMessagePath: /var/log/driver-termination-log
terminationMessagePolicy: FallbackToLogsOnError
volumeMounts:
- mountPath: /var/lib/directpv/
mountPropagation: Bidirectional
name: directpv-common-root
- mountPath: /sys
mountPropagation: Bidirectional
name: sysfs
- mountPath: /dev
mountPropagation: HostToContainer
name: devfs
readOnly: true
- mountPath: /run/udev/data
mountPropagation: Bidirectional
name: run-udev-data-dir
readOnly: true
restartPolicy: Never
hostPID: true
serviceAccountName: directpv-bulk-init
volumes:
- hostPath:
path: /var/lib/directpv/
type: DirectoryOrCreate
name: directpv-common-root
- hostPath:
path: /sys
type: DirectoryOrCreate
name: sysfs
- hostPath:
path: /dev
type: DirectoryOrCreate
name: devfs
- hostPath:
path: /run/udev/data
type: DirectoryOrCreate
name: run-udev-data-dir
12 changes: 12 additions & 0 deletions resources/init/Namespace.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: v1
kind: Namespace
metadata:
creationTimestamp: null
finalizers:
- foregroundDeletion
labels:
directpv.min.io/version: v1beta1
pod-security.kubernetes.io/enforce: privileged
name: directpv
spec: {}
status: {}
Loading