From 29c0306e76e7bd30d0b9e011a1ebb72f1bf17d0c Mon Sep 17 00:00:00 2001 From: calvin Date: Thu, 28 Dec 2023 10:38:11 +0800 Subject: [PATCH 1/2] add extra config for dividing table Signed-off-by: calvin --- pkg/storage/internalstorage/config.go | 31 +++++++++++++++++++++++++ pkg/storage/internalstorage/register.go | 4 ---- pkg/storage/internalstorage/storage.go | 26 ++++++++++++++++++++- 3 files changed, 56 insertions(+), 5 deletions(-) diff --git a/pkg/storage/internalstorage/config.go b/pkg/storage/internalstorage/config.go index 81823c32b..3d175bcbe 100644 --- a/pkg/storage/internalstorage/config.go +++ b/pkg/storage/internalstorage/config.go @@ -15,6 +15,8 @@ import ( "gopkg.in/natefinch/lumberjack.v2" "gorm.io/gorm/logger" "k8s.io/klog/v2" + + clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2" ) const ( @@ -24,6 +26,14 @@ const ( databasePasswordEnvName = "DB_PASSWORD" ) +type DivisionPolicy string + +const ( + DivisionPolicyNone DivisionPolicy = "None" + DivisionPolicyGroupResource DivisionPolicy = "GroupResource" + DivisionPolicyCustom DivisionPolicy = "Custom" +) + type Config struct { Type string `env:"DB_TYPE" required:"true"` DSN string `env:"DB_DSN"` @@ -48,9 +58,30 @@ type Config struct { Params map[string]string `yaml:"params"` + AutoMigration *bool `yaml:"autoMigration"` // If set to false, no tables will be created + DivisionPolicy DivisionPolicy `yaml:"divisionPolicy"` + Mapper []ResourceMapper `yaml:"mapper"` // Only DivisionPolicy is DivisionPolicyCustom it need to specify the mapping between resource and table + Log *LogConfig `yaml:"log"` } +type ResourceMapper struct { + Table *Table `yaml:"table"` + Resources []clusterv1alpha2.ClusterGroupResources `yaml:"resources"` +} + +type Table struct { + Name string `yaml:"name"` + ExtraFields []ExtraField `yaml:"extraFields"` +} + +type ExtraField struct { + Name string `yaml:"name"` + PlainPath string `yaml:"plainPath"` + Type string `yaml:"type"` + Index string `yaml:"index"` +} + type LogConfig struct { Stdout bool `yaml:"stdout"` Level string `yaml:"level"` diff --git a/pkg/storage/internalstorage/register.go b/pkg/storage/internalstorage/register.go index a0bb10375..42061499b 100644 --- a/pkg/storage/internalstorage/register.go +++ b/pkg/storage/internalstorage/register.go @@ -93,10 +93,6 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) { sqlDB.SetMaxOpenConns(connPool.MaxOpenConns) sqlDB.SetConnMaxLifetime(connPool.ConnMaxLifetime) - if err := db.AutoMigrate(&Resource{}); err != nil { - return nil, err - } - return &StorageFactory{db}, nil } diff --git a/pkg/storage/internalstorage/storage.go b/pkg/storage/internalstorage/storage.go index ce99ca99d..ea221bf2d 100644 --- a/pkg/storage/internalstorage/storage.go +++ b/pkg/storage/internalstorage/storage.go @@ -12,7 +12,31 @@ import ( ) type StorageFactory struct { - db *gorm.DB + db *gorm.DB + AutoMigration *bool + DivisionPolicy DivisionPolicy + Mapper []ResourceMapper +} + +func (s *StorageFactory) AutoMigrate() error { + if s.AutoMigration != nil && *s.AutoMigration { + switch s.DivisionPolicy { + if err := s.db.AutoMigrate(&Resource{}); err != nil { + return err + } + case "", DivisionPolicyNone: + case DivisionPolicyGroupResource: + + } + + if s.DivisionPolicy == "" || s.DivisionPolicy == DivisionPolicyNone { + if err := s.db.AutoMigrate(&Resource{}); err != nil { + return err + } + } + } + + return nil } func (s *StorageFactory) GetSupportedRequestVerbs() []string { From 723c0d2098cab3884462dd724c355b07f24b024c Mon Sep 17 00:00:00 2001 From: calvin Date: Mon, 29 Jan 2024 11:43:30 +0800 Subject: [PATCH 2/2] support split table for internal storage Signed-off-by: calvin --- pkg/storage/internalstorage/config.go | 30 +--- pkg/storage/internalstorage/register.go | 14 +- .../internalstorage/resource_storage.go | 50 ++++--- .../internalstorage/resource_storage_test.go | 6 +- pkg/storage/internalstorage/storage.go | 137 +++++++++++++----- pkg/storage/internalstorage/types.go | 24 ++- 6 files changed, 170 insertions(+), 91 deletions(-) diff --git a/pkg/storage/internalstorage/config.go b/pkg/storage/internalstorage/config.go index 3d175bcbe..74d9294d4 100644 --- a/pkg/storage/internalstorage/config.go +++ b/pkg/storage/internalstorage/config.go @@ -15,8 +15,6 @@ import ( "gopkg.in/natefinch/lumberjack.v2" "gorm.io/gorm/logger" "k8s.io/klog/v2" - - clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2" ) const ( @@ -29,9 +27,9 @@ const ( type DivisionPolicy string const ( - DivisionPolicyNone DivisionPolicy = "None" - DivisionPolicyGroupResource DivisionPolicy = "GroupResource" - DivisionPolicyCustom DivisionPolicy = "Custom" + DivisionPolicyNone DivisionPolicy = "None" + DivisionPolicyGroupVersionResource DivisionPolicy = "GVR" + DivisionPolicyCustom DivisionPolicy = "Custom" ) type Config struct { @@ -58,30 +56,12 @@ type Config struct { Params map[string]string `yaml:"params"` - AutoMigration *bool `yaml:"autoMigration"` // If set to false, no tables will be created - DivisionPolicy DivisionPolicy `yaml:"divisionPolicy"` - Mapper []ResourceMapper `yaml:"mapper"` // Only DivisionPolicy is DivisionPolicyCustom it need to specify the mapping between resource and table + SkipAutoMigration bool `yaml:"skipAutoMigration"` // If set to false, no tables will be created + DivisionPolicy DivisionPolicy `yaml:"divisionPolicy"` Log *LogConfig `yaml:"log"` } -type ResourceMapper struct { - Table *Table `yaml:"table"` - Resources []clusterv1alpha2.ClusterGroupResources `yaml:"resources"` -} - -type Table struct { - Name string `yaml:"name"` - ExtraFields []ExtraField `yaml:"extraFields"` -} - -type ExtraField struct { - Name string `yaml:"name"` - PlainPath string `yaml:"plainPath"` - Type string `yaml:"type"` - Index string `yaml:"index"` -} - type LogConfig struct { Stdout bool `yaml:"stdout"` Level string `yaml:"level"` diff --git a/pkg/storage/internalstorage/register.go b/pkg/storage/internalstorage/register.go index 42061499b..9f658b783 100644 --- a/pkg/storage/internalstorage/register.go +++ b/pkg/storage/internalstorage/register.go @@ -93,7 +93,19 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) { sqlDB.SetMaxOpenConns(connPool.MaxOpenConns) sqlDB.SetConnMaxLifetime(connPool.ConnMaxLifetime) - return &StorageFactory{db}, nil + if !cfg.SkipAutoMigration && (cfg.DivisionPolicy == DivisionPolicyNone || cfg.DivisionPolicy == "") { + if exist := db.Migrator().HasTable("resources"); !exist { + if err := db.AutoMigrate(&Resource{}); err != nil { + return nil, err + } + } + } + + return &StorageFactory{ + db: db, + SkipAutoMigration: cfg.SkipAutoMigration, + DivisionPolicy: cfg.DivisionPolicy, + }, nil } func newLogger(cfg *Config) (logger.Interface, error) { diff --git a/pkg/storage/internalstorage/resource_storage.go b/pkg/storage/internalstorage/resource_storage.go index a42f758c6..1810705fd 100644 --- a/pkg/storage/internalstorage/resource_storage.go +++ b/pkg/storage/internalstorage/resource_storage.go @@ -27,9 +27,8 @@ import ( ) type ResourceStorage struct { - db *gorm.DB - codec runtime.Codec - + db *gorm.DB + codec runtime.Codec storageGroupResource schema.GroupResource storageVersion schema.GroupVersion memoryVersion schema.GroupVersion @@ -116,14 +115,17 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true} } - result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{ - "cluster": cluster, - "group": s.storageGroupResource.Group, - "version": s.storageVersion.Version, - "resource": s.storageGroupResource.Resource, - "namespace": metaobj.GetNamespace(), - "name": metaobj.GetName(), - }).Updates(updatedResource) + result := s.db.WithContext(ctx). + Where(map[string]interface{}{ + "cluster": cluster, + "group": s.storageGroupResource.Group, + "version": s.storageVersion.Version, + "resource": s.storageGroupResource.Resource, + "namespace": metaobj.GetNamespace(), + "name": metaobj.GetName(), + }). + Updates(updatedResource) + return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error) } @@ -143,8 +145,8 @@ func (s *ResourceStorage) ConvertDeletedObject(obj interface{}) (runtime.Object, return &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}}, nil } -func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB { - return s.db.Model(&Resource{}).Where(map[string]interface{}{ +func (s *ResourceStorage) deleteObject(ctx context.Context, cluster, namespace, name string) *gorm.DB { + return s.db.WithContext(ctx).Where(map[string]interface{}{ "cluster": cluster, "group": s.storageGroupResource.Group, "version": s.storageVersion.Version, @@ -160,21 +162,23 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim return err } - if result := s.deleteObject(cluster, metaobj.GetNamespace(), metaobj.GetName()); result.Error != nil { + if result := s.deleteObject(ctx, cluster, metaobj.GetNamespace(), metaobj.GetName()); result.Error != nil { return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error) } return nil } func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB { - return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(map[string]interface{}{ - "cluster": cluster, - "group": s.storageGroupResource.Group, - "version": s.storageVersion.Version, - "resource": s.storageGroupResource.Resource, - "namespace": namespace, - "name": name, - }) + return s.db.WithContext(ctx). + Select("object"). + Where(map[string]interface{}{ + "cluster": cluster, + "group": s.storageGroupResource.Group, + "version": s.storageVersion.Version, + "resource": s.storageGroupResource.Resource, + "namespace": namespace, + "name": name, + }) } func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error { @@ -199,7 +203,7 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna result = &ResourceMetadataList{} } - query := s.db.WithContext(ctx).Model(&Resource{}) + query := s.db.WithContext(ctx) query = query.Where(map[string]interface{}{ "group": s.storageGroupResource.Group, "version": s.storageVersion.Version, diff --git a/pkg/storage/internalstorage/resource_storage_test.go b/pkg/storage/internalstorage/resource_storage_test.go index 4e44168d9..d60d4026d 100644 --- a/pkg/storage/internalstorage/resource_storage_test.go +++ b/pkg/storage/internalstorage/resource_storage_test.go @@ -339,7 +339,7 @@ func TestResourceStorage_deleteObject(t *testing.T) { postgreSQL := postgresDB.Session(&gorm.Session{SkipDefaultTransaction: true}).ToSQL( func(tx *gorm.DB) *gorm.DB { rs := newTestResourceStorage(tx, test.resource) - return rs.deleteObject(test.cluster, test.namespace, test.resourceName) + return rs.deleteObject(context.TODO(), test.cluster, test.namespace, test.resourceName) }) if postgreSQL != test.expected.postgres { @@ -354,7 +354,7 @@ func TestResourceStorage_deleteObject(t *testing.T) { mysqlSQL := mysqlDBs[version].Session(&gorm.Session{SkipDefaultTransaction: true}).ToSQL( func(tx *gorm.DB) *gorm.DB { rs := newTestResourceStorage(tx, test.resource) - return rs.deleteObject(test.cluster, test.namespace, test.resourceName) + return rs.deleteObject(context.TODO(), test.cluster, test.namespace, test.resourceName) }) if mysqlSQL != test.expected.mysql { @@ -465,7 +465,7 @@ func TestResourceStorage_Update(t *testing.T) { func newTestResourceStorage(db *gorm.DB, storageGVK schema.GroupVersionResource) *ResourceStorage { return &ResourceStorage{ - db: db, + db: db.Table("resources").Model(&Resource{}), storageGroupResource: storageGVK.GroupResource(), storageVersion: storageGVK.GroupVersion(), } diff --git a/pkg/storage/internalstorage/storage.go b/pkg/storage/internalstorage/storage.go index ea221bf2d..4043c1b38 100644 --- a/pkg/storage/internalstorage/storage.go +++ b/pkg/storage/internalstorage/storage.go @@ -3,6 +3,7 @@ package internalstorage import ( "context" "fmt" + "strings" "gorm.io/gorm" "k8s.io/apimachinery/pkg/runtime/schema" @@ -12,30 +13,12 @@ import ( ) type StorageFactory struct { - db *gorm.DB - AutoMigration *bool - DivisionPolicy DivisionPolicy - Mapper []ResourceMapper + db *gorm.DB + SkipAutoMigration bool + DivisionPolicy DivisionPolicy } func (s *StorageFactory) AutoMigrate() error { - if s.AutoMigration != nil && *s.AutoMigration { - switch s.DivisionPolicy { - if err := s.db.AutoMigrate(&Resource{}); err != nil { - return err - } - case "", DivisionPolicyNone: - case DivisionPolicyGroupResource: - - } - - if s.DivisionPolicy == "" || s.DivisionPolicy == DivisionPolicyNone { - if err := s.db.AutoMigrate(&Resource{}); err != nil { - return err - } - } - } - return nil } @@ -44,10 +27,37 @@ func (s *StorageFactory) GetSupportedRequestVerbs() []string { } func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) { - return &ResourceStorage{ - db: s.db, - codec: config.Codec, + gvr := schema.GroupVersionResource{ + Group: config.StorageGroupResource.Group, + Version: config.StorageVersion.Version, + Resource: config.StorageGroupResource.Resource, + } + table := s.tableName(gvr) + + var model interface{} + switch s.DivisionPolicy { + case DivisionPolicyGroupVersionResource: + model = &GroupVersionResource{} + if !s.SkipAutoMigration { + if exist := s.db.Migrator().HasTable(table); !exist { + if err := s.db.AutoMigrate(&GroupVersionResource{}); err != nil { + return nil, err + } + + if err := s.db.Migrator().RenameTable("group_version_resources", table); err != nil { + if !s.db.Migrator().HasTable(table) { + return nil, err + } + } + } + } + default: + model = &Resource{} + } + return &ResourceStorage{ + db: s.db.Table(table).Model(model), + codec: config.Codec, storageGroupResource: config.StorageGroupResource, storageVersion: config.StorageVersion, memoryVersion: config.MemoryVersion, @@ -64,12 +74,21 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes } func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) { + tables, err := s.db.Migrator().GetTables() + if err != nil { + return nil, err + } + var resources []Resource - result := s.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version"). - Where(map[string]interface{}{"cluster": cluster}). - Find(&resources) - if result.Error != nil { - return nil, InterpretDBError(cluster, result.Error) + for _, table := range tables { + result := s.db.WithContext(ctx). + Table(table). + Select("group", "version", "resource", "namespace", "name", "resource_version"). + Where(map[string]interface{}{"cluster": cluster}). + Find(&resources) + if result.Error != nil { + return nil, InterpretDBError(cluster, result.Error) + } } resourceversions := make(map[schema.GroupVersionResource]map[string]interface{}) @@ -91,18 +110,41 @@ func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string } func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error { - result := s.db.WithContext(ctx).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{}) - return InterpretDBError(cluster, result.Error) + tables, err := s.db.Migrator().GetTables() + if err != nil { + return err + } + + for _, table := range tables { + result := s.db.WithContext(ctx).Table(table).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{}) + if result.Error != nil { + return InterpretDBError(cluster, result.Error) + } + } + + return nil } func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error { - result := s.db.WithContext(ctx).Where(map[string]interface{}{ - "cluster": cluster, - "group": gvr.Group, - "version": gvr.Version, - "resource": gvr.Resource, - }).Delete(&Resource{}) - return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), result.Error) + err := s.db.Transaction(func(db *gorm.DB) error { + result := s.db.WithContext(ctx). + Table(s.tableName(gvr)). + Where(map[string]interface{}{ + "cluster": cluster, + "group": gvr.Group, + "version": gvr.Version, + "resource": gvr.Resource, + }). + Delete(&Resource{}) + + if result.Error != nil { + return result.Error + } + + return nil + }) + + return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), err) } func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*internal.CollectionResource, error) { @@ -116,3 +158,22 @@ func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*interna func (s *StorageFactory) PrepareCluster(cluster string) error { return nil } + +// GenerateTableFor return table name using gvr string +func GenerateTableFor(gvr schema.GroupVersionResource) string { + if gvr.Group == "" { + return fmt.Sprintf("%s_%s", gvr.Version, gvr.Resource) + } + + group := strings.ReplaceAll(gvr.Group, ".", "_") + return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource) +} + +func (s *StorageFactory) tableName(gvr schema.GroupVersionResource) string { + table := "resources" + if s.DivisionPolicy == DivisionPolicyGroupVersionResource { + table = GenerateTableFor(gvr) + } + + return table +} diff --git a/pkg/storage/internalstorage/types.go b/pkg/storage/internalstorage/types.go index 30daebab9..80387c726 100644 --- a/pkg/storage/internalstorage/types.go +++ b/pkg/storage/internalstorage/types.go @@ -51,7 +51,7 @@ type Resource struct { ID uint `gorm:"primaryKey"` Group string `gorm:"size:63;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name;index:idx_group_version_resource_namespace_name;index:idx_group_version_resource_name"` - Version string `gorm:"size:15;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name;index:idx_group_version_resource_namespace_name;index:idx_group_version_resource_name"` + Version string `gorm:"size:14;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name;index:idx_group_version_resource_namespace_name;index:idx_group_version_resource_name"` Resource string `gorm:"size:63;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name;index:idx_group_version_resource_namespace_name;index:idx_group_version_resource_name"` Kind string `gorm:"size:63;not null"` @@ -99,6 +99,28 @@ func (res Resource) ConvertTo(codec runtime.Codec, object runtime.Object) (runti return obj, err } +type GroupVersionResource struct { + ID uint `gorm:"primaryKey"` + + Group string `gorm:"size:63;not null"` + Version string `gorm:"size:14;not null"` + Resource string `gorm:"size:63;not null"` + Kind string `gorm:"size:63;not null"` + + Cluster string `gorm:"size:253;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name,length:100;index:idx_cluster"` + Namespace string `gorm:"size:253;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name,length:50;index:idx_group_version_resource_namespace_name"` + Name string `gorm:"size:253;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name,length:100;index:idx_group_version_resource_namespace_name;index:idx_group_version_resource_name"` + OwnerUID types.UID `gorm:"column:owner_uid;size:36;not null;default:''"` + UID types.UID `gorm:"size:36;not null"` + ResourceVersion string `gorm:"size:30;not null"` + + Object datatypes.JSON `gorm:"not null"` + + CreatedAt time.Time `gorm:"not null"` + SyncedAt time.Time `gorm:"not null;autoUpdateTime"` + DeletedAt sql.NullTime +} + type ResourceMetadata struct { ResourceType `gorm:"embedded"`