Merge pull request #320 from intelligentfu8/revert-replcias-update
fix cluster updated and add safe shield for scale down fe
intelligentfu8 authored Dec 24, 2024
2 parents a13e1ab + 74caa10 commit e7d67f3
Showing 5 changed files with 60 additions and 10 deletions.
9 changes: 9 additions & 0 deletions pkg/common/utils/k8s/client.go
@@ -110,6 +110,15 @@ func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.St
return err
}

// ApplyDorisCluster patches the DorisCluster in the apiserver. A conflict is
// treated as success: another writer updated the object first, and the next
// reconcile will re-apply against the fresh version.
func ApplyDorisCluster(ctx context.Context, k8sclient client.Client, dcr *dorisv1.DorisCluster) error {
err := PatchClientObject(ctx, k8sclient, dcr)
if err == nil || apierrors.IsConflict(err) {
return nil
}

return err
}

func GetStatefulSet(ctx context.Context, k8sclient client.Client, namespace, name string) (*appv1.StatefulSet, error) {
var est appv1.StatefulSet
err := k8sclient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &est)
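
The new ApplyDorisCluster helper deliberately swallows 409 Conflict errors: a conflict means optimistic concurrency lost the race, and the reconcile loop will run again against the fresher resourceVersion. A minimal, runnable sketch of that error-handling idiom, assuming only k8s.io/apimachinery (the helper name here is illustrative, not part of the operator):

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// swallowConflict mirrors the pattern in ApplyDorisCluster: on a 409 Conflict,
// report success and let the next reconcile re-apply against the fresh object.
func swallowConflict(err error) error {
	if err == nil || apierrors.IsConflict(err) {
		return nil
	}
	return err
}

func main() {
	gr := schema.GroupResource{Group: "doris.apache.org", Resource: "dorisclusters"}
	conflict := apierrors.NewConflict(gr, "example", fmt.Errorf("the object has been modified"))
	fmt.Println(swallowConflict(conflict)) // <nil>: conflict treated as success
}
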
2 changes: 1 addition & 1 deletion pkg/common/utils/mysql/doris.go
@@ -111,7 +111,7 @@ func BuildSeqNumberToFrontendMap(frontends []*Frontend, ipMap map[string]string,
}

// FindNeedDeletedObservers sorts frontends by index in descending order and returns the top needRemovedAmount of them
func FindNeedDeletedFrontends(frontendMap map[int]*Frontend, needRemovedAmount int32) []*Frontend {
func FindNeedDeletedObservers(frontendMap map[int]*Frontend, needRemovedAmount int32) []*Frontend {
var topFrontends []*Frontend
if int(needRemovedAmount) <= len(frontendMap) {
keys := make([]int, 0, len(frontendMap))
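
The renamed helper's contract — take the highest-indexed frontends first, matching the order in which a StatefulSet removes pods on scale-down — can be illustrated with a self-contained sketch (the Frontend type below is a stand-in for mysql.Frontend, not the operator's real struct):

package main

import (
	"fmt"
	"sort"
)

// Frontend is a stand-in for mysql.Frontend; only the field needed for the
// illustration is included.
type Frontend struct {
	Host string
}

// findNeedDeletedObservers mirrors the documented behaviour: sort the map keys
// (FE ordinal indexes) in descending order and return the frontends at the top
// needRemovedAmount indexes, i.e. the highest-numbered pods.
func findNeedDeletedObservers(frontendMap map[int]*Frontend, needRemovedAmount int32) []*Frontend {
	var topFrontends []*Frontend
	if int(needRemovedAmount) > len(frontendMap) {
		return topFrontends
	}
	keys := make([]int, 0, len(frontendMap))
	for k := range frontendMap {
		keys = append(keys, k)
	}
	sort.Sort(sort.Reverse(sort.IntSlice(keys)))
	for _, k := range keys[:needRemovedAmount] {
		topFrontends = append(topFrontends, frontendMap[k])
	}
	return topFrontends
}

func main() {
	fes := map[int]*Frontend{0: {Host: "fe-0"}, 1: {Host: "fe-1"}, 2: {Host: "fe-2"}}
	for _, fe := range findNeedDeletedObservers(fes, 2) {
		fmt.Println(fe.Host) // fe-2, then fe-1
	}
}
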
21 changes: 21 additions & 0 deletions pkg/controller/doriscluster_controller.go
@@ -36,6 +36,7 @@ package controller
import (
"context"
dorisv1 "github.com/apache/doris-operator/api/doris/v1"
"github.com/apache/doris-operator/pkg/common/utils/k8s"
"github.com/apache/doris-operator/pkg/controller/sub_controller"
"github.com/apache/doris-operator/pkg/controller/sub_controller/be"
bk "github.com/apache/doris-operator/pkg/controller/sub_controller/broker"
@@ -150,9 +151,29 @@ func (r *DorisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}

//if the dcr was modified by the doris operator during reconciliation, write the update back to the apiserver; otherwise ignore it.
if err = r.revertDorisClusterSomeFields(ctx, &edcr, dcr); err != nil {
klog.Errorf("DorisClusterReconciler revertDorisClusterSomeFields update dorisCluster namespace=%s, name=%s failed, err=%s", dcr.Namespace, dcr.Name, err.Error())
return requeueIfError(err)
}

return r.updateDorisClusterStatus(ctx, dcr)
}

//if the cluster spec was corrected (reverted) by the doris operator, the apiserver copy should be reverted to match.
//this action is not ideal, but it is an effective shield for the scale-down of fe.
func (r *DorisClusterReconciler) revertDorisClusterSomeFields(ctx context.Context, getDcr, updatedDcr *dorisv1.DorisCluster) error {
if *getDcr.Spec.FeSpec.Replicas != *updatedDcr.Spec.FeSpec.Replicas {
return k8s.ApplyDorisCluster(ctx, r.Client, updatedDcr)
}

return nil
}

func (r *DorisClusterReconciler) updateDorisCluster(ctx context.Context, dcr *dorisv1.DorisCluster) error {
return k8s.ApplyDorisCluster(ctx, r.Client, dcr)
}
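
To make the trigger condition concrete: the revert fires only when the in-memory copy corrected by the operator no longer matches what the apiserver returned. A hedged sketch with stand-in types (not the operator's real API structs, which carry many more fields):

package main

import "fmt"

// Stand-in types for dorisv1.DorisCluster.
type feSpec struct{ Replicas *int32 }
type clusterSpec struct{ FeSpec feSpec }
type dorisCluster struct{ Spec clusterSpec }

func int32Ptr(v int32) *int32 { return &v }

// needsRevert reproduces the check in revertDorisClusterSomeFields: push the
// corrected spec back only when it disagrees with the fetched object.
func needsRevert(fetched, corrected *dorisCluster) bool {
	return *fetched.Spec.FeSpec.Replicas != *corrected.Spec.FeSpec.Replicas
}

func main() {
	fetched := &dorisCluster{Spec: clusterSpec{FeSpec: feSpec{Replicas: int32Ptr(2)}}}   // user tried to scale FE 3 -> 2
	corrected := &dorisCluster{Spec: clusterSpec{FeSpec: feSpec{Replicas: int32Ptr(3)}}} // safeScaleDown clamped back to ElectionNumber
	fmt.Println(needsRevert(fetched, corrected))                                         // true: write the corrected spec back
}
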

func (r *DorisClusterReconciler) clearNoEffectResources(context context.Context, cluster *dorisv1.DorisCluster) {
//calculate the status of the doris cluster from the sub-resources' status.
//clear resources when a sub-resource is deleted. Example: with fe, be and cn deployed, when the cn spec is removed the cn resources should be deleted.
@@ -374,7 +374,7 @@ func (dfc *DisaggregatedFEController) dropFEBySQLClient(ctx context.Context, k8s
return nil
}
}
observes := mysql.FindNeedDeletedFrontends(frontendMap, needRemovedAmount)
observes := mysql.FindNeedDeletedObservers(frontendMap, needRemovedAmount)
// drop node and return
return masterDBClient.DropObserver(observes)
}
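
Both call sites then hand the selected frontends to DropObserver on the master FE's MySQL connection. A hedged sketch of the statement presumably issued per node — Doris' ALTER SYSTEM DROP OBSERVER — using an illustrative stand-in Frontend type (the operator's actual client may build or batch it differently):

package main

import "fmt"

// Frontend is an illustrative stand-in; assume the real mysql.Frontend exposes
// a host and an edit-log port.
type Frontend struct {
	Host        string
	EditLogPort int
}

// buildDropObserverSQL sketches one drop statement per selected node.
func buildDropObserverSQL(fes []*Frontend) []string {
	stmts := make([]string, 0, len(fes))
	for _, fe := range fes {
		stmts = append(stmts, fmt.Sprintf(`ALTER SYSTEM DROP OBSERVER "%s:%d"`, fe.Host, fe.EditLogPort))
	}
	return stmts
}

func main() {
	fmt.Println(buildDropObserverSQL([]*Frontend{{Host: "doriscluster-fe-2", EditLogPort: 9010}}))
}
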
36 changes: 28 additions & 8 deletions pkg/controller/sub_controller/fe/prepare_modify.go
@@ -44,13 +44,7 @@ func (fc *Controller) prepareStatefulsetApply(ctx context.Context, cluster *v1.D
cluster.Spec.FeSpec.Replicas = resource.GetInt32Pointer(0)
}

ele := cluster.GetElectionNumber()

if *(cluster.Spec.FeSpec.Replicas) < ele {
fc.K8srecorder.Event(cluster, string(sc.EventWarning), string(sc.FESpecSetError), "The number of fe ElectionNumber is large than Replicas, Replicas has been corrected to the correct minimum value")
klog.Errorf("prepareStatefulsetApply namespace=%s,name=%s ,The number of fe ElectionNumber(%d) is large than Replicas(%d)", cluster.Namespace, cluster.Name, ele, *(cluster.Spec.FeSpec.Replicas))
cluster.Spec.FeSpec.Replicas = &ele
}
fc.safeScaleDown(cluster, &oldSt)

// wroa means: newReplicas - oldReplicas, the opposite of removedAmount; willRemovedOppositeAmount, shortened to wroa
wroa := *(cluster.Spec.FeSpec.Replicas) - *(oldSt.Spec.Replicas)
@@ -76,6 +70,32 @@
return nil
}

func (fc *Controller) safeScaleDown(cluster *v1.DorisCluster, ost *appv1.StatefulSet) {
ele := cluster.GetElectionNumber()
nr := *cluster.Spec.FeSpec.Replicas
or := *ost.Spec.Replicas
//if not scaling down, do nothing.
if nr >= or {
return
}

//scaling down only observers (new replicas >= election number) is allowed.
if nr >= ele {
return
}

if or >= ele {
//the nodes to remove include observers and followers; only scale down the observers.
*cluster.Spec.FeSpec.Replicas = ele
fc.K8srecorder.Event(cluster, string(sc.EventWarning), sc.FollowerScaleDownFailed, "Replicas is not allowed to be less than ElectionNumber because of the bdbje (raft-like) consistency protocol; to scale down further, first set ElectionNumber lower than Replicas, e.g. \"spec:{feSpec:{electionNumber}}\"")
} else {
//the nodes to remove are all followers, so the scale down is not allowed.
*cluster.Spec.FeSpec.Replicas = or
fc.K8srecorder.Event(cluster, string(sc.EventWarning), sc.FollowerScaleDownFailed, "Replicas is less than ElectionNumber, so scale down is not allowed because of the bdbje (raft-like) consistency protocol; to do this, first set ElectionNumber lower than Replicas, e.g. \"spec:{feSpec:{electionNumber}}\"")
}
}
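
The branch structure above amounts to a small decision table over (newReplicas, oldReplicas, ElectionNumber). A runnable sketch with plain int32 values (stand-ins for the operator's pointer fields) that reproduces the replica count safeScaleDown will actually apply:

package main

import "fmt"

// clampFEScaleDown reproduces safeScaleDown's decision table and returns the
// FE replica count the operator applies.
func clampFEScaleDown(newReplicas, oldReplicas, electionNumber int32) int32 {
	if newReplicas >= oldReplicas { // not a scale down: leave as requested
		return newReplicas
	}
	if newReplicas >= electionNumber { // only observers removed: allowed
		return newReplicas
	}
	if oldReplicas >= electionNumber { // would remove followers: clamp to the quorum size
		return electionNumber
	}
	return oldReplicas // already at or below quorum: refuse the scale down entirely
}

func main() {
	fmt.Println(clampFEScaleDown(4, 5, 3)) // 4: observer-only scale down is allowed
	fmt.Println(clampFEScaleDown(1, 5, 3)) // 3: clamped to ElectionNumber
	fmt.Println(clampFEScaleDown(1, 2, 3)) // 2: refused, replicas reverted to the old value
}
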
// dropObserverBySqlClient handles Doris SQL (drop frontend) through the MySQL client when scaling in observers.
// targetDCR is the new dcr.
func (fc *Controller) dropObserverBySqlClient(ctx context.Context, k8sclient client.Client, targetDCR *v1.DorisCluster) error {
@@ -150,7 +170,7 @@ func (fc *Controller) dropObserverBySqlClient(ctx context.Context, k8sclient cli
return nil
}
}
observes := mysql.FindNeedDeletedFrontends(frontendMap, needRemovedAmount)
observes := mysql.FindNeedDeletedObservers(frontendMap, needRemovedAmount)
// drop node and return
return masterDBClient.DropObserver(observes)

