Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for dividing table for internal storage #647

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/storage/internalstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ const (
databasePasswordEnvName = "DB_PASSWORD"
)

type DivisionPolicy string

const (
DivisionPolicyNone DivisionPolicy = "None"
DivisionPolicyGroupVersionResource DivisionPolicy = "GVR"
DivisionPolicyCustom DivisionPolicy = "Custom"
)

type Config struct {
Type string `env:"DB_TYPE" required:"true"`
DSN string `env:"DB_DSN"`
Expand All @@ -48,6 +56,9 @@ type Config struct {

Params map[string]string `yaml:"params"`

SkipAutoMigration bool `yaml:"skipAutoMigration"` // If set to false, no tables will be created
DivisionPolicy DivisionPolicy `yaml:"divisionPolicy"`

Log *LogConfig `yaml:"log"`
}

Expand Down
14 changes: 11 additions & 3 deletions pkg/storage/internalstorage/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,19 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
sqlDB.SetMaxOpenConns(connPool.MaxOpenConns)
sqlDB.SetConnMaxLifetime(connPool.ConnMaxLifetime)

if err := db.AutoMigrate(&Resource{}); err != nil {
calvin0327 marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
if !cfg.SkipAutoMigration && (cfg.DivisionPolicy == DivisionPolicyNone || cfg.DivisionPolicy == "") {
if exist := db.Migrator().HasTable("resources"); !exist {
if err := db.AutoMigrate(&Resource{}); err != nil {
return nil, err
}
}
}

return &StorageFactory{db}, nil
return &StorageFactory{
db: db,
SkipAutoMigration: cfg.SkipAutoMigration,
DivisionPolicy: cfg.DivisionPolicy,
}, nil
}

func newLogger(cfg *Config) (logger.Interface, error) {
Expand Down
50 changes: 27 additions & 23 deletions pkg/storage/internalstorage/resource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import (
)

type ResourceStorage struct {
db *gorm.DB
codec runtime.Codec

db *gorm.DB
codec runtime.Codec
storageGroupResource schema.GroupResource
storageVersion schema.GroupVersion
memoryVersion schema.GroupVersion
Expand Down Expand Up @@ -116,14 +115,17 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true}
}

result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": metaobj.GetNamespace(),
"name": metaobj.GetName(),
}).Updates(updatedResource)
result := s.db.WithContext(ctx).
Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
calvin0327 marked this conversation as resolved.
Show resolved Hide resolved
"namespace": metaobj.GetNamespace(),
"name": metaobj.GetName(),
}).
Updates(updatedResource)

return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
}

Expand All @@ -143,8 +145,8 @@ func (s *ResourceStorage) ConvertDeletedObject(obj interface{}) (runtime.Object,
return &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}}, nil
}

func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB {
return s.db.Model(&Resource{}).Where(map[string]interface{}{
func (s *ResourceStorage) deleteObject(ctx context.Context, cluster, namespace, name string) *gorm.DB {
return s.db.WithContext(ctx).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
Expand All @@ -160,21 +162,23 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
return err
}

if result := s.deleteObject(cluster, metaobj.GetNamespace(), metaobj.GetName()); result.Error != nil {
if result := s.deleteObject(ctx, cluster, metaobj.GetNamespace(), metaobj.GetName()); result.Error != nil {
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
}
return nil
}

func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
})
return s.db.WithContext(ctx).
Select("object").
Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
})
}

func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
Expand All @@ -199,7 +203,7 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
result = &ResourceMetadataList{}
}

query := s.db.WithContext(ctx).Model(&Resource{})
query := s.db.WithContext(ctx)
query = query.Where(map[string]interface{}{
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/internalstorage/resource_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func TestResourceStorage_deleteObject(t *testing.T) {
postgreSQL := postgresDB.Session(&gorm.Session{SkipDefaultTransaction: true}).ToSQL(
func(tx *gorm.DB) *gorm.DB {
rs := newTestResourceStorage(tx, test.resource)
return rs.deleteObject(test.cluster, test.namespace, test.resourceName)
return rs.deleteObject(context.TODO(), test.cluster, test.namespace, test.resourceName)
})

if postgreSQL != test.expected.postgres {
Expand All @@ -354,7 +354,7 @@ func TestResourceStorage_deleteObject(t *testing.T) {
mysqlSQL := mysqlDBs[version].Session(&gorm.Session{SkipDefaultTransaction: true}).ToSQL(
func(tx *gorm.DB) *gorm.DB {
rs := newTestResourceStorage(tx, test.resource)
return rs.deleteObject(test.cluster, test.namespace, test.resourceName)
return rs.deleteObject(context.TODO(), test.cluster, test.namespace, test.resourceName)
})

if mysqlSQL != test.expected.mysql {
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestResourceStorage_Update(t *testing.T) {

func newTestResourceStorage(db *gorm.DB, storageGVK schema.GroupVersionResource) *ResourceStorage {
return &ResourceStorage{
db: db,
db: db.Table("resources").Model(&Resource{}),
storageGroupResource: storageGVK.GroupResource(),
storageVersion: storageGVK.GroupVersion(),
}
Expand Down
121 changes: 103 additions & 18 deletions pkg/storage/internalstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internalstorage
import (
"context"
"fmt"
"strings"

"gorm.io/gorm"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -12,18 +13,51 @@ import (
)

type StorageFactory struct {
db *gorm.DB
db *gorm.DB
SkipAutoMigration bool
DivisionPolicy DivisionPolicy
}

func (s *StorageFactory) AutoMigrate() error {
return nil
}

func (s *StorageFactory) GetSupportedRequestVerbs() []string {
return []string{"get", "list"}
}

func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) {
return &ResourceStorage{
db: s.db,
codec: config.Codec,
gvr := schema.GroupVersionResource{
Group: config.StorageGroupResource.Group,
Version: config.StorageVersion.Version,
Resource: config.StorageGroupResource.Resource,
}
table := s.tableName(gvr)

var model interface{}
switch s.DivisionPolicy {
case DivisionPolicyGroupVersionResource:
model = &GroupVersionResource{}
if !s.SkipAutoMigration {
if exist := s.db.Migrator().HasTable(table); !exist {
if err := s.db.AutoMigrate(&GroupVersionResource{}); err != nil {
return nil, err
}

if err := s.db.Migrator().RenameTable("group_version_resources", table); err != nil {
if !s.db.Migrator().HasTable(table) {
return nil, err
}
}
}
}
default:
model = &Resource{}
}
calvin0327 marked this conversation as resolved.
Show resolved Hide resolved

return &ResourceStorage{
db: s.db.Table(table).Model(model),
codec: config.Codec,
storageGroupResource: config.StorageGroupResource,
storageVersion: config.StorageVersion,
memoryVersion: config.MemoryVersion,
Expand All @@ -40,12 +74,21 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes
}

func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
tables, err := s.db.Migrator().GetTables()
if err != nil {
return nil, err
}

var resources []Resource
result := s.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version").
Where(map[string]interface{}{"cluster": cluster}).
Find(&resources)
if result.Error != nil {
return nil, InterpretDBError(cluster, result.Error)
for _, table := range tables {
result := s.db.WithContext(ctx).
Table(table).
Select("group", "version", "resource", "namespace", "name", "resource_version").
Where(map[string]interface{}{"cluster": cluster}).
Find(&resources)
if result.Error != nil {
return nil, InterpretDBError(cluster, result.Error)
}
}

resourceversions := make(map[schema.GroupVersionResource]map[string]interface{})
Expand All @@ -67,18 +110,41 @@ func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string
}

func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error {
result := s.db.WithContext(ctx).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
return InterpretDBError(cluster, result.Error)
tables, err := s.db.Migrator().GetTables()
if err != nil {
return err
}

for _, table := range tables {
result := s.db.WithContext(ctx).Table(table).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
if result.Error != nil {
return InterpretDBError(cluster, result.Error)
}
}
calvin0327 marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error {
result := s.db.WithContext(ctx).Where(map[string]interface{}{
"cluster": cluster,
"group": gvr.Group,
"version": gvr.Version,
"resource": gvr.Resource,
}).Delete(&Resource{})
return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), result.Error)
err := s.db.Transaction(func(db *gorm.DB) error {
result := s.db.WithContext(ctx).
Table(s.tableName(gvr)).
Where(map[string]interface{}{
"cluster": cluster,
"group": gvr.Group,
"version": gvr.Version,
"resource": gvr.Resource,
}).
Delete(&Resource{})

if result.Error != nil {
return result.Error
}

return nil
})

return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), err)
}

func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*internal.CollectionResource, error) {
Expand All @@ -92,3 +158,22 @@ func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*interna
func (s *StorageFactory) PrepareCluster(cluster string) error {
return nil
}

// GenerateTableFor return table name using gvr string
func GenerateTableFor(gvr schema.GroupVersionResource) string {
if gvr.Group == "" {
return fmt.Sprintf("%s_%s", gvr.Version, gvr.Resource)
}

group := strings.ReplaceAll(gvr.Group, ".", "_")
return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource)
}

func (s *StorageFactory) tableName(gvr schema.GroupVersionResource) string {
table := "resources"
if s.DivisionPolicy == DivisionPolicyGroupVersionResource {
table = GenerateTableFor(gvr)
}

return table
}
24 changes: 23 additions & 1 deletion pkg/storage/internalstorage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Resource struct {
ID uint `gorm:"primaryKey"`

Group string `gorm:"size:63;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name;index:idx_group_version_resource_namespace_name;index:idx_group_version_resource_name"`
Version string `gorm:"size:15;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name;index:idx_group_version_resource_namespace_name;index:idx_group_version_resource_name"`
Version string `gorm:"size:14;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name;index:idx_group_version_resource_namespace_name;index:idx_group_version_resource_name"`
Resource string `gorm:"size:63;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name;index:idx_group_version_resource_namespace_name;index:idx_group_version_resource_name"`
Kind string `gorm:"size:63;not null"`

Expand Down Expand Up @@ -99,6 +99,28 @@ func (res Resource) ConvertTo(codec runtime.Codec, object runtime.Object) (runti
return obj, err
}

type GroupVersionResource struct {
ID uint `gorm:"primaryKey"`

Group string `gorm:"size:63;not null"`
Version string `gorm:"size:14;not null"`
Resource string `gorm:"size:63;not null"`
Kind string `gorm:"size:63;not null"`

Cluster string `gorm:"size:253;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name,length:100;index:idx_cluster"`
Namespace string `gorm:"size:253;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name,length:50;index:idx_group_version_resource_namespace_name"`
Name string `gorm:"size:253;not null;uniqueIndex:uni_group_version_resource_cluster_namespace_name,length:100;index:idx_group_version_resource_namespace_name;index:idx_group_version_resource_name"`
OwnerUID types.UID `gorm:"column:owner_uid;size:36;not null;default:''"`
UID types.UID `gorm:"size:36;not null"`
ResourceVersion string `gorm:"size:30;not null"`

Object datatypes.JSON `gorm:"not null"`

CreatedAt time.Time `gorm:"not null"`
SyncedAt time.Time `gorm:"not null;autoUpdateTime"`
DeletedAt sql.NullTime
}

type ResourceMetadata struct {
ResourceType `gorm:"embedded"`

Expand Down
Loading