Skip to content

Commit

Permalink
logs and organization cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
enrichman committed Jan 24, 2025
1 parent 8f24151 commit e2977d4
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 94 deletions.
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ func run(clx *cli.Context) error {
}

ctrlruntimelog.SetLogger(zapr.NewLogger(logger.Desugar().WithOptions(zap.AddCallerSkip(1))))

logger.Info("adding cluster controller")
if err := cluster.Add(ctx, mgr, sharedAgentImage, sharedAgentImagePullPolicy, logger); err != nil {
if err := cluster.Add(ctx, mgr, sharedAgentImage, sharedAgentImagePullPolicy); err != nil {
return fmt.Errorf("failed to add the new cluster controller: %v", err)
}

Expand Down
119 changes: 37 additions & 82 deletions pkg/controller/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/rancher/k3k/pkg/controller/cluster/agent"
"github.com/rancher/k3k/pkg/controller/cluster/server"
"github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap"
"github.com/rancher/k3k/pkg/log"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -50,12 +48,10 @@ type ClusterReconciler struct {
Scheme *runtime.Scheme
SharedAgentImage string
SharedAgentImagePullPolicy string
logger *log.Logger
}

// Add adds a new controller to the manager
func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage, sharedAgentImagePullPolicy string, logger *log.Logger) error {

func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage, sharedAgentImagePullPolicy string) error {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(mgr.GetConfig())
if err != nil {
return err
Expand All @@ -68,7 +64,6 @@ func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage, sharedAgent
Scheme: mgr.GetScheme(),
SharedAgentImage: sharedAgentImage,
SharedAgentImagePullPolicy: sharedAgentImagePullPolicy,
logger: logger.Named(clusterController),
}

return ctrl.NewControllerManagedBy(mgr).
Expand All @@ -80,77 +75,61 @@ func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage, sharedAgent
}

func (c *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
var (
cluster v1alpha1.Cluster
podList v1.PodList
)
log := c.logger.With("Cluster", req.NamespacedName)
log := ctrl.LoggerFrom(ctx)
log.Info("reconciling")

var cluster v1alpha1.Cluster
if err := c.Client.Get(ctx, req.NamespacedName, &cluster); err != nil {
return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err)
return reconcile.Result{}, err
}

// if the Version is not specified we will try to use the same Kubernetes version of the host.
// This version is stored in the Status object, and it will not be updated if already set.
if cluster.Spec.Version == "" && cluster.Status.HostVersion == "" {
hostVersion, err := c.DiscoveryClient.ServerVersion()
if err != nil {
// if DeletionTimestamp is not Zero -> finalize the object
if !cluster.DeletionTimestamp.IsZero() {
return c.finalizeCluster(ctx, cluster)
}

// add finalizers
if !controllerutil.AddFinalizer(&cluster, clusterFinalizerName) {
if err := c.Client.Update(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}
}

// update Status HostVersion
cluster.Status.HostVersion = fmt.Sprintf("v%s.%s.0-k3s1", hostVersion.Major, hostVersion.Minor)
orig := cluster.DeepCopy()

reconcilerErr := c.reconcileCluster(ctx, &cluster)

// update Status if needed
if !reflect.DeepEqual(orig.Status, cluster.Status) {
if err := c.Client.Status().Update(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}
}

if cluster.DeletionTimestamp.IsZero() {
if !controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) {
controllerutil.AddFinalizer(&cluster, clusterFinalizerName)
if err := c.Client.Update(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}
}
log.Info("enqueue cluster")
return reconcile.Result{}, c.createCluster(ctx, &cluster, log)
}

// remove finalizer from the server pods and update them.
matchingLabels := ctrlruntimeclient.MatchingLabels(map[string]string{"role": "server"})
listOpts := &ctrlruntimeclient.ListOptions{Namespace: cluster.Namespace}
matchingLabels.ApplyToList(listOpts)
if err := c.Client.List(ctx, &podList, listOpts); err != nil {
return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err)
}
for _, pod := range podList.Items {
if controllerutil.ContainsFinalizer(&pod, etcdPodFinalizerName) {
controllerutil.RemoveFinalizer(&pod, etcdPodFinalizerName)
if err := c.Client.Update(ctx, &pod); err != nil {
return reconcile.Result{}, err
}
}
}
return reconcile.Result{}, reconcilerErr
}

if err := c.unbindNodeProxyClusterRole(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}
func (c *ClusterReconciler) reconcileCluster(ctx context.Context, cluster *v1alpha1.Cluster) error {
log := ctrl.LoggerFrom(ctx)
log.Info("reconcileCluster")

if controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) {
// remove finalizer from the cluster and update it.
controllerutil.RemoveFinalizer(&cluster, clusterFinalizerName)
if err := c.Client.Update(ctx, &cluster); err != nil {
return reconcile.Result{}, err
// if the Version is not specified we will try to use the same Kubernetes version of the host.
// This version is stored in the Status object, and it will not be updated if already set.
if cluster.Spec.Version == "" && cluster.Status.HostVersion == "" {
hostVersion, err := c.DiscoveryClient.ServerVersion()
if err != nil {
return err
}

// update Status HostVersion
cluster.Status.HostVersion = fmt.Sprintf("v%s.%s.0-k3s1", hostVersion.Major, hostVersion.Minor)
}
log.Info("deleting cluster")
return reconcile.Result{}, nil
}

func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1.Cluster, log *zap.SugaredLogger) error {
if err := c.validate(cluster); err != nil {
log.Errorw("invalid change", zap.Error(err))
log.Error(err, "invalid change")
return nil
}

token, err := c.token(ctx, cluster)
if err != nil {
return err
Expand Down Expand Up @@ -341,30 +320,6 @@ func (c *ClusterReconciler) bindNodeProxyClusterRole(ctx context.Context, cluste
return c.Client.Update(ctx, clusterRoleBinding)
}

func (c *ClusterReconciler) unbindNodeProxyClusterRole(ctx context.Context, cluster *v1alpha1.Cluster) error {
clusterRoleBinding := &rbacv1.ClusterRoleBinding{}
if err := c.Client.Get(ctx, types.NamespacedName{Name: "k3k-node-proxy"}, clusterRoleBinding); err != nil {
return fmt.Errorf("failed to get or find k3k-node-proxy ClusterRoleBinding: %w", err)
}

subjectName := controller.SafeConcatNameWithPrefix(cluster.Name, agent.SharedNodeAgentName)

var cleanedSubjects []rbacv1.Subject
for _, subject := range clusterRoleBinding.Subjects {
if subject.Name != subjectName || subject.Namespace != cluster.Namespace {
cleanedSubjects = append(cleanedSubjects, subject)
}
}

// if no subject was removed, all good
if reflect.DeepEqual(clusterRoleBinding.Subjects, cleanedSubjects) {
return nil
}

clusterRoleBinding.Subjects = cleanedSubjects
return c.Client.Update(ctx, clusterRoleBinding)
}

func (c *ClusterReconciler) agent(ctx context.Context, cluster *v1alpha1.Cluster, serviceIP, token string) error {
agent := agent.New(cluster, serviceIP, c.SharedAgentImage, c.SharedAgentImagePullPolicy, token)
agentsConfig := agent.Config()
Expand Down
79 changes: 79 additions & 0 deletions pkg/controller/cluster/cluster_finalize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package cluster

import (
"context"
"fmt"
"reflect"

"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
"github.com/rancher/k3k/pkg/controller"
"github.com/rancher/k3k/pkg/controller/cluster/agent"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func (c *ClusterReconciler) finalizeCluster(ctx context.Context, cluster v1alpha1.Cluster) (reconcile.Result, error) {
log := ctrl.LoggerFrom(ctx)
log.Info("finalizeCluster")

// remove finalizer from the server pods and update them.
matchingLabels := ctrlruntimeclient.MatchingLabels(map[string]string{"role": "server"})
listOpts := &ctrlruntimeclient.ListOptions{Namespace: cluster.Namespace}
matchingLabels.ApplyToList(listOpts)

var podList v1.PodList
if err := c.Client.List(ctx, &podList, listOpts); err != nil {
return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err)
}

for _, pod := range podList.Items {
if controllerutil.ContainsFinalizer(&pod, etcdPodFinalizerName) {
controllerutil.RemoveFinalizer(&pod, etcdPodFinalizerName)
if err := c.Client.Update(ctx, &pod); err != nil {
return reconcile.Result{}, err
}
}
}

if err := c.unbindNodeProxyClusterRole(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}

if controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) {
// remove finalizer from the cluster and update it.
controllerutil.RemoveFinalizer(&cluster, clusterFinalizerName)
if err := c.Client.Update(ctx, &cluster); err != nil {
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}

func (c *ClusterReconciler) unbindNodeProxyClusterRole(ctx context.Context, cluster *v1alpha1.Cluster) error {
clusterRoleBinding := &rbacv1.ClusterRoleBinding{}
if err := c.Client.Get(ctx, types.NamespacedName{Name: "k3k-node-proxy"}, clusterRoleBinding); err != nil {
return fmt.Errorf("failed to get or find k3k-node-proxy ClusterRoleBinding: %w", err)
}

subjectName := controller.SafeConcatNameWithPrefix(cluster.Name, agent.SharedNodeAgentName)

var cleanedSubjects []rbacv1.Subject
for _, subject := range clusterRoleBinding.Subjects {
if subject.Name != subjectName || subject.Namespace != cluster.Namespace {
cleanedSubjects = append(cleanedSubjects, subject)
}
}

// if no subject was removed, all good
if reflect.DeepEqual(clusterRoleBinding.Subjects, cleanedSubjects) {
return nil
}

clusterRoleBinding.Subjects = cleanedSubjects
return c.Client.Update(ctx, clusterRoleBinding)
}
6 changes: 4 additions & 2 deletions pkg/controller/cluster/cluster_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"path/filepath"
"testing"

"github.com/go-logr/zapr"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
"github.com/rancher/k3k/pkg/controller/cluster"
"github.com/rancher/k3k/pkg/log"

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -53,11 +53,13 @@ var _ = BeforeSuite(func() {
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme})
Expect(err).NotTo(HaveOccurred())

ctrl.SetLogger(zapr.NewLogger(zap.NewNop()))

mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme})
Expect(err).NotTo(HaveOccurred())

ctx, cancel = context.WithCancel(context.Background())
err = cluster.Add(ctx, mgr, "", "", &log.Logger{SugaredLogger: zap.NewNop().Sugar()})
err = cluster.Add(ctx, mgr, "", "")
Expect(err).NotTo(HaveOccurred())

go func() {
Expand Down
24 changes: 15 additions & 9 deletions pkg/controller/cluster/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

Expand All @@ -35,15 +36,16 @@ func (c *ClusterReconciler) token(ctx context.Context, cluster *v1alpha1.Cluster
}

func (c *ClusterReconciler) ensureTokenSecret(ctx context.Context, cluster *v1alpha1.Cluster) (string, error) {
log := ctrl.LoggerFrom(ctx)

// check if the secret is already created
var (
tokenSecret v1.Secret
nn = types.NamespacedName{
Name: TokenSecretName(cluster.Name),
Namespace: cluster.Namespace,
}
)
if err := c.Client.Get(ctx, nn, &tokenSecret); err != nil {
key := types.NamespacedName{
Name: TokenSecretName(cluster.Name),
Namespace: cluster.Namespace,
}

var tokenSecret v1.Secret
if err := c.Client.Get(ctx, key, &tokenSecret); err != nil {
if !apierrors.IsNotFound(err) {
return "", err
}
Expand All @@ -52,15 +54,19 @@ func (c *ClusterReconciler) ensureTokenSecret(ctx context.Context, cluster *v1al
if tokenSecret.Data != nil {
return string(tokenSecret.Data["token"]), nil
}
c.logger.Info("Token secret is not specified, creating a random token")

log.Info("Token secret is not specified, creating a random token")

token, err := random(16)
if err != nil {
return "", err
}

tokenSecret = TokenSecretObj(token, cluster.Name, cluster.Namespace)
if err := controllerutil.SetControllerReference(cluster, &tokenSecret, c.Scheme); err != nil {
return "", err
}

if err := c.ensure(ctx, &tokenSecret, false); err != nil {
return "", err
}
Expand Down

0 comments on commit e2977d4

Please sign in to comment.