Skip to content

Commit

Permalink
divide multiple tables by gvr and fix watch bug
Browse files Browse the repository at this point in the history
Co-authored-by: zhangyongxi <[email protected]>
Co-authored-by: wuyingjun <[email protected]>
Co-authored-by: zhouhao <[email protected]>
Signed-off-by: baoyinghai <[email protected]>
  • Loading branch information
4 people committed Nov 15, 2023
1 parent e426f11 commit a62af50
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 140 deletions.
14 changes: 14 additions & 0 deletions pkg/storage/internalstorage/divide_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package internalstorage

import (
"fmt"
"strings"

"k8s.io/apimachinery/pkg/runtime/schema"
)

// GetTable return table name using gvr string
func GetTable(gvr schema.GroupVersionResource) string {
group := strings.ReplaceAll(gvr.Group, ".", "_")
return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource)
}
4 changes: 0 additions & 4 deletions pkg/storage/internalstorage/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,6 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
sqlDB.SetMaxOpenConns(connPool.MaxOpenConns)
sqlDB.SetConnMaxLifetime(connPool.ConnMaxLifetime)

if err := db.AutoMigrate(&Resource{}); err != nil {
return nil, err
}

return &StorageFactory{db}, nil
}

Expand Down
89 changes: 42 additions & 47 deletions pkg/storage/internalstorage/resource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,12 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
condition := map[string]interface{}{
"namespace": metaobj.GetNamespace(),
"name": metaobj.GetName(),
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"deleted": true,
}
if cluster != "" {
condition["cluster"] = cluster
}
dbResult := s.db.Model(&Resource{}).Where(condition).Delete(&Resource{})
dbResult := s.db.Model(&Resource{}).Table(GetTable(newGvr(s))).Where(condition).Delete(&Resource{})
if dbResult.Error != nil {
err = InterpretResourceDBError(cluster, metaobj.GetName(), dbResult.Error)
return fmt.Errorf("[Create]: Object %s/%s has been created failed in step one, err: %v", metaobj.GetName(), metaobj.GetNamespace(), err)
Expand Down Expand Up @@ -117,7 +114,7 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
resource.DeletedAt = sql.NullTime{Time: deletedAt.Time, Valid: true}
}

result := s.db.WithContext(ctx).Create(&resource)
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Create(&resource)
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
}

Expand Down Expand Up @@ -162,23 +159,17 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true}
}

result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": metaobj.GetNamespace(),
"name": metaobj.GetName(),
}).Updates(updatedResource)
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
}

func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB {
return s.db.Model(&Resource{}).Where(map[string]interface{}{
return s.db.Table(GetTable(newGvr(s))).Model(&Resource{}).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
}).Delete(&Resource{})
Expand Down Expand Up @@ -211,32 +202,26 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
condition := map[string]interface{}{
"cluster": cluster,
"namespace": metaobj.GetNamespace(),
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
}
if metaobj.GetName() != "" {
condition["name"] = metaobj.GetName()
}

result := s.db.WithContext(ctx).Model(&Resource{}).Where(condition).Updates(updatedResource)
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(condition).Updates(updatedResource)
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
}

func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
condition := map[string]interface{}{
"namespace": namespace,
"name": name,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"deleted": false,
}

if cluster != "" {
condition["cluster"] = cluster
}
return s.db.WithContext(ctx).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition)
return s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition)
}

func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name string) (runtime.Object, error) {
Expand All @@ -245,12 +230,9 @@ func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name s
"namespace": namespace,
"name": name,
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
}

result := s.db.WithContext(ctx).Model(&Resource{}).
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).
Select("cluster_resource_version, object").Where(condition).First(&resource)
if result.Error != nil {
return nil, InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
Expand All @@ -268,19 +250,37 @@ func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name s
return obj, nil
}

func (s *ResourceStorage) GenGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
condition := map[string]interface{}{
"namespace": namespace,
"name": name,
"deleted": false,
}

if cluster != "" {
condition["cluster"] = cluster
}
return s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition)
}

func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
var objects [][]byte
if result := s.genGetObjectQuery(ctx, cluster, namespace, name).First(&objects); result.Error != nil {
var resource Resource
if result := s.GenGetObjectQuery(ctx, cluster, namespace, name).First(&resource); result.Error != nil {
return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
}

obj, _, err := s.codec.Decode(objects[0], nil, into)
obj, _, err := s.codec.Decode(resource.Object, nil, into)
if err != nil {
return err
}
if obj != into {
return fmt.Errorf("Failed to decode resource, into is %T", into)
}
metaObj, err := meta.Accessor(obj)
if err != nil {
return err
}
metaObj.SetResourceVersion(utils.ParseInt642Str(resource.ClusterResourceVersion))
return nil
}

Expand All @@ -293,14 +293,11 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
var condition map[string]interface{}
if !isAll {
condition = map[string]interface{}{
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"deleted": false,
"deleted": false,
}
}

query := s.db.WithContext(ctx).Model(&Resource{}).Where(condition)
query := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(condition)
offset, amount, query, err := applyListOptionsToResourceQuery(s.db, query, opts)
return offset, amount, query, result, err
}
Expand All @@ -309,11 +306,9 @@ func (s *ResourceStorage) genListQuery(ctx context.Context, newfunc func() runti
var result [][]byte

condition := map[string]interface{}{
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"deleted": false,
}
query := s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(condition)
query := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("object").Where(condition)
_, _, query, err := applyListOptionsToResourceQuery(s.db, query, opts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -530,6 +525,14 @@ func (s *ResourceStorage) fetchInitEvents(ctx context.Context, rv string, newfun
}
}

func newGvr(s *ResourceStorage) schema.GroupVersionResource {
return schema.GroupVersionResource{
Group: s.storageGroupResource.Group,
Version: s.storageVersion.Version,
Resource: s.storageGroupResource.Resource,
}
}

func getObjectListAndMaxCrv(objList []Object, onlyMetada bool) ([]Object, []string, string, error) {
crvs := make([]string, 0, len(objList))
var maxCrv int64 = 0
Expand Down Expand Up @@ -575,12 +578,7 @@ func getObjectListAndMaxCrv(objList []Object, onlyMetada bool) ([]Object, []stri
func (s *ResourceStorage) GetMaxCrv(ctx context.Context) (string, error) {
maxCrv := "0"
var metadataList ResourceMetadataList
condition := map[string]interface{}{
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
}
result := s.db.WithContext(ctx).Model(&Resource{}).Select("cluster_resource_version").Where(condition).Order("cluster_resource_version DESC").Limit(1).Find(&metadataList)
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("cluster_resource_version").Order("cluster_resource_version DESC").Limit(1).Find(&metadataList)
if result.Error != nil {
return maxCrv, InterpretResourceDBError("", s.storageGroupResource.Resource, result.Error)
}
Expand Down Expand Up @@ -610,15 +608,12 @@ func (s *ResourceStorage) PublishEvent(ctx context.Context, wc *watchcomponents.
}

condition := map[string]interface{}{
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"cluster": wc.Cluster,
"namespace": metaObj.GetNamespace(),
"name": metaObj.GetName(),
}

s.db.WithContext(ctx).Model(&Resource{}).Where(condition).Updates(updatedResource)
s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(condition).Updates(updatedResource)
}

func (s *ResourceStorage) GenCrv2Event(event *watch.Event) {
Expand Down
72 changes: 59 additions & 13 deletions pkg/storage/internalstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@ package internalstorage
import (
"context"
"fmt"
"sync"

"gorm.io/gorm"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"

internal "github.com/clusterpedia-io/api/clusterpedia"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer"
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
)

var mutex sync.Mutex

type StorageFactory struct {
db *gorm.DB
}
Expand All @@ -30,6 +34,18 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
Resource: config.StorageGroupResource.Resource,
}

mutex.Lock()
defer mutex.Unlock()
if !s.db.Migrator().HasTable(GetTable(gvr)) {
if err := s.db.AutoMigrate(&Resource{}); err != nil {
return nil, err
}
err := s.db.Migrator().RenameTable("resources", GetTable(gvr))
if err != nil {
return nil, err
}
}

resourceStorage := &ResourceStorage{
db: s.db,
codec: config.Codec,
Expand Down Expand Up @@ -99,11 +115,26 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes

func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
var resources []Resource
result := f.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version").
Where(map[string]interface{}{"cluster": cluster}).
Find(&resources)
if result.Error != nil {
return nil, InterpretDBError(cluster, result.Error)
mutex.Lock()
tables, err := f.db.Migrator().GetTables()
if err != nil {
mutex.Unlock()
return nil, err
}
mutex.Unlock()
for _, table := range tables {
var tableResources []Resource
result := f.db.WithContext(ctx).Table(table).Select("group", "version", "resource",
"namespace", "name", "resource_version", "deleted", "published").
Where(map[string]interface{}{"cluster": cluster, "deleted": false}).
//In case deleted event be losted when synchro manager do a leaderelection or reboot
Or(map[string]interface{}{"cluster": cluster, "deleted": true, "published": false}).
Find(&tableResources)
if result.Error != nil {
return nil, InterpretDBError(cluster, result.Error)
}

resources = append(resources, tableResources...)
}

resourceversions := make(map[schema.GroupVersionResource]map[string]interface{})
Expand All @@ -119,22 +150,37 @@ func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string
if resource.Namespace != "" {
key = resource.Namespace + "/" + resource.Name
}
versions[key] = resource.ResourceVersion
versions[key] = informer.StorageElement{
Version: resource.ResourceVersion,
Deleted: resource.Deleted,
Published: resource.Published,
Name: resource.Name,
Namespace: resource.Namespace,
}
}
return resourceversions, nil
}

func (f *StorageFactory) CleanCluster(ctx context.Context, cluster string) error {
result := f.db.WithContext(ctx).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
return InterpretDBError(cluster, result.Error)
mutex.Lock()
tables, err := f.db.Migrator().GetTables()
if err != nil {
mutex.Unlock()
return err
}
mutex.Unlock()
for _, table := range tables {
result := f.db.WithContext(ctx).Table(table).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
if result.Error != nil {
return InterpretDBError(cluster, result.Error)
}
}
return nil
}

func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error {
result := s.db.Where(map[string]interface{}{
"cluster": cluster,
"group": gvr.Group,
"version": gvr.Version,
"resource": gvr.Resource,
result := s.db.Table(GetTable(gvr)).Where(map[string]interface{}{
"cluster": cluster,
}).Delete(&Resource{})
return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), result.Error)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas, isInI
}

informer.handler.OnAdd(d.Object, isInInitialList)
continue
break
}

if d.Type == cache.Replaced {
if v := compareResourceVersion(d.Object, version); v <= 0 {
if v == 0 {
informer.handler.OnSync(d.Object)
}
continue
break
}
}

Expand Down
Loading

0 comments on commit a62af50

Please sign in to comment.