Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Scylla Manager cluster labels for cluster reconciliation #2156

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions pkg/controller/manager/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,51 @@ import (
"context"

scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/pointer"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

func (c *Controller) calculateStatus(sc *scyllav1.ScyllaCluster, managerState *state) *scyllav1.ScyllaClusterStatus {
func (c *Controller) calculateStatus(sc *scyllav1.ScyllaCluster, state *managerClusterState) *scyllav1.ScyllaClusterStatus {
status := sc.Status.DeepCopy()

status.ManagerID = pointer.Ptr("")
status.Backups = []scyllav1.BackupTaskStatus{}
status.Repairs = []scyllav1.RepairTaskStatus{}

if state.Cluster == nil {
return status
}

ownerUIDLabelValue, hasOwnerUIDLabel := state.Cluster.Labels[naming.OwnerUIDLabel]
if !hasOwnerUIDLabel {
klog.Warningf("Cluster %q is missing an owner UID label.", state.Cluster.Name)
}

if ownerUIDLabelValue != string(sc.UID) {
// Cluster is not owned by ScyllaCluster, do not propagate its status.
return status
}

status.ManagerID = pointer.Ptr(state.Cluster.ID)

repairTaskClientErrorMap := map[string]string{}
for _, rts := range status.Repairs {
if rts.Error != nil {
repairTaskClientErrorMap[rts.Name] = *rts.Error
}
}

status.Repairs = []scyllav1.RepairTaskStatus{}
for _, rt := range sc.Spec.Repairs {
repairTaskStatus := scyllav1.RepairTaskStatus{
TaskStatus: scyllav1.TaskStatus{
Name: rt.Name,
},
}

managerTaskStatus, isInManagerState := managerState.RepairTasks[rt.Name]
managerTaskStatus, isInManagerState := state.RepairTasks[rt.Name]
if isInManagerState {
repairTaskStatus = managerTaskStatus
} else {
Expand All @@ -52,15 +73,14 @@ func (c *Controller) calculateStatus(sc *scyllav1.ScyllaCluster, managerState *s
}
}

status.Backups = []scyllav1.BackupTaskStatus{}
for _, bt := range sc.Spec.Backups {
backupTaskStatus := scyllav1.BackupTaskStatus{
TaskStatus: scyllav1.TaskStatus{
Name: bt.Name,
},
}

managerTaskStatus, isInManagerState := managerState.BackupTasks[bt.Name]
managerTaskStatus, isInManagerState := state.BackupTasks[bt.Name]
if isInManagerState {
backupTaskStatus = managerTaskStatus
} else {
Expand Down
128 changes: 70 additions & 58 deletions pkg/controller/manager/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"fmt"
"time"

"github.com/scylladb/scylla-manager/v3/pkg/managerclient"
"github.com/scylladb/scylla-manager/v3/swagger/gen/scylla-manager/models"
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
"github.com/scylladb/scylla-operator/pkg/helpers"
"github.com/scylladb/scylla-operator/pkg/helpers/slices"
"github.com/scylladb/scylla-operator/pkg/naming"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
Expand All @@ -24,59 +27,73 @@ func (c *Controller) getAuthToken(sc *scyllav1.ScyllaCluster) (string, error) {
return helpers.GetAgentAuthTokenFromSecret(secret)
}

func (c *Controller) getManagerState(ctx context.Context, clusterID string) (*state, error) {
clusters, err := c.managerClient.ListClusters(ctx)
func (c *Controller) getManagerClusterState(ctx context.Context, sc *scyllav1.ScyllaCluster) (*managerClusterState, error) {
managerClusters, err := c.managerClient.ListClusters(ctx)
if err != nil {
return nil, err
}
var (
repairTasks map[string]scyllav1.RepairTaskStatus
backupTasks map[string]scyllav1.BackupTaskStatus
)

if clusterID != "" {
clusterFound := false
for _, c := range clusters {
if c.ID == clusterID {
clusterFound = true
}
return nil, fmt.Errorf("can't list clusters registered with manager: %w", err)
}

clusterName := naming.ManagerClusterName(sc)
// Cluster names in manager state are unique, so it suffices to only find one with a matching name.
managerCluster, _, found := slices.Find(managerClusters, func(c *models.Cluster) bool {
return c.Name == clusterName
})
if !found {
return &managerClusterState{}, nil
}

ownerUIDLabel := managerCluster.Labels[naming.OwnerUIDLabel]
if ownerUIDLabel != string(sc.UID) {
// Despite the label mismatch the cluster needs to be propagated to state so that we can delete it to avoid a name collision.
return &managerClusterState{
Cluster: managerCluster,
}, nil
}

// Sanity check.
if len(managerCluster.ID) == 0 {
return nil, fmt.Errorf("manager cluster is missing an ID")
}

var repairTaskStatuses map[string]scyllav1.RepairTaskStatus
var backupTaskStatuses map[string]scyllav1.BackupTaskStatus

var managerRepairTasks managerclient.TaskListItems
managerRepairTasks, err = c.managerClient.ListTasks(ctx, managerCluster.ID, "repair", true, "", "")
if err != nil {
return nil, fmt.Errorf("can't list repair tasks registered with manager: %w", err)
}

repairTaskStatuses = make(map[string]scyllav1.RepairTaskStatus, len(managerRepairTasks.TaskListItemSlice))
for _, managerRepairTask := range managerRepairTasks.TaskListItemSlice {
var repairTaskStatus *scyllav1.RepairTaskStatus
repairTaskStatus, err = NewRepairStatusFromManager(managerRepairTask)
if err != nil {
return nil, fmt.Errorf("can't get repair task status from manager task: %w", err)
}
repairTaskStatuses[repairTaskStatus.Name] = *repairTaskStatus
}

var managerBackupTasks managerclient.TaskListItems
managerBackupTasks, err = c.managerClient.ListTasks(ctx, managerCluster.ID, "backup", true, "", "")
if err != nil {
return nil, fmt.Errorf("can't list backup tasks registered with manager: %w", err)
}

if clusterFound {
managerRepairTasks, err := c.managerClient.ListTasks(ctx, clusterID, "repair", true, "", "")
if err != nil {
return nil, err
}

repairTasks = make(map[string]scyllav1.RepairTaskStatus, len(managerRepairTasks.TaskListItemSlice))
for _, managerRepairTask := range managerRepairTasks.TaskListItemSlice {
rts, err := NewRepairStatusFromManager(managerRepairTask)
if err != nil {
return nil, err
}
repairTasks[rts.Name] = *rts
}

managerBackupTasks, err := c.managerClient.ListTasks(ctx, clusterID, "backup", true, "", "")
if err != nil {
return nil, err
}

backupTasks = make(map[string]scyllav1.BackupTaskStatus, len(managerBackupTasks.TaskListItemSlice))
for _, managerBackupTask := range managerBackupTasks.TaskListItemSlice {
bts, err := NewBackupStatusFromManager(managerBackupTask)
if err != nil {
return nil, err
}
backupTasks[bts.Name] = *bts
}
backupTaskStatuses = make(map[string]scyllav1.BackupTaskStatus, len(managerBackupTasks.TaskListItemSlice))
for _, managerBackupTask := range managerBackupTasks.TaskListItemSlice {
var backupTaskStatus *scyllav1.BackupTaskStatus
backupTaskStatus, err = NewBackupStatusFromManager(managerBackupTask)
if err != nil {
return nil, fmt.Errorf("can't get backup task status from manager backup task: %w", err)
}
backupTaskStatuses[backupTaskStatus.Name] = *backupTaskStatus
}

return &state{
Clusters: clusters,
BackupTasks: backupTasks,
RepairTasks: repairTasks,
return &managerClusterState{
Cluster: managerCluster,
BackupTasks: backupTaskStatuses,
RepairTasks: repairTaskStatuses,
}, nil
}

Expand All @@ -102,30 +119,25 @@ func (c *Controller) sync(ctx context.Context, key string) error {
return err
}

clusterID := ""
if sc.Status.ManagerID != nil {
clusterID = *sc.Status.ManagerID
}

managerState, err := c.getManagerState(ctx, clusterID)
state, err := c.getManagerClusterState(ctx, sc)
if err != nil {
return fmt.Errorf("can't get manager state: %w", err)
return fmt.Errorf("can't get manager state for cluster %q: %w", naming.ObjRef(sc), err)
}

status := c.calculateStatus(sc, managerState)
status := c.calculateStatus(sc, state)

if sc.DeletionTimestamp != nil {
return c.updateStatus(ctx, sc, status)
}

authToken, err := c.getAuthToken(sc)
if err != nil {
return fmt.Errorf("can't get auth token: %w", err)
return fmt.Errorf("can't get auth token for cluster %q: %w", naming.ObjRef(sc), err)
}

actions, requeue, err := runSync(ctx, sc, authToken, managerState)
actions, requeue, err := runSync(ctx, sc, authToken, state)
if err != nil {
return fmt.Errorf("can't run sync: %w", err)
return fmt.Errorf("can't run sync for cluster %q: %w", naming.ObjRef(sc), err)
}

var errs []error
Expand Down
Loading