diff --git a/cmd/kubenest/operator/app/operator.go b/cmd/kubenest/operator/app/operator.go index 265314a6a..267f8eb51 100644 --- a/cmd/kubenest/operator/app/operator.go +++ b/cmd/kubenest/operator/app/operator.go @@ -181,6 +181,13 @@ func createKubeConfig(opts *options.Options) (*restclient.Config, error) { } func startEndPointsControllers(mgr manager.Manager) error { + restConfig := mgr.GetConfig() + + kubeClient, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return err + } + coreEndPointsController := endpointscontroller.CoreDNSController{ Client: mgr.GetClient(), EventRecorder: mgr.GetEventRecorderFor(constants.GlobalNodeControllerName), @@ -199,9 +206,12 @@ func startEndPointsControllers(mgr manager.Manager) error { return fmt.Errorf("error starting %s: %v", endpointscontroller.KonnectivitySyncControllerName, err) } + nodeGetter := &endpointscontroller.RealNodeGetter{} APIServerExternalSyncController := endpointscontroller.APIServerExternalSyncController{ Client: mgr.GetClient(), EventRecorder: mgr.GetEventRecorderFor(constants.GlobalNodeControllerName), + KubeClient: kubeClient, + NodeGetter: nodeGetter, } if err := APIServerExternalSyncController.SetupWithManager(mgr); err != nil { diff --git a/pkg/kubenest/common/resource.go b/pkg/kubenest/common/resource.go new file mode 100644 index 000000000..6a53a0ad9 --- /dev/null +++ b/pkg/kubenest/common/resource.go @@ -0,0 +1,14 @@ +package common + +import ( + "k8s.io/client-go/kubernetes" + + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" +) + +type APIServerExternalResource struct { + Namespace string + Name string + Vc *v1alpha1.VirtualCluster + RootClientSet kubernetes.Interface +} diff --git a/pkg/kubenest/constants/constant.go b/pkg/kubenest/constants/constant.go index 3d24c5111..6446add2f 100644 --- a/pkg/kubenest/constants/constant.go +++ b/pkg/kubenest/constants/constant.go @@ -27,6 +27,7 @@ const ( ControllerFinalizerName = "operator.virtualcluster.io/finalizer" DefaultKubeconfigPath = "/etc/cluster-tree/cert" Label = "virtualCluster-app" + LabelValue = "apiserver" ComponentBeReadyTimeout = 300 * time.Second ComponentBeDeletedTimeout = 300 * time.Second diff --git a/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller.go b/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller.go index ad065ab57..2f180dcfa 100644 --- a/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller.go +++ b/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller.go @@ -3,24 +3,20 @@ package endpointcontroller import ( "context" "fmt" - "strings" + "reflect" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -30,153 +26,176 @@ import ( "github.com/kosmos.io/kosmos/pkg/utils" ) +type NodeGetter interface { + GetAPIServerNodes(client kubernetes.Interface, namespace string) (*v1.NodeList, error) +} + +type RealNodeGetter struct{} + +func (r *RealNodeGetter) GetAPIServerNodes(client kubernetes.Interface, namespace string) (*v1.NodeList, error) { + return util.GetAPIServerNodes(client, namespace) +} + type APIServerExternalSyncController struct { client.Client EventRecorder record.EventRecorder + KubeClient kubernetes.Interface + NodeGetter NodeGetter } const APIServerExternalSyncControllerName string = "api-server-external-service-sync-controller" func (e *APIServerExternalSyncController) SetupWithManager(mgr manager.Manager) error { - skipEvent := func(obj client.Object) bool { - return strings.Contains(obj.GetName(), "apiserver") && obj.GetNamespace() != "" - } - return controllerruntime.NewControllerManagedBy(mgr). Named(APIServerExternalSyncControllerName). WithOptions(controller.Options{MaxConcurrentReconciles: 5}). - For(&v1.Endpoints{}, - builder.WithPredicates(predicate.Funcs{ - CreateFunc: func(createEvent event.CreateEvent) bool { - return skipEvent(createEvent.Object) - }, - UpdateFunc: func(updateEvent event.UpdateEvent) bool { return skipEvent(updateEvent.ObjectNew) }, - DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false }, - })). - Watches(&source.Kind{Type: &v1alpha1.VirtualCluster{}}, handler.EnqueueRequestsFromMapFunc(e.newVirtualClusterMapFunc())). + Watches(&source.Kind{Type: &v1.Pod{}}, handler.EnqueueRequestsFromMapFunc(e.newPodMapFunc())). Complete(e) } -func (e *APIServerExternalSyncController) newVirtualClusterMapFunc() handler.MapFunc { - return func(a client.Object) []reconcile.Request { - var requests []reconcile.Request - vcluster := a.(*v1alpha1.VirtualCluster) - - // Join the Reconcile queue only if the status of the vcluster is Completed - if vcluster.Status.Phase == v1alpha1.Completed { - klog.V(4).Infof("api-server-external-sync-controller: virtualcluster change to completed: %s", vcluster.Name) - // Add the vcluster to the Reconcile queue - requests = append(requests, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: vcluster.Name, - Namespace: vcluster.Namespace, +func (e *APIServerExternalSyncController) newPodMapFunc() handler.MapFunc { + return func(obj client.Object) []reconcile.Request { + pod, ok := obj.(*v1.Pod) + + if !ok { + klog.Warningf("Object is not a Pod, skipping: %v", obj) + return nil + } + + // If the pod contains the specified label virtualCluster-app=apiserver,it indicates that it belongs to vc. + if val, exists := pod.Labels[constants.Label]; exists && val == constants.LabelValue { + return []reconcile.Request{ + { + NamespacedName: client.ObjectKey{ + Name: pod.Name, + Namespace: pod.Namespace, + }, }, - }) + } } - return requests + + return nil } } -func (e *APIServerExternalSyncController) SyncAPIServerExternalEPS(ctx context.Context, k8sClient kubernetes.Interface) error { - kubeEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, "kubernetes", metav1.GetOptions{}) - if err != nil { - klog.Errorf("Error getting endpoints: %v", err) - return err - } - klog.V(4).Infof("Endpoints for service 'kubernetes': %v", kubeEndpoints) - for _, subset := range kubeEndpoints.Subsets { - for _, address := range subset.Addresses { - klog.V(4).Infof("IP: %s", address.IP) - } +func (e *APIServerExternalSyncController) SyncAPIServerExternalEndpoints(ctx context.Context, k8sClient kubernetes.Interface, vc *v1alpha1.VirtualCluster) error { + if e.NodeGetter == nil { + return fmt.Errorf("NodeGetter is nil") } - if len(kubeEndpoints.Subsets) != 1 { - return fmt.Errorf("eps %s Subsets length is not 1", "kubernetes") + nodes, err := e.NodeGetter.GetAPIServerNodes(e.KubeClient, vc.Namespace) + if err != nil { + return fmt.Errorf("failed to get API server nodes: %w", err) } - if kubeEndpoints.Subsets[0].Addresses == nil || len(kubeEndpoints.Subsets[0].Addresses) == 0 { - klog.Errorf("eps %s Addresses length is nil", "kubernetes") - return err + if len(nodes.Items) == 0 { + return fmt.Errorf("no API server nodes found in the cluster") } - apiServerExternalEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, constants.APIServerExternalService, metav1.GetOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - klog.Errorf("failed to get endpoints for %s : %v", constants.APIServerExternalService, err) - return err + var addresses []v1.EndpointAddress + for _, node := range nodes.Items { + for _, address := range node.Status.Addresses { + if address.Type == v1.NodeInternalIP { + addresses = append(addresses, v1.EndpointAddress{ + IP: address.Address, + }) + } + } } - updateEPS := apiServerExternalEndpoints.DeepCopy() - - if apiServerExternalEndpoints != nil { - klog.V(4).Infof("apiServerExternalEndpoints: %v", apiServerExternalEndpoints) - } else { - klog.V(4).Info("apiServerExternalEndpoints is nil") + if len(addresses) == 0 { + return fmt.Errorf("no internal IP addresses found for the API server nodes") } - if updateEPS != nil { - klog.V(4).Infof("updateEPS: %v", updateEPS) - } else { - klog.V(4).Info("updateEPS is nil") + apiServerPort, ok := vc.Status.PortMap[constants.APIServerPortKey] + if !ok { + return fmt.Errorf("failed to get API server port from VirtualCluster status") } + klog.V(4).Infof("API server port: %d", apiServerPort) - if len(updateEPS.Subsets) == 1 && len(updateEPS.Subsets[0].Addresses) == 1 { - ip := kubeEndpoints.Subsets[0].Addresses[0].IP - klog.V(4).Infof("IP address: %s", ip) - updateEPS.Subsets[0].Addresses[0].IP = ip + newEndpoint := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.APIServerExternalService, + Namespace: constants.KosmosNs, + }, + Subsets: []v1.EndpointSubset{ + { + Addresses: addresses, + Ports: []v1.EndpointPort{ + { + Name: "https", + Port: apiServerPort, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + } - if _, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Update(ctx, updateEPS, metav1.UpdateOptions{}); err != nil { - klog.Errorf("failed to update endpoints for api-server-external-service: %v", err) - return err + //avoid unnecessary updates + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + currentEndpoint, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Get(ctx, constants.APIServerExternalService, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + _, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Create(ctx, newEndpoint, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create api-server-external-service endpoint: %w", err) + } + klog.V(4).Info("Created api-server-external-service Endpoint") + return nil + } else if err != nil { + return fmt.Errorf("failed to get existing api-server-external-service endpoint: %w", err) } - } else { - klog.ErrorS(err, "Unexpected format of endpoints for api-server-external-service", "endpoint_data", updateEPS) - return err - } - return nil + // determine if an update is needed + if !reflect.DeepEqual(currentEndpoint.Subsets, newEndpoint.Subsets) { + _, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Update(ctx, newEndpoint, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update api-server-external-service endpoint: %w", err) + } + klog.V(4).Info("Updated api-server-external-service Endpoint") + } else { + klog.V(4).Info("No changes detected in Endpoint, skipping update") + } + return nil + }) } func (e *APIServerExternalSyncController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { klog.V(4).Infof("============ %s start to reconcile %s ============", APIServerExternalSyncControllerName, request.NamespacedName) defer klog.V(4).Infof("============ %s finish to reconcile %s ============", APIServerExternalSyncControllerName, request.NamespacedName) - var virtualClusterList v1alpha1.VirtualClusterList - if err := e.List(ctx, &virtualClusterList); err != nil { - if apierrors.IsNotFound(err) { - return reconcile.Result{}, nil - } - klog.V(4).Infof("query virtualcluster failed: %v", err) + var vcList v1alpha1.VirtualClusterList + if err := e.List(ctx, &vcList, client.InNamespace(request.NamespacedName.Namespace)); err != nil { + klog.Errorf("Failed to list VirtualClusters in namespace %s: %v", request.NamespacedName.Namespace, err) return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } - var targetVirtualCluster v1alpha1.VirtualCluster - hasVirtualCluster := false - for _, vc := range virtualClusterList.Items { - if vc.Namespace == request.Namespace { - targetVirtualCluster = vc - klog.V(4).Infof("virtualcluster %s found", targetVirtualCluster.Name) - hasVirtualCluster = true - break - } - } - if !hasVirtualCluster { - klog.V(4).Infof("virtualcluster %s not found", request.Namespace) + + if len(vcList.Items) == 0 { + klog.V(4).Infof("No VirtualCluster found in namespace %s", request.NamespacedName.Namespace) return reconcile.Result{}, nil } - if targetVirtualCluster.Status.Phase != v1alpha1.Completed { + // A namespace should correspond to only one virtual cluster (vc). If it corresponds to multiple vcs, it indicates an error. + if len(vcList.Items) > 1 { + klog.Errorf("Multiple VirtualClusters found in namespace %s, expected only one", request.NamespacedName.Namespace) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + + vc := vcList.Items[0] + + if vc.Status.Phase != v1alpha1.Completed { + klog.V(4).Infof("VirtualCluster %s is not in Completed phase", vc.Name) return reconcile.Result{}, nil } - k8sClient, err := util.GenerateKubeclient(&targetVirtualCluster) + k8sClient, err := util.GenerateKubeclient(&vc) if err != nil { - klog.Errorf("virtualcluster %s crd kubernetes client failed: %v", targetVirtualCluster.Name, err) + klog.Errorf("Failed to generate Kubernetes client for VirtualCluster %s: %v", vc.Name, err) return reconcile.Result{}, nil } - if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - return e.SyncAPIServerExternalEPS(ctx, k8sClient) - }); err != nil { - klog.Errorf("virtualcluster %s sync apiserver external endpoints failed: %v", targetVirtualCluster.Name, err) + if err := e.SyncAPIServerExternalEndpoints(ctx, k8sClient, &vc); err != nil { + klog.Errorf("Failed to sync apiserver external Endpoints: %v", err) return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } diff --git a/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller_test.go b/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller_test.go new file mode 100644 index 000000000..b1c4cfbd2 --- /dev/null +++ b/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller_test.go @@ -0,0 +1,161 @@ +package endpointcontroller + +import ( + "context" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" +) + +type MockNodeGetter struct { + Nodes *corev1.NodeList + Err error +} + +func (m *MockNodeGetter) GetAPIServerNodes(_ kubernetes.Interface, _ string) (*corev1.NodeList, error) { + return m.Nodes, m.Err +} + +func TestSyncAPIServerExternalEndpoints(t *testing.T) { + ctx := context.TODO() + vc := &v1alpha1.VirtualCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-vc", + Namespace: "test-ns", + }, + Status: v1alpha1.VirtualClusterStatus{ + Phase: v1alpha1.Completed, + PortMap: map[string]int32{ + constants.APIServerPortKey: 6443, + }, + }, + } + + nodes := &corev1.NodeList{ + Items: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + {Type: corev1.NodeInternalIP, Address: "192.168.1.1"}, + }, + }, + }, + }, + } + + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.APIServerExternalService, + Namespace: constants.KosmosNs, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + {IP: "192.168.1.1"}, + }, + Ports: []corev1.EndpointPort{ + {Name: "https", Port: 6443, Protocol: corev1.ProtocolTCP}, + }, + }, + }, + } + + tests := []struct { + name string + objects []runtime.Object + mockNodes *corev1.NodeList + mockErr error + wantErr bool + wantErrString string + wantSubsets []corev1.EndpointSubset + }{ + { + name: "Successfully syncs external endpoints", + objects: []runtime.Object{}, + mockNodes: nodes, + wantSubsets: endpoint.Subsets, + }, + { + name: "Does not update endpoint if no changes", + objects: []runtime.Object{endpoint}, + mockNodes: nodes, + wantSubsets: endpoint.Subsets, + }, + { + name: "Updates endpoint if changes detected", + objects: []runtime.Object{ + func() runtime.Object { + modifiedEndpoint := endpoint.DeepCopy() + modifiedEndpoint.Subsets[0].Addresses[0].IP = "192.168.1.2" + return modifiedEndpoint + }(), + }, + mockNodes: nodes, + wantSubsets: endpoint.Subsets, + }, + { + name: "Fails if no API server nodes are found", + objects: []runtime.Object{}, + mockNodes: &corev1.NodeList{}, + wantErr: true, + wantErrString: "no API server nodes found in the cluster", + }, + { + name: "Fails if no internal IP addresses are found", + objects: []runtime.Object{}, + mockNodes: &corev1.NodeList{ + Items: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-2"}, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{}, + }, + }, + }, + }, + wantErr: true, + wantErrString: "no internal IP addresses found for the API server nodes", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Use fake clientset to simulate the Kubernetes API client (host cluster) + fakeHostClusterClient := fake.NewSimpleClientset(tt.objects...) + // Simulate the Virtual Cluster client by passing the same clientset + fakeVCClient := fake.NewSimpleClientset() + // Mock NodeGetter to return the mock nodes for Host cluster + mockNodeGetter := &MockNodeGetter{Nodes: tt.mockNodes, Err: tt.mockErr} + // Use fake clientset to simulate the Kubernetes API client (host cluster) + controller := &APIServerExternalSyncController{ + KubeClient: fakeHostClusterClient, + NodeGetter: mockNodeGetter, + } + // Test the controller method using the VC's client + err := controller.SyncAPIServerExternalEndpoints(ctx, fakeVCClient, vc) + if tt.wantErr { + assert.Error(t, err) + if tt.wantErrString != "" { + assert.Contains(t, err.Error(), tt.wantErrString) + } + } else { + assert.NoError(t, err) + if tt.wantSubsets != nil { + createdEndpoint, err := fakeVCClient.CoreV1().Endpoints(constants.KosmosNs).Get(ctx, constants.APIServerExternalService, metav1.GetOptions{}) + assert.NoError(t, err) + assert.True(t, reflect.DeepEqual(createdEndpoint.Subsets, tt.wantSubsets)) + } + } + }) + } +} diff --git a/pkg/kubenest/controlplane/endpoint.go b/pkg/kubenest/controlplane/endpoint.go index e809eddd5..58e08eae1 100644 --- a/pkg/kubenest/controlplane/endpoint.go +++ b/pkg/kubenest/controlplane/endpoint.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -12,13 +11,25 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "github.com/kosmos.io/kosmos/pkg/kubenest/common" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/virtualcluster" "github.com/kosmos.io/kosmos/pkg/kubenest/util" + "github.com/kosmos.io/kosmos/pkg/utils" ) -func EnsureAPIServerExternalEndPoint(kubeClient kubernetes.Interface) error { - err := CreateOrUpdateAPIServerExternalEndpoint(kubeClient) +type IPFamilies struct { + IPv4 bool + IPv6 bool +} + +func EnsureAPIServerExternalEndPoint(kubeClient kubernetes.Interface, apiServerExternalResource common.APIServerExternalResource) error { + err := EnsureKosmosSystemNamespace(kubeClient) + if err != nil { + return err + } + + err = CreateOrUpdateAPIServerExternalEndpoint(kubeClient, apiServerExternalResource) if err != nil { return err } @@ -30,70 +41,93 @@ func EnsureAPIServerExternalEndPoint(kubeClient kubernetes.Interface) error { return nil } -func CreateOrUpdateAPIServerExternalEndpoint(kubeClient kubernetes.Interface) error { - klog.V(4).Info("begin to get kubernetes endpoint") - kubeEndpoint, err := kubeClient.CoreV1().Endpoints(constants.DefaultNs).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) +func CreateOrUpdateAPIServerExternalEndpoint(kubeClient kubernetes.Interface, apiServerExternalResource common.APIServerExternalResource) error { + klog.V(4).Info("begin to create or update api-server-external-service endpoint") + nodes, err := util.GetAPIServerNodes(apiServerExternalResource.RootClientSet, apiServerExternalResource.Namespace) if err != nil { - klog.Error("get Kubernetes endpoint failed", err) - return errors.Wrap(err, "failed to get kubernetes endpoint") - } - klog.V(4).Info("the Kubernetes endpoint is:", kubeEndpoint) - - newEndpoint := kubeEndpoint.DeepCopy() - newEndpoint.Name = constants.APIServerExternalService - newEndpoint.Namespace = constants.DefaultNs - newEndpoint.ResourceVersion = "" - - // Reconstruct the Ports without the 'name' field - for i := range newEndpoint.Subsets { - for j := range newEndpoint.Subsets[i].Ports { - newEndpoint.Subsets[i].Ports[j] = corev1.EndpointPort{ - Port: newEndpoint.Subsets[i].Ports[j].Port, - Protocol: newEndpoint.Subsets[i].Ports[j].Protocol, + return fmt.Errorf("failed to get API server nodes: %w", err) + } + if len(nodes.Items) == 0 { + return fmt.Errorf("no API server nodes found in the cluster") + } + + var addresses []corev1.EndpointAddress + for _, node := range nodes.Items { + klog.V(4).Infof("API server node: %s", node.Name) + for _, address := range node.Status.Addresses { + if address.Type == corev1.NodeInternalIP { + klog.V(4).Infof("Node internal IP: %s", address.Address) + addresses = append(addresses, corev1.EndpointAddress{ + IP: address.Address, + }) } } } - // Try to create the endpoint - _, err = kubeClient.CoreV1().Endpoints(constants.DefaultNs).Create(context.TODO(), newEndpoint, metav1.CreateOptions{}) - if err != nil { - if !apierrors.IsAlreadyExists(err) { - klog.Error("create api-server-external-service endpoint failed", err) - return errors.Wrap(err, "failed to create api-server-external-service endpoint") - } + if len(addresses) == 0 { + return fmt.Errorf("no internal IP addresses found for the API server nodes") + } - // Endpoint already exists, retrieve it - existingEndpoint, err := kubeClient.CoreV1().Endpoints(constants.DefaultNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) - if err != nil { - klog.Error("get existing api-server-external-service endpoint failed", err) - return errors.Wrap(err, "failed to get existing api-server-external-service endpoint") - } + apiServerPort, ok := apiServerExternalResource.Vc.Status.PortMap[constants.APIServerPortKey] + if !ok { + return fmt.Errorf("failed to get API server port from VirtualCluster status") + } + klog.V(4).Infof("API server port: %d", apiServerPort) + + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.APIServerExternalService, + Namespace: constants.KosmosNs, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addresses, + Ports: []corev1.EndpointPort{ + { + Name: "https", + Port: apiServerPort, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + } - // Update the existing endpoint - newEndpoint.SetResourceVersion(existingEndpoint.ResourceVersion) - newEndpoint.SetUID(existingEndpoint.UID) - _, err = kubeClient.CoreV1().Endpoints(constants.DefaultNs).Update(context.TODO(), newEndpoint, metav1.UpdateOptions{}) - if err != nil { - klog.Error("update api-server-external-service endpoint failed", err) - return errors.Wrap(err, "failed to update api-server-external-service endpoint") + _, err = kubeClient.CoreV1().Endpoints(constants.KosmosNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + _, err = kubeClient.CoreV1().Endpoints(constants.KosmosNs).Create(context.TODO(), endpoint, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create api-server-external-service endpoint: %w", err) + } + klog.V(4).Info("api-server-external-service endpoint created successfully") + } else { + return fmt.Errorf("failed to get api-server-external-service endpoint: %w", err) } - klog.V(4).Info("successfully updated api-server-external-service endpoint") } else { - klog.V(4).Info("successfully created api-server-external-service endpoint") + _, err = kubeClient.CoreV1().Endpoints(constants.KosmosNs).Update(context.TODO(), endpoint, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update api-server-external-service endpoint: %w", err) + } + klog.V(4).Info("api-server-external-service endpoint updated successfully") } return nil } func CreateOrUpdateAPIServerExternalService(kubeClient kubernetes.Interface) error { - port, err := getEndPointPort(kubeClient) + port, ipFamilies, err := getEndPointInfo(kubeClient) if err != nil { return fmt.Errorf("error when getEndPointPort: %w", err) } apiServerExternalServiceBytes, err := util.ParseTemplate(virtualcluster.APIServerExternalService, struct { ServicePort int32 + IPv4 bool + IPv6 bool }{ ServicePort: port, + IPv4: ipFamilies.IPv4, + IPv6: ipFamilies.IPv6, }) if err != nil { return fmt.Errorf("error when parsing api-server-external-serive template: %w", err) @@ -103,42 +137,89 @@ func CreateOrUpdateAPIServerExternalService(kubeClient kubernetes.Interface) err if err := yaml.Unmarshal([]byte(apiServerExternalServiceBytes), &svc); err != nil { return fmt.Errorf("err when decoding api-server-external-service in virtual cluster: %w", err) } - _, err = kubeClient.CoreV1().Services(constants.DefaultNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) + _, err = kubeClient.CoreV1().Services(constants.KosmosNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { - // Try to create the service - _, err = kubeClient.CoreV1().Services(constants.DefaultNs).Create(context.TODO(), &svc, metav1.CreateOptions{}) + _, err = kubeClient.CoreV1().Services(constants.KosmosNs).Create(context.TODO(), &svc, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("error when creating api-server-external-service: %w", err) } + klog.V(4).Info("successfully created api-server-external-service service") } else { return fmt.Errorf("error when get api-server-external-service: %w", err) } + } else { + _, err = kubeClient.CoreV1().Services(constants.KosmosNs).Update(context.TODO(), &svc, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("error when updating api-server-external-service: %w", err) + } + klog.V(4).Info("successfully updated api-server-external-service service") } - klog.V(4).Info("successfully created api-server-external-service service") + return nil } -func getEndPointPort(kubeClient kubernetes.Interface) (int32, error) { +func getEndPointInfo(kubeClient kubernetes.Interface) (int32, IPFamilies, error) { klog.V(4).Info("begin to get Endpoints ports...") - endpoints, err := kubeClient.CoreV1().Endpoints(constants.DefaultNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) + endpoints, err := kubeClient.CoreV1().Endpoints(constants.KosmosNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) if err != nil { klog.Errorf("get Endpoints failed: %v", err) - return 0, err + return 0, IPFamilies{}, err } if len(endpoints.Subsets) == 0 { klog.Errorf("subsets is empty") - return 0, fmt.Errorf("No subsets found in the endpoints") + return 0, IPFamilies{}, fmt.Errorf("No subsets found in the endpoints") } subset := endpoints.Subsets[0] + if len(subset.Ports) == 0 { klog.Errorf("Port not found in the endpoint") - return 0, fmt.Errorf("No ports found in the endpoint") + return 0, IPFamilies{}, fmt.Errorf("No ports found in the endpoint") } port := subset.Ports[0].Port klog.V(4).Infof("The port number was successfully obtained: %d", port) - return port, nil + + ipFamilies := IPFamilies{ + IPv4: false, + IPv6: false, + } + + // Check if the addresses contain IPv4 or IPv6 + for _, address := range subset.Addresses { + if utils.IsIPv4(address.IP) { + ipFamilies.IPv4 = true + } + if utils.IsIPv6(address.IP) { + ipFamilies.IPv6 = true + } + } + + klog.V(4).Infof("IPv4: %v, IPv6: %v", ipFamilies.IPv4, ipFamilies.IPv6) + return port, ipFamilies, nil +} + +func EnsureKosmosSystemNamespace(kubeClient kubernetes.Interface) error { + _, err := kubeClient.CoreV1().Namespaces().Get(context.Background(), constants.KosmosNs, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.KosmosNs, + }, + } + _, err = kubeClient.CoreV1().Namespaces().Create(context.Background(), namespace, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create kosmos-system namespace: %v", err) + } + klog.V(4).Info("Created kosmos-system namespace") + return nil + } + + return fmt.Errorf("failed to get kosmos-system namespace: %v", err) + } + + return nil } diff --git a/pkg/kubenest/controlplane/endpoint_test.go b/pkg/kubenest/controlplane/endpoint_test.go new file mode 100644 index 000000000..e24719adc --- /dev/null +++ b/pkg/kubenest/controlplane/endpoint_test.go @@ -0,0 +1,133 @@ +package controlplane + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + fakeclientset "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" +) + +func TestEnsureKosmosSystemNamespace(t *testing.T) { + t.Run("Namespace exists", func(t *testing.T) { + client := fakeclientset.NewSimpleClientset(&corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.KosmosNs, + }, + }) + err := EnsureKosmosSystemNamespace(client) + assert.NoError(t, err, "Namespace already exists but failed") + }) + + t.Run("Namespace not exists and created successfully", func(t *testing.T) { + client := fakeclientset.NewSimpleClientset() + err := EnsureKosmosSystemNamespace(client) + assert.NoError(t, err, "Failed to create namespace") + }) + + t.Run("Error creating namespace", func(t *testing.T) { + client := fakeclientset.NewSimpleClientset() + client.PrependReactor("create", "namespaces", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("creation error") + }) + err := EnsureKosmosSystemNamespace(client) + assert.Error(t, err, "Expected error when creating namespace") + assert.EqualError(t, err, "failed to create kosmos-system namespace: creation error", "Error message mismatch") + }) +} + +func TestCreateOrUpdateAPIServerExternalService(t *testing.T) { + t.Run("Successfully create Service", func(t *testing.T) { + client := fakeclientset.NewSimpleClientset() + + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.APIServerExternalService, + Namespace: constants.KosmosNs, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + {IP: "192.168.1.2"}, + }, + Ports: []corev1.EndpointPort{ + {Port: 6443}, + }, + }, + }, + } + + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.KosmosNs, + }, + } + + _, err := client.CoreV1().Namespaces().Create(context.TODO(), namespace, metav1.CreateOptions{}) + assert.NoError(t, err) + + _, err = client.CoreV1().Endpoints(constants.KosmosNs).Create(context.TODO(), endpoint, metav1.CreateOptions{}) + assert.NoError(t, err) + + err = CreateOrUpdateAPIServerExternalService(client) + assert.NoError(t, err) + + svc, err := client.CoreV1().Services(constants.KosmosNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) + assert.NoError(t, err) + assert.NotNil(t, svc) + assert.Equal(t, constants.APIServerExternalService, svc.Name) + assert.Equal(t, int32(6443), svc.Spec.Ports[0].Port) + }) + + t.Run("Error case - Endpoint not found", func(t *testing.T) { + client := fakeclientset.NewSimpleClientset() + err := CreateOrUpdateAPIServerExternalService(client) + assert.Error(t, err) + assert.Equal(t, "error when getEndPointPort: endpoints \"api-server-external-service\" not found", err.Error()) + }) +} + +func TestGetEndPointInfo(t *testing.T) { + t.Run("Successfully retrieve Endpoint info", func(t *testing.T) { + client := fakeclientset.NewSimpleClientset(&corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.APIServerExternalService, + Namespace: constants.KosmosNs, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + {IP: "192.168.1.1"}, + }, + Ports: []corev1.EndpointPort{ + {Port: 6443}, + }, + }, + }, + }) + port, ipFamilies, err := getEndPointInfo(client) + assert.NoError(t, err) + assert.Equal(t, int32(6443), port) + assert.True(t, ipFamilies.IPv4) + assert.False(t, ipFamilies.IPv6) + }) + + t.Run("No subsets in endpoint", func(t *testing.T) { + client := fakeclientset.NewSimpleClientset(&corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.APIServerExternalService, + Namespace: constants.KosmosNs, + }, + }) + _, _, err := getEndPointInfo(client) + assert.Error(t, err) + assert.Contains(t, err.Error(), "No subsets found in the endpoints") + }) +} diff --git a/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go index e0d054251..4d9d34257 100644 --- a/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go +++ b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go @@ -307,7 +307,11 @@ spec: image: {{ .ImageRepository }}/kas-network-proxy-server:{{ .Version }} resources: requests: - cpu: 1m + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 256Mi securityContext: allowPrivilegeEscalation: false runAsUser: 0 @@ -484,9 +488,11 @@ spec: image: {{ .ImageRepository }}/kas-network-proxy-agent:{{ .Version }} resources: requests: - cpu: 50m + cpu: 100m + memory: 100Mi limits: - memory: 30Mi + cpu: 500m + memory: 500Mi command: [ "/proxy-agent"] args: [ "--logtostderr=true", diff --git a/pkg/kubenest/manifest/controlplane/virtualcluster/manifests_service.go b/pkg/kubenest/manifest/controlplane/virtualcluster/manifests_service.go index 614fbd753..1c69d9f65 100644 --- a/pkg/kubenest/manifest/controlplane/virtualcluster/manifests_service.go +++ b/pkg/kubenest/manifest/controlplane/virtualcluster/manifests_service.go @@ -6,13 +6,23 @@ apiVersion: v1 kind: Service metadata: name: api-server-external-service - namespace: default + namespace: kosmos-system spec: + ipFamilies: + {{- if .IPv4 }} + - IPv4 + {{- end }} + {{- if .IPv6 }} + - IPv6 + {{- end }} + ipFamilyPolicy: PreferDualStack type: NodePort ports: - - protocol: TCP + - name: https + protocol: TCP port: {{ .ServicePort }} targetPort: {{ .ServicePort }} nodePort: 30443 + sessionAffinity: None ` ) diff --git a/pkg/kubenest/tasks/endpoint.go b/pkg/kubenest/tasks/endpoint.go index 8fcd9d31b..be78866a7 100644 --- a/pkg/kubenest/tasks/endpoint.go +++ b/pkg/kubenest/tasks/endpoint.go @@ -9,6 +9,7 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + "github.com/kosmos.io/kosmos/pkg/kubenest/common" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "github.com/kosmos.io/kosmos/pkg/kubenest/controlplane" "github.com/kosmos.io/kosmos/pkg/kubenest/util" @@ -60,7 +61,14 @@ func runEndPointInVirtualClusterTask(r workflow.RunData) error { return err } - err = controlplane.EnsureAPIServerExternalEndPoint(kubeClient) + apiServerExternalResource := common.APIServerExternalResource{ + Namespace: data.GetNamespace(), + Name: data.GetName(), + Vc: data.VirtualCluster(), + RootClientSet: data.RemoteClient(), + } + + err = controlplane.EnsureAPIServerExternalEndPoint(kubeClient, apiServerExternalResource) if err != nil { return err } diff --git a/pkg/kubenest/util/node.go b/pkg/kubenest/util/node.go index 3c7c3a936..faa7c8af6 100644 --- a/pkg/kubenest/util/node.go +++ b/pkg/kubenest/util/node.go @@ -6,9 +6,12 @@ import ( "os" "time" + "github.com/pkg/errors" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" drain "k8s.io/kubectl/pkg/drain" ) @@ -59,3 +62,50 @@ func DrainNode(ctx context.Context, nodeName string, client kubernetes.Interface } return nil } + +func GetAPIServerNodes(rootClientSet kubernetes.Interface, namespace string) (*v1.NodeList, error) { + klog.V(4).Info("begin to get API server nodes") + + apiServerPods, err := rootClientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: "virtualCluster-app=apiserver", + }) + if err != nil { + klog.Errorf("failed to list kube-apiserver pod: %v", err) + return nil, errors.Wrap(err, "failed to list kube-apiserver pods") + } + + var nodeNames []string + for _, pod := range apiServerPods.Items { + klog.V(4).Infof("API server pod %s is on node: %s", pod.Name, pod.Spec.NodeName) + nodeNames = append(nodeNames, pod.Spec.NodeName) + } + + if len(nodeNames) == 0 { + klog.Errorf("no API server pods found in the namespace") + return nil, fmt.Errorf("no API server pods found") + } + + var nodesList []v1.Node + for _, nodeName := range nodeNames { + node, err := rootClientSet.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + klog.Errorf("failed to get node %s: %v", nodeName, err) + return nil, fmt.Errorf("failed to get node %s: %v", nodeName, err) + } + klog.V(4).Infof("Found node: %s", node.Name) + nodesList = append(nodesList, *node) + } + + nodes := &v1.NodeList{ + Items: nodesList, + } + + klog.V(4).Infof("got %d API server nodes", len(nodes.Items)) + + if len(nodes.Items) == 0 { + klog.Errorf("no nodes found for the API server pods") + return nil, fmt.Errorf("no nodes found for the API server pods") + } + + return nodes, nil +} diff --git a/pkg/kubenest/util/node_test.go b/pkg/kubenest/util/node_test.go index a04d5419f..90305351d 100644 --- a/pkg/kubenest/util/node_test.go +++ b/pkg/kubenest/util/node_test.go @@ -2,13 +2,16 @@ package util import ( "context" - "testing" // 确保只导入一次 + "errors" + "testing" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" - clientTesting "k8s.io/client-go/testing" // 使用别名避免和 Go 原生 testing 冲突 + clientTesting "k8s.io/client-go/testing" ) // TestIsNodeReady tests the IsNodeReady function @@ -128,7 +131,7 @@ func TestDrainNode(t *testing.T) { wantErr: false, prepare: func() { fakeClient.Fake.PrependReactor("get", "nodes", func(action clientTesting.Action) (bool, runtime.Object, error) { - return true, nil, errors.NewNotFound(v1.Resource("nodes"), "non-existent-node") + return true, nil, apierrors.NewNotFound(v1.Resource("nodes"), "non-existent-node") }) }, }, @@ -147,3 +150,101 @@ func TestDrainNode(t *testing.T) { }) } } + +func TestGetAPIServerNodes(t *testing.T) { + namespace := "test-namespace" + + t.Run("Successfully Get API Server Nodes", func(t *testing.T) { + apiServerPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "apiserver-pod-1", + Namespace: namespace, + Labels: map[string]string{"virtualCluster-app": "apiserver"}, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + }, + } + apiServerNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "192.168.1.10"}, + }, + }, + } + + client := fake.NewSimpleClientset(apiServerPod, apiServerNode) + + nodes, err := GetAPIServerNodes(client, namespace) + assert.NoError(t, err, "Should successfully get API server nodes") + assert.Len(t, nodes.Items, 1, "Expected exactly one node") + assert.Equal(t, "node1", nodes.Items[0].Name, "Node name should match") + }) + + t.Run("No API Server Pods Found", func(t *testing.T) { + client := fake.NewSimpleClientset() + + nodes, err := GetAPIServerNodes(client, namespace) + assert.Error(t, err, "Should fail when no API server pods are found") + assert.Contains(t, err.Error(), "no API server pods found", "Error message should match") + assert.Nil(t, nodes, "Nodes should be nil when no API server pods are found") + }) + + t.Run("Error Listing API Server Pods", func(t *testing.T) { + client := fake.NewSimpleClientset() + client.PrependReactor("list", "pods", func(action clientTesting.Action) (bool, runtime.Object, error) { + return true, nil, errors.New("mock error: failed to list pods") + }) + + nodes, err := GetAPIServerNodes(client, namespace) + assert.Error(t, err, "Should fail when listing pods returns an error") + assert.Contains(t, err.Error(), "failed to list kube-apiserver pods", "Error message should match") + assert.Nil(t, nodes, "Nodes should be nil when pod listing fails") + }) + + t.Run("Error Fetching Node Information", func(t *testing.T) { + apiServerPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "apiserver-pod-1", + Namespace: namespace, + Labels: map[string]string{"virtualCluster-app": "apiserver"}, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + }, + } + + client := fake.NewSimpleClientset(apiServerPod) + client.PrependReactor("get", "nodes", func(action clientTesting.Action) (bool, runtime.Object, error) { + return true, nil, errors.New("mock error: failed to get node") + }) + + nodes, err := GetAPIServerNodes(client, namespace) + assert.Error(t, err, "Should fail when fetching node information returns an error") + assert.Contains(t, err.Error(), "failed to get node", "Error message should match") + assert.Nil(t, nodes, "Nodes should be nil when node fetching fails") + }) + + t.Run("Pod Exists but Node Not Found", func(t *testing.T) { + apiServerPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "apiserver-pod-1", + Namespace: namespace, + Labels: map[string]string{"virtualCluster-app": "apiserver"}, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + }, + } + + client := fake.NewSimpleClientset(apiServerPod) + + nodes, err := GetAPIServerNodes(client, namespace) + assert.Error(t, err, "Should fail when node does not exist") + assert.Contains(t, err.Error(), "failed to get node", "Error message should match") + assert.Nil(t, nodes, "Nodes should be nil when node is not found") + }) +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 71b0a17df..e6636f67d 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -2,6 +2,7 @@ package utils import ( "fmt" + "net" "os" "strings" @@ -31,6 +32,12 @@ func IsIPv6(s string) bool { return false } +// IsIPv4 checks if the given IP address is IPv4. +func IsIPv4(ip string) bool { + parsedIP := net.ParseIP(ip) + return parsedIP != nil && parsedIP.To4() != nil +} + func GetEnvWithDefaultValue(envName string, defaultValue string) string { v := os.Getenv(envName) if len(v) == 0 {