Skip to content

Commit

Permalink
[feat] merge code with vngcloud manage csi driver
Browse files Browse the repository at this point in the history
  • Loading branch information
cuongpiger committed Jun 4, 2024
1 parent d32a9f5 commit bb771cc
Show file tree
Hide file tree
Showing 21 changed files with 1,098 additions and 672 deletions.
167 changes: 87 additions & 80 deletions cmd/vngcloud-blockstorage-csi-driver/main.go
Original file line number Diff line number Diff line change
@@ -1,83 +1,84 @@
package main

import (
"context"
"fmt"
flag "github.com/spf13/pflag"
"github.com/vngcloud/vngcloud-blockstorage-csi-driver/pkg/driver"
"github.com/vngcloud/vngcloud-blockstorage-csi-driver/pkg/metrics"
"k8s.io/component-base/featuregate"
"os"
"strings"

logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/logs/json"
"k8s.io/klog/v2"
"time"
lctx "context"
lfmt "fmt"
los "os"
lstr "strings"
ltime "time"

lflag "github.com/spf13/pflag"
lfeaturegate "k8s.io/component-base/featuregate"
llogApi "k8s.io/component-base/logs/api/v1"
ljson "k8s.io/component-base/logs/json"
llog "k8s.io/klog/v2"

lsdriver "github.com/vngcloud/vngcloud-blockstorage-csi-driver/pkg/driver"
lsmetrics "github.com/vngcloud/vngcloud-blockstorage-csi-driver/pkg/metrics"
)

func main() {
fs := flag.NewFlagSet("vngcloud-blockstorage-csi-driver", flag.ExitOnError)
if err := logsapi.RegisterLogFormat(logsapi.JSONLogFormat, json.Factory{}, logsapi.LoggingBetaOptions); err != nil {
klog.ErrorS(err, "failed to register JSON log format")
fs := lflag.NewFlagSet("vngcloud-blockstorage-csi-driver", lflag.ExitOnError)
if err := llogApi.RegisterLogFormat(llogApi.JSONLogFormat, ljson.Factory{}, llogApi.LoggingBetaOptions); err != nil {
llog.ErrorS(err, "failed to register JSON log format")
}

options := GetOptions(fs)
// Start tracing as soon as possible
if options.ServerOptions.EnableOtelTracing {
exporter, err := driver.InitOtelTracing()
exporter, err := lsdriver.InitOtelTracing()
if err != nil {
klog.ErrorS(err, "failed to initialize otel tracing")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
llog.ErrorS(err, "failed to initialize otel tracing")
llog.FlushAndExit(llog.ExitFlushTimeout, 1)
}
// Exporter will flush traces on shutdown
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := lctx.WithTimeout(lctx.Background(), 10*ltime.Second)
defer cancel()
if err := exporter.Shutdown(ctx); err != nil {
klog.ErrorS(err, "could not shutdown otel exporter")
llog.ErrorS(err, "could not shutdown otel exporter")
}
}()
}

if options.ServerOptions.HttpEndpoint != "" {
r := metrics.InitializeRecorder()
r := lsmetrics.InitializeRecorder()
r.InitializeMetricsHandler(options.ServerOptions.HttpEndpoint, "/metrics")
}

drv, err := driver.NewDriver(
driver.WithClientID(options.Global.ClientID),
driver.WithClientSecret(options.Global.ClientSecret),
driver.WithIdentityURL(options.Global.IdentityURL),
driver.WithVServerURL(options.Global.VServerURL),
driver.WithEndpoint(options.ServerOptions.Endpoint),
driver.WithMode(options.DriverMode),
driver.WithOtelTracing(options.ServerOptions.EnableOtelTracing),
driver.WithModifyVolumeRequestHandlerTimeout(options.ControllerOptions.ModifyVolumeRequestHandlerTimeout),
driver.WithClusterID(options.ServerOptions.ClusterID),
driver.WithTagKeyLength(options.ServerOptions.TagKeyLength),
driver.WithTagValueLength(options.ServerOptions.TagValueLength),
drv, err := lsdriver.NewDriver(
lsdriver.WithClientID(options.Global.ClientID),
lsdriver.WithClientSecret(options.Global.ClientSecret),
lsdriver.WithIdentityURL(options.Global.IdentityURL),
lsdriver.WithVServerURL(options.Global.VServerURL),
lsdriver.WithEndpoint(options.ServerOptions.Endpoint),
lsdriver.WithMode(options.DriverMode),
lsdriver.WithOtelTracing(options.ServerOptions.EnableOtelTracing),
lsdriver.WithModifyVolumeRequestHandlerTimeout(options.ControllerOptions.ModifyVolumeRequestHandlerTimeout),
lsdriver.WithClusterID(options.ServerOptions.ClusterID),
lsdriver.WithTagKeyLength(options.ServerOptions.TagKeyLength),
lsdriver.WithTagValueLength(options.ServerOptions.TagValueLength),
)

if err != nil {
klog.ErrorS(err, "failed to create driver")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
llog.ErrorS(err, "failed to create driver")
llog.FlushAndExit(llog.ExitFlushTimeout, 1)
}
if err := drv.Run(); err != nil {
klog.ErrorS(err, "failed to run driver")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
llog.ErrorS(err, "failed to run driver")
llog.FlushAndExit(llog.ExitFlushTimeout, 1)
}
}

var (
featureGate = featuregate.NewFeatureGate()
featureGate = lfeaturegate.NewFeatureGate()

// used for testing
osExit = os.Exit
osExit = los.Exit
)

type Options struct {
DriverMode driver.Mode
DriverMode lsdriver.Mode

*ServerOptions
*ControllerOptions
Expand All @@ -99,120 +100,126 @@ type ServerOptions struct {
EnableOtelTracing bool
TagKeyLength int
TagValueLength int
CacheUri string
AlertChannel string
AlertChannelSize int
}

func (s *ServerOptions) AddFlags(fs *flag.FlagSet) {
fs.StringVar(&s.Endpoint, "endpoint", driver.DefaultCSIEndpoint, "Endpoint for the CSI driver server")
func (s *ServerOptions) AddFlags(fs *lflag.FlagSet) {
fs.StringVar(&s.Endpoint, "endpoint", lsdriver.DefaultCSIEndpoint, "Endpoint for the CSI driver server")
fs.StringVar(&s.HttpEndpoint, "http-endpoint", "", "The TCP network address where the HTTP server for metrics will listen (example: `:8080`). The default is empty string, which means the server is disabled.")
fs.BoolVar(&s.EnableOtelTracing, "enable-otel-tracing", false, "To enable opentelemetry tracing for the driver. The tracing is disabled by default. Configure the exporter endpoint with OTEL_EXPORTER_OTLP_ENDPOINT and other env variables, see https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration.")
fs.StringVar(&s.ClusterID, "cluster-id", "", "The unique ID of the cluster. This is used to identify the cluster in the logs.")
fs.IntVar(&s.TagKeyLength, "tag-key-length", 15, "The maximum length of the tag key.")
fs.IntVar(&s.TagValueLength, "tag-value-length", 255, "The maximum length of the tag value.")
fs.StringVar(&s.CacheUri, "cache-uri", "", "The URI of the cache server.")
fs.StringVar(&s.AlertChannel, "alert-channel", "", "The alert channel to send alerts to.")
fs.IntVar(&s.AlertChannelSize, "alert-channel-size", 100, "The size of the alert channel.")
}

type ControllerOptions struct {
ModifyVolumeRequestHandlerTimeout time.Duration
ModifyVolumeRequestHandlerTimeout ltime.Duration
UserAgentExtra string
}

func (s *ControllerOptions) AddFlags(fs *flag.FlagSet) {
fs.DurationVar(&s.ModifyVolumeRequestHandlerTimeout, "modify-volume-request-handler-timeout", driver.DefaultModifyVolumeRequestHandlerTimeout, "Timeout for the window in which volume modification calls must be received in order for them to coalesce into a single volume modification call to AWS. This must be lower than the csi-resizer and volumemodifier timeouts")
func (s *ControllerOptions) AddFlags(fs *lflag.FlagSet) {
fs.DurationVar(&s.ModifyVolumeRequestHandlerTimeout, "modify-volume-request-handler-timeout", lsdriver.DefaultModifyVolumeRequestHandlerTimeout, "Timeout for the window in which volume modification calls must be received in order for them to coalesce into a single volume modification call to AWS. This must be lower than the csi-resizer and volumemodifier timeouts")
fs.StringVar(&s.UserAgentExtra, "user-agent-extra", "", "Extra string appended to user agent.")

}

type NodeOptions struct{}

func (s *NodeOptions) AddFlags(fs *flag.FlagSet) {
func (s *NodeOptions) AddFlags(fs *lflag.FlagSet) {
}

func (s *NodeOptions) Validate() error {
return nil
}

func GetOptions(fs *flag.FlagSet) *Options {
func GetOptions(fs *lflag.FlagSet) *Options {
var (
version = fs.Bool("version", false, "Print the version and exit.")
toStderr = fs.Bool("logtostderr", false, "log to standard error instead of files. DEPRECATED: will be removed in a future release.")

args = os.Args[1:]
cmd = string(driver.AllMode)
args = los.Args[1:]
cmd = string(lsdriver.AllMode)

serverOptions = ServerOptions{}
controllerOptions = ControllerOptions{}
nodeOptions = NodeOptions{}
)

serverOptions.AddFlags(fs)
c := logsapi.NewLoggingConfiguration()
c := llogApi.NewLoggingConfiguration()

err := logsapi.AddFeatureGates(featureGate)
err := llogApi.AddFeatureGates(featureGate)
if err != nil {
klog.ErrorS(err, "failed to add feature gates")
llog.ErrorS(err, "failed to add feature gates")
}

logsapi.AddFlags(c, fs)
llogApi.AddFlags(c, fs)

if len(os.Args) > 1 && !strings.HasPrefix(os.Args[1], "-") {
cmd = os.Args[1]
args = os.Args[2:]
if len(los.Args) > 1 && !lstr.HasPrefix(los.Args[1], "-") {
cmd = los.Args[1]
args = los.Args[2:]
}

switch cmd {
case string(driver.ControllerMode):
case string(lsdriver.ControllerMode):
controllerOptions.AddFlags(fs)

case string(driver.NodeMode):
case string(lsdriver.NodeMode):
nodeOptions.AddFlags(fs)

case string(driver.AllMode):
case string(lsdriver.AllMode):
controllerOptions.AddFlags(fs)
nodeOptions.AddFlags(fs)

default:
klog.Errorf("Unknown driver mode %s: Expected %s, %s, %s", cmd, driver.ControllerMode, driver.NodeMode, driver.AllMode)
klog.FlushAndExit(klog.ExitFlushTimeout, 0)
llog.Errorf("Unknown driver mode %s: Expected %s, %s, %s", cmd, lsdriver.ControllerMode, lsdriver.NodeMode, lsdriver.AllMode)
llog.FlushAndExit(llog.ExitFlushTimeout, 0)
}

if err = fs.Parse(args); err != nil {
panic(err)
}

if cmd != string(driver.ControllerMode) {
if cmd != string(lsdriver.ControllerMode) {
// nodeOptions must have been populated from the cmdline, validate them.
if err := nodeOptions.Validate(); err != nil {
klog.Error(err.Error())
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
llog.Error(err.Error())
llog.FlushAndExit(llog.ExitFlushTimeout, 1)
}
}

err = logsapi.ValidateAndApply(c, featureGate)
err = llogApi.ValidateAndApply(c, featureGate)
if err != nil {
klog.ErrorS(err, "failed to validate and apply logging configuration")
llog.ErrorS(err, "failed to validate and apply logging configuration")
}

if *version {
versionInfo, err := driver.GetVersionJSON()
versionInfo, err := lsdriver.GetVersionJSON()
if err != nil {
klog.ErrorS(err, "failed to get version")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
llog.ErrorS(err, "failed to get version")
llog.FlushAndExit(llog.ExitFlushTimeout, 1)
}
fmt.Println(versionInfo)
llog.InfoS(versionInfo)
osExit(0)
}

if *toStderr {
klog.SetOutput(os.Stderr)
llog.SetOutput(los.Stderr)
}

config, err := getConfigFromEnv()
if err != nil {
klog.Errorf("Failed to get config from files: %v", err)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
llog.Errorf("Failed to get config from files: %v", err)
llog.FlushAndExit(llog.ExitFlushTimeout, 1)
}

options := &Options{
DriverMode: driver.Mode(cmd),
DriverMode: lsdriver.Mode(cmd),
ServerOptions: &serverOptions,
ControllerOptions: &controllerOptions,
NodeOptions: &nodeOptions,
Expand All @@ -223,13 +230,13 @@ func GetOptions(fs *flag.FlagSet) *Options {
}

func getConfigFromEnv() (*Global, error) {
clientID := os.Getenv("VNGCLOUD_ACCESS_KEY_ID")
clientSecret := os.Getenv("VNGCLOUD_SECRET_ACCESS_KEY")
identityEndpoint := os.Getenv("VNGCLOUD_IDENTITY_ENDPOINT")
vserverEndpoint := os.Getenv("VNGCLOUD_VSERVER_ENDPOINT")
clientID := los.Getenv("VNGCLOUD_ACCESS_KEY_ID")
clientSecret := los.Getenv("VNGCLOUD_SECRET_ACCESS_KEY")
identityEndpoint := los.Getenv("VNGCLOUD_IDENTITY_ENDPOINT")
vserverEndpoint := los.Getenv("VNGCLOUD_VSERVER_ENDPOINT")

if clientID == "" || clientSecret == "" || identityEndpoint == "" || vserverEndpoint == "" {
return nil, fmt.Errorf("missing required environment variables")
return nil, lfmt.Errorf("missing required environment variables")
}

var cfg Global
Expand Down
Loading

0 comments on commit bb771cc

Please sign in to comment.