8 changes: 4 additions & 4 deletions go.mod
@@ -5,7 +5,7 @@ go 1.24.0
toolchain go1.24.5

require (
github.com/cert-manager/cert-manager v1.18.2
github.com/cert-manager/cert-manager v1.18.1
github.com/go-logr/logr v1.4.3
github.com/stretchr/testify v1.10.0
go.etcd.io/etcd/api/v3 v3.6.4
@@ -116,9 +116,9 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.33.3
k8s.io/apiserver v0.33.3 // indirect
k8s.io/component-base v0.33.3 // indirect
k8s.io/apiextensions-apiserver v0.33.0
k8s.io/apiserver v0.33.0 // indirect
k8s.io/component-base v0.33.0 // indirect
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect
sigs.k8s.io/e2e-framework v0.6.0
16 changes: 8 additions & 8 deletions go.sum
@@ -10,8 +10,8 @@ github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cert-manager/cert-manager v1.18.2 h1:H2P75ycGcTMauV3gvpkDqLdS3RSXonWF2S49QGA1PZE=
github.com/cert-manager/cert-manager v1.18.2/go.mod h1:icDJx4kG9BCNpGjBvrmsFd99d+lXUvWdkkcrSSQdIiw=
github.com/cert-manager/cert-manager v1.18.1 h1:5qa3UNrgkNc5Zpn0CyAVMyRIchfF3/RHji4JrazYmWw=
github.com/cert-manager/cert-manager v1.18.1/go.mod h1:icDJx4kG9BCNpGjBvrmsFd99d+lXUvWdkkcrSSQdIiw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD93PBm/jA=
@@ -292,16 +292,16 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.33.3 h1:SRd5t//hhkI1buzxb288fy2xvjubstenEKL9K51KBI8=
k8s.io/api v0.33.3/go.mod h1:01Y/iLUjNBM3TAvypct7DIj0M0NIZc+PzAHCIo0CYGE=
k8s.io/apiextensions-apiserver v0.33.3 h1:qmOcAHN6DjfD0v9kxL5udB27SRP6SG/MTopmge3MwEs=
k8s.io/apiextensions-apiserver v0.33.3/go.mod h1:oROuctgo27mUsyp9+Obahos6CWcMISSAPzQ77CAQGz8=
k8s.io/apiextensions-apiserver v0.33.0 h1:d2qpYL7Mngbsc1taA4IjJPRJ9ilnsXIrndH+r9IimOs=
k8s.io/apiextensions-apiserver v0.33.0/go.mod h1:VeJ8u9dEEN+tbETo+lFkwaaZPg6uFKLGj5vyNEwwSzc=
k8s.io/apimachinery v0.33.3 h1:4ZSrmNa0c/ZpZJhAgRdcsFcZOw1PQU1bALVQ0B3I5LA=
k8s.io/apimachinery v0.33.3/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM=
k8s.io/apiserver v0.33.3 h1:Wv0hGc+QFdMJB4ZSiHrCgN3zL3QRatu56+rpccKC3J4=
k8s.io/apiserver v0.33.3/go.mod h1:05632ifFEe6TxwjdAIrwINHWE2hLwyADFk5mBsQa15E=
k8s.io/apiserver v0.33.0 h1:QqcM6c+qEEjkOODHppFXRiw/cE2zP85704YrQ9YaBbc=
k8s.io/apiserver v0.33.0/go.mod h1:EixYOit0YTxt8zrO2kBU7ixAtxFce9gKGq367nFmqI8=
k8s.io/client-go v0.33.3 h1:M5AfDnKfYmVJif92ngN532gFqakcGi6RvaOF16efrpA=
k8s.io/client-go v0.33.3/go.mod h1:luqKBQggEf3shbxHY4uVENAxrDISLOarxpTKMiUuujg=
k8s.io/component-base v0.33.3 h1:mlAuyJqyPlKZM7FyaoM/LcunZaaY353RXiOd2+B5tGA=
k8s.io/component-base v0.33.3/go.mod h1:ktBVsBzkI3imDuxYXmVxZ2zxJnYTZ4HAsVj9iF09qp4=
k8s.io/component-base v0.33.0 h1:Ot4PyJI+0JAD9covDhwLp9UNkUja209OzsJ4FzScBNk=
k8s.io/component-base v0.33.0/go.mod h1:aXYZLbw3kihdkOPMDhWbjGCO6sg+luw554KP51t8qCU=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUyGcf03XZEP0ZIKgKj35LS4=
218 changes: 133 additions & 85 deletions internal/controller/etcdcluster_controller.go
@@ -47,6 +47,16 @@ type EtcdClusterReconciler struct {
ImageRegistry string
}

// reconcileState holds all transient data for a single reconciliation loop.
// Every phase of Reconcile stores intermediate information here so that
// subsequent phases can operate without additional lookups.
type reconcileState struct {
cluster *ecv1alpha1.EtcdCluster // cluster custom resource currently being reconciled
sts *appsv1.StatefulSet // associated StatefulSet for the cluster
memberListResp *clientv3.MemberListResponse // member list fetched from the etcd cluster
healthInfos []etcdutils.EpHealth // health information for each etcd member
}

// +kubebuilder:rbac:groups=operator.etcd.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.etcd.io,resources=etcdclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=operator.etcd.io,resources=etcdclusters/finalizers,verbs=update
@@ -55,108 +65,152 @@ type EtcdClusterReconciler struct {
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch;get;list;update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the EtcdCluster object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
// Reconcile orchestrates a single reconciliation cycle for an EtcdCluster. It
// sequentially fetches resources, ensures primitive objects exist, checks the
// health of the etcd cluster and then adjusts its state to match the desired
// specification. Each phase is handled by a dedicated helper method.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
// For more details on the controller-runtime Reconcile contract see:
// https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile
func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
state, res, err := r.fetchAndValidateState(ctx, req)
if state == nil || err != nil || res.RequeueAfter != 0 {
return res, err
}

// Fetch the EtcdCluster resource
etcdCluster := &ecv1alpha1.EtcdCluster{}
if primRes, err := r.syncPrimitives(ctx, state); err != nil || primRes.Requeue || primRes.RequeueAfter != 0 { //nolint:staticcheck
return primRes, err
}

err := r.Get(ctx, req.NamespacedName, etcdCluster)
if err != nil {
if err = r.performHealthChecks(ctx, state); err != nil {
return ctrl.Result{}, err
}

return r.reconcileClusterState(ctx, state)
}
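
Not part of this diff, but as an illustration of how the phase split makes the entry point easy to exercise in isolation, here is a minimal sketch of a unit test for the not-found path. It assumes the standard kubebuilder scaffold fields on EtcdClusterReconciler (an embedded client.Client and a Scheme), the generated ecv1alpha1.AddToScheme registration, and an invented test name and API import path; none of these appear in the diff itself.

package controller

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	ecv1alpha1 "go.etcd.io/etcd-operator/api/v1alpha1" // assumed import path for the operator API group
)

// Sketch only: when the EtcdCluster no longer exists, fetchAndValidateState
// returns a nil state, so Reconcile should exit cleanly without requeueing.
func TestReconcileIgnoresMissingCluster(t *testing.T) {
	scheme := runtime.NewScheme()
	require.NoError(t, clientgoscheme.AddToScheme(scheme))
	require.NoError(t, ecv1alpha1.AddToScheme(scheme)) // assumed generated scheme builder

	r := &EtcdClusterReconciler{
		Client: fake.NewClientBuilder().WithScheme(scheme).Build(),
		Scheme: scheme,
	}

	res, err := r.Reconcile(context.Background(), ctrl.Request{
		NamespacedName: types.NamespacedName{Namespace: "default", Name: "missing"},
	})
	require.NoError(t, err)
	require.Equal(t, ctrl.Result{}, res)
}
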

// fetchAndValidateState retrieves the EtcdCluster and its StatefulSet and ensures
// the StatefulSet, if present, is owned by the cluster. It returns a populated
// reconcileState for use in later phases. A non-empty ctrl.Result requests a
// requeue when transient issues occur.
func (r *EtcdClusterReconciler) fetchAndValidateState(ctx context.Context, req ctrl.Request) (*reconcileState, ctrl.Result, error) {
logger := log.FromContext(ctx)

ec := &ecv1alpha1.EtcdCluster{}
if err := r.Get(ctx, req.NamespacedName, ec); err != nil {
if errors.IsNotFound(err) {
logger.Info("EtcdCluster resource not found. Ignoring since object may have been deleted")
return ctrl.Result{}, nil
return nil, ctrl.Result{}, nil
}
return ctrl.Result{}, err
return nil, ctrl.Result{}, err
}

// Determine desired etcd image registry
if etcdCluster.Spec.ImageRegistry == "" {
etcdCluster.Spec.ImageRegistry = r.ImageRegistry
if ec.Spec.ImageRegistry == "" {
ec.Spec.ImageRegistry = r.ImageRegistry
}

logger.Info("Reconciling EtcdCluster", "spec", etcdCluster.Spec)
logger.Info("Reconciling EtcdCluster", "spec", ec.Spec)

// Get the statefulsets which has the same name as the EtcdCluster resource
sts, err := getStatefulSet(ctx, r.Client, etcdCluster.Name, etcdCluster.Namespace)
sts, err := getStatefulSet(ctx, r.Client, ec.Name, ec.Namespace)
if err != nil {
if errors.IsNotFound(err) {
logger.Info("Creating StatefulSet with 0 replica", "expectedSize", etcdCluster.Spec.Size)
// Create a new StatefulSet
sts, err = reconcileStatefulSet(ctx, logger, etcdCluster, r.Client, 0, r.Scheme)
if err != nil {
return ctrl.Result{}, err
}
sts = nil
} else {
// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
logger.Error(err, "Failed to get StatefulSet. Requesting requeue")
return ctrl.Result{RequeueAfter: requeueDuration}, nil
return nil, ctrl.Result{RequeueAfter: requeueDuration}, nil
}
}

// If the Statefulsets is not controlled by this EtcdCluster resource, we should log
// a warning to the event recorder and return error msg.
err = checkStatefulSetControlledByEtcdOperator(etcdCluster, sts)
if err != nil {
logger.Error(err, "StatefulSet is not controlled by this EtcdCluster resource")
return ctrl.Result{}, err
if sts != nil {
if err := checkStatefulSetControlledByEtcdOperator(ec, sts); err != nil {
logger.Error(err, "StatefulSet is not controlled by this EtcdCluster resource")
return nil, ctrl.Result{}, err
}
}

// If statefulset size is 0. try to instantiate the cluster with 1 member
if sts.Spec.Replicas != nil && *sts.Spec.Replicas == 0 {
logger.Info("StatefulSet has 0 replicas. Trying to create a new cluster with 1 member")
return &reconcileState{cluster: ec, sts: sts}, ctrl.Result{}, nil
}

sts, err = reconcileStatefulSet(ctx, logger, etcdCluster, r.Client, 1, r.Scheme)
if err != nil {
// syncPrimitives ensures that the foundational Kubernetes objects for a cluster
// exist and are correctly initialized. It creates the StatefulSet (initially
// with 0 replicas) and the headless Service if necessary. When either resource
// is created or the StatefulSet is scaled from zero to one replica, the returned
// ctrl.Result requests a requeue so the next reconciliation loop can observe the
// new state. The reconcileState is updated with the current StatefulSet.
func (r *EtcdClusterReconciler) syncPrimitives(ctx context.Context, s *reconcileState) (ctrl.Result, error) {
logger := log.FromContext(ctx)
var err error

if s.sts == nil {
logger.Info("Creating StatefulSet with 0 replica", "expectedSize", s.cluster.Spec.Size)
if s.sts, err = reconcileStatefulSet(ctx, logger, s.cluster, r.Client, 0, r.Scheme); err != nil {
return ctrl.Result{}, err
}
if err = createHeadlessServiceIfNotExist(ctx, logger, r.Client, s.cluster, r.Scheme); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: requeueDuration}, nil
}

err = createHeadlessServiceIfNotExist(ctx, logger, r.Client, etcdCluster, r.Scheme)
if err != nil {
if s.sts.Spec.Replicas != nil && *s.sts.Spec.Replicas == 0 {
logger.Info("StatefulSet has 0 replicas. Trying to create a new cluster with 1 member")
if s.sts, err = reconcileStatefulSet(ctx, logger, s.cluster, r.Client, 1, r.Scheme); err != nil {
return ctrl.Result{}, err
}
if err = createHeadlessServiceIfNotExist(ctx, logger, r.Client, s.cluster, r.Scheme); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: requeueDuration}, nil
}

if err = createHeadlessServiceIfNotExist(ctx, logger, r.Client, s.cluster, r.Scheme); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

// performHealthChecks obtains the member list and health status from the etcd
// cluster specified in the StatefulSet. Results are stored on the reconcileState
// for later reconciliation steps.
func (r *EtcdClusterReconciler) performHealthChecks(ctx context.Context, s *reconcileState) error {
logger := log.FromContext(ctx)
logger.Info("Now checking health of the cluster members")
memberListResp, healthInfos, err := healthCheck(sts, logger)
var err error
s.memberListResp, s.healthInfos, err = healthCheck(s.sts, logger)
if err != nil {
return ctrl.Result{}, fmt.Errorf("health check failed: %w", err)
return fmt.Errorf("health check failed: %w", err)
}
return nil
}

// reconcileClusterState compares the desired cluster size with the observed
// etcd member list and StatefulSet replica count. It performs scaling actions
// and handles learner promotion when needed. A ctrl.Result with a requeue
// instructs the controller to retry after adjustments.
func (r *EtcdClusterReconciler) reconcileClusterState(ctx context.Context, s *reconcileState) (ctrl.Result, error) {
logger := log.FromContext(ctx)
memberCnt := 0
if memberListResp != nil {
memberCnt = len(memberListResp.Members)
if s.memberListResp != nil {
memberCnt = len(s.memberListResp.Members)
}
targetReplica := *sts.Spec.Replicas // Start with the current size of the stateful set
targetReplica := *s.sts.Spec.Replicas
var err error

// The number of replicas in the StatefulSet doesn't match the number of etcd members in the cluster.
if int(targetReplica) != memberCnt {
logger.Info("The expected number of replicas doesn't match the number of etcd members in the cluster", "targetReplica", targetReplica, "memberCnt", memberCnt)
if int(targetReplica) < memberCnt {
logger.Info("An etcd member was added into the cluster, but the StatefulSet hasn't scaled out yet")
newReplicaCount := targetReplica + 1
logger.Info("Increasing StatefulSet replicas to match the etcd cluster member count", "oldReplicaCount", targetReplica, "newReplicaCount", newReplicaCount)
_, err = reconcileStatefulSet(ctx, logger, etcdCluster, r.Client, newReplicaCount, r.Scheme)
if err != nil {
if _, err := reconcileStatefulSet(ctx, logger, s.cluster, r.Client, newReplicaCount, r.Scheme); err != nil {
return ctrl.Result{}, err
}
} else {
logger.Info("An etcd member was removed from the cluster, but the StatefulSet hasn't scaled in yet")
newReplicaCount := targetReplica - 1
logger.Info("Decreasing StatefulSet replicas to remove the unneeded Pod.", "oldReplicaCount", targetReplica, "newReplicaCount", newReplicaCount)
_, err = reconcileStatefulSet(ctx, logger, etcdCluster, r.Client, newReplicaCount, r.Scheme)
if err != nil {
if _, err := reconcileStatefulSet(ctx, logger, s.cluster, r.Client, newReplicaCount, r.Scheme); err != nil {
return ctrl.Result{}, err
}
}
@@ -170,89 +224,83 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
)

if memberCnt > 0 {
// Find the leader status
_, leaderStatus = etcdutils.FindLeaderStatus(healthInfos, logger)
_, leaderStatus = etcdutils.FindLeaderStatus(s.healthInfos, logger)
if leaderStatus == nil {
// If the leader is not available, let's wait for the leader to be elected
return ctrl.Result{}, fmt.Errorf("couldn't find leader, memberCnt: %d", memberCnt)
}

learner, learnerStatus = etcdutils.FindLearnerStatus(healthInfos, logger)
learner, learnerStatus = etcdutils.FindLearnerStatus(s.healthInfos, logger)
if learner > 0 {
// There is at least one learner. Let's try to promote it or wait
// Find the learner status
logger.Info("Learner found", "learnedID", learner, "learnerStatus", learnerStatus)
if etcdutils.IsLearnerReady(leaderStatus, learnerStatus) {
logger.Info("Learner is ready to be promoted to voting member", "learnerID", learner)
logger.Info("Promoting the learner member", "learnerID", learner)
eps := clientEndpointsFromStatefulsets(sts)
eps := clientEndpointsFromStatefulsets(s.sts)
eps = eps[:(len(eps) - 1)]
err = etcdutils.PromoteLearner(eps, learner)
if err != nil {
// The member is not promoted yet, so we error out
if err := etcdutils.PromoteLearner(eps, learner); err != nil {
return ctrl.Result{}, err
}
} else {
// Learner is not yet ready. We can't add another learner or proceed further until this one is promoted
// So let's requeue
logger.Info("The learner member isn't ready to be promoted yet", "learnerID", learner)
return ctrl.Result{RequeueAfter: requeueDuration}, nil
}
}
}

if targetReplica == int32(etcdCluster.Spec.Size) {
if targetReplica == int32(s.cluster.Spec.Size) {
logger.Info("EtcdCluster is already up-to-date")
return ctrl.Result{}, nil
}

eps := clientEndpointsFromStatefulsets(sts)
eps := clientEndpointsFromStatefulsets(s.sts)

// If there is no more learner, then we can proceed to scale the cluster further.
// If there is no more member to add, the control will not reach here after the requeue
if targetReplica < int32(etcdCluster.Spec.Size) {
// scale out
_, peerURL := peerEndpointForOrdinalIndex(etcdCluster, int(targetReplica)) // The index starts at 0, so we should do this before incrementing targetReplica
if targetReplica < int32(s.cluster.Spec.Size) {
_, peerURL := peerEndpointForOrdinalIndex(s.cluster, int(targetReplica))
targetReplica++
logger.Info("[Scale out] adding a new learner member to etcd cluster", "peerURLs", peerURL)
if _, err := etcdutils.AddMember(eps, []string{peerURL}, true); err != nil {
return ctrl.Result{}, err
}

logger.Info("Learner member added successfully", "peerURLs", peerURL)
} else {
// scale in

if s.sts, err = reconcileStatefulSet(ctx, logger, s.cluster, r.Client, targetReplica, r.Scheme); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{RequeueAfter: requeueDuration}, nil
}

if targetReplica > int32(s.cluster.Spec.Size) {
targetReplica--
logger = logger.WithValues("targetReplica", targetReplica, "expectedSize", etcdCluster.Spec.Size)
logger = logger.WithValues("targetReplica", targetReplica, "expectedSize", s.cluster.Spec.Size)

memberID := healthInfos[memberCnt-1].Status.Header.MemberId
memberID := s.healthInfos[memberCnt-1].Status.Header.MemberId

logger.Info("[Scale in] removing one member", "memberID", memberID)
eps = eps[:targetReplica]
if err := etcdutils.RemoveMember(eps, memberID); err != nil {
return ctrl.Result{}, err
}
}

sts, err = reconcileStatefulSet(ctx, logger, etcdCluster, r.Client, targetReplica, r.Scheme)
if err != nil {
return ctrl.Result{}, err
if s.sts, err = reconcileStatefulSet(ctx, logger, s.cluster, r.Client, targetReplica, r.Scheme); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{RequeueAfter: requeueDuration}, nil
}

allMembersHealthy, err := areAllMembersHealthy(sts, logger)
allMembersHealthy, err := areAllMembersHealthy(s.sts, logger)
if err != nil {
return ctrl.Result{}, err
}

if *sts.Spec.Replicas != int32(etcdCluster.Spec.Size) || !allMembersHealthy {
// Requeue if the statefulset size is not equal to the expected size of ETCD cluster
// Or if all members of the cluster are not healthy
if !allMembersHealthy {
return ctrl.Result{RequeueAfter: requeueDuration}, nil
}

logger.Info("EtcdCluster reconciled successfully")
return ctrl.Result{}, nil

}
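
As a reading aid only (not something this PR adds, and setting aside the learner-promotion branch and the hunks collapsed above), the decision order applied by reconcileClusterState can be distilled into a small pure function. The name scalingStep and the assumption that the collapsed hunk requeues after nudging the replica count are mine, not the author's.

// scalingStep is an illustrative distillation of the decision order above; it
// is not part of this change. It assumes the collapsed hunk requeues once the
// StatefulSet has been nudged toward the observed member count.
func scalingStep(stsReplicas, memberCount, specSize int32) (newReplicas int32, addLearner, removeMember bool) {
	switch {
	case stsReplicas != memberCount:
		// First converge the StatefulSet onto the etcd member list, one replica at a time.
		if stsReplicas < memberCount {
			return stsReplicas + 1, false, false
		}
		return stsReplicas - 1, false, false
	case stsReplicas < specSize:
		// Scale out: add a learner first, then grow the StatefulSet by one and requeue.
		return stsReplicas + 1, true, false
	case stsReplicas > specSize:
		// Scale in: remove the highest-ordinal member, then shrink the StatefulSet by one and requeue.
		return stsReplicas - 1, false, true
	default:
		// Desired size reached; only the final health check remains.
		return stsReplicas, false, false
	}
}
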

// SetupWithManager sets up the controller with the Manager.