Skip to content

Commit e4272ef

Browse files
authored
Merge pull request kosmos-io#628 from qiuwei68/feature_hostports
feat: install api-server-external-service in virtualcluster
2 parents b6925c7 + f8beed3 commit e4272ef

File tree

8 files changed

+416
-1
lines changed

8 files changed

+416
-1
lines changed

cmd/kubenest/operator/app/operator.go

+9
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ func startEndPointsControllers(mgr manager.Manager) error {
7575
return fmt.Errorf("error starting %s: %v", endpointscontroller.KonnectivitySyncControllerName, err)
7676
}
7777

78+
ApiServerExternalSyncController := endpointscontroller.ApiServerExternalSyncController{
79+
Client: mgr.GetClient(),
80+
EventRecorder: mgr.GetEventRecorderFor(constants.GlobalNodeControllerName),
81+
}
82+
83+
if err := ApiServerExternalSyncController.SetupWithManager(mgr); err != nil {
84+
return fmt.Errorf("error starting %s: %v", endpointscontroller.ApiServerExternalSyncControllerName, err)
85+
}
86+
7887
return nil
7988
}
8089

pkg/kubenest/constants/constant.go

+4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const (
99
KosmosJoinControllerName = "kosmos-join-controller"
1010
KosmosNs = "kosmos-system"
1111
SystemNs = "kube-system"
12+
DefaultNs = "default"
1213
DefaultImageRepositoryEnv = "IMAGE_REPOSITIRY"
1314
DefaultImageVersionEnv = "IMAGE_VERSION"
1415
DefaultCoreDnsImageTagEnv = "COREDNS_IMAGE_TAG"
@@ -117,6 +118,9 @@ const (
117118
StateLabelKey = "kosmos-io/state"
118119

119120
KonnectivityServerSuffix = "konnectivity-server"
121+
122+
//in virtual cluster
123+
ApiServerExternalService = "api-server-external-service"
120124
)
121125

122126
type Action string
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package endpointcontroller
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
8+
v1 "k8s.io/api/core/v1"
9+
apierrors "k8s.io/apimachinery/pkg/api/errors"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/client-go/kubernetes"
12+
"k8s.io/client-go/tools/record"
13+
"k8s.io/client-go/util/retry"
14+
"k8s.io/klog/v2"
15+
controllerruntime "sigs.k8s.io/controller-runtime"
16+
"sigs.k8s.io/controller-runtime/pkg/builder"
17+
"sigs.k8s.io/controller-runtime/pkg/client"
18+
"sigs.k8s.io/controller-runtime/pkg/controller"
19+
"sigs.k8s.io/controller-runtime/pkg/event"
20+
"sigs.k8s.io/controller-runtime/pkg/manager"
21+
"sigs.k8s.io/controller-runtime/pkg/predicate"
22+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
23+
24+
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
25+
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
26+
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
27+
"github.com/kosmos.io/kosmos/pkg/utils"
28+
)
29+
30+
type ApiServerExternalSyncController struct {
31+
client.Client
32+
EventRecorder record.EventRecorder
33+
}
34+
35+
const ApiServerExternalSyncControllerName string = "api-server-external-service-sync-controller"
36+
37+
func (e *ApiServerExternalSyncController) SetupWithManager(mgr manager.Manager) error {
38+
skipEvent := func(obj client.Object) bool {
39+
return strings.Contains(obj.GetName(), "apiserver") && obj.GetNamespace() != ""
40+
}
41+
42+
return controllerruntime.NewControllerManagedBy(mgr).
43+
Named(ApiServerExternalSyncControllerName).
44+
WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
45+
For(&v1.Endpoints{},
46+
builder.WithPredicates(predicate.Funcs{
47+
CreateFunc: func(createEvent event.CreateEvent) bool {
48+
return skipEvent(createEvent.Object)
49+
},
50+
UpdateFunc: func(updateEvent event.UpdateEvent) bool { return skipEvent(updateEvent.ObjectNew) },
51+
DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false },
52+
})).
53+
Complete(e)
54+
}
55+
56+
func (e *ApiServerExternalSyncController) SyncApiServerExternalEPS(ctx context.Context, k8sClient kubernetes.Interface) error {
57+
kubeEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, "kubernetes", metav1.GetOptions{})
58+
if err != nil {
59+
klog.Errorf("Error getting endpoints: %v", err)
60+
return fmt.Errorf("failed to get endpoints for kubernetes service: %v", err)
61+
} else {
62+
klog.Infof("Endpoints for service 'kubernetes': %v", kubeEndpoints)
63+
for _, subset := range kubeEndpoints.Subsets {
64+
for _, address := range subset.Addresses {
65+
klog.Infof("IP: %s", address.IP)
66+
}
67+
}
68+
}
69+
70+
if len(kubeEndpoints.Subsets) != 1 {
71+
return fmt.Errorf("eps %s Subsets length is not 1", "kubernetes")
72+
}
73+
74+
if kubeEndpoints.Subsets[0].Addresses == nil || len(kubeEndpoints.Subsets[0].Addresses) == 0 {
75+
return fmt.Errorf("eps %s Addresses length is nil", "kubernetes")
76+
}
77+
78+
apiServerExternalEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, constants.ApiServerExternalService, metav1.GetOptions{})
79+
if err != nil && !apierrors.IsNotFound(err) {
80+
return fmt.Errorf("failed to get endpoints for %s : %v", constants.ApiServerExternalService, err)
81+
}
82+
83+
updateEPS := apiServerExternalEndpoints.DeepCopy()
84+
85+
if apiServerExternalEndpoints != nil {
86+
klog.Infof("apiServerExternalEndpoints: %v", apiServerExternalEndpoints)
87+
} else {
88+
klog.Info("apiServerExternalEndpoints is nil")
89+
}
90+
91+
if updateEPS != nil {
92+
klog.Infof("updateEPS: %v", updateEPS)
93+
} else {
94+
klog.Info("updateEPS is nil")
95+
}
96+
97+
if len(updateEPS.Subsets) == 1 && len(updateEPS.Subsets[0].Addresses) == 1 {
98+
ip := kubeEndpoints.Subsets[0].Addresses[0].IP
99+
klog.Infof("IP address: %s", ip)
100+
updateEPS.Subsets[0].Addresses[0].IP = ip
101+
102+
if _, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Update(ctx, updateEPS, metav1.UpdateOptions{}); err != nil {
103+
return fmt.Errorf("failed to update endpoints for api-server-external-service: %v", err)
104+
}
105+
} else {
106+
klog.ErrorS(err, "Unexpected format of endpoints for api-server-external-service", "endpoint_data", updateEPS)
107+
return fmt.Errorf("unexpected format of endpoints for api-server-external-service")
108+
}
109+
110+
return nil
111+
}
112+
113+
func (e *ApiServerExternalSyncController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
114+
klog.V(4).Infof("============ %s start to reconcile %s ============", ApiServerExternalSyncControllerName, request.NamespacedName)
115+
defer klog.V(4).Infof("============ %s finish to reconcile %s ============", ApiServerExternalSyncControllerName, request.NamespacedName)
116+
117+
var virtualClusterList v1alpha1.VirtualClusterList
118+
if err := e.List(ctx, &virtualClusterList); err != nil {
119+
if apierrors.IsNotFound(err) {
120+
return reconcile.Result{}, nil
121+
}
122+
klog.V(4).Infof("query virtualcluster failed: %v", err)
123+
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
124+
}
125+
var targetVirtualCluster v1alpha1.VirtualCluster
126+
hasVirtualCluster := false
127+
for _, vc := range virtualClusterList.Items {
128+
if vc.Namespace == request.Namespace {
129+
targetVirtualCluster = vc
130+
klog.V(4).Infof("virtualcluster %s found", targetVirtualCluster.Name)
131+
hasVirtualCluster = true
132+
break
133+
}
134+
}
135+
if !hasVirtualCluster {
136+
klog.V(4).Infof("virtualcluster %s not found", request.Namespace)
137+
return reconcile.Result{}, nil
138+
}
139+
140+
if targetVirtualCluster.Status.Phase != v1alpha1.AllNodeReady && targetVirtualCluster.Status.Phase != v1alpha1.Completed {
141+
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
142+
}
143+
144+
k8sClient, err := util.GenerateKubeclient(&targetVirtualCluster)
145+
if err != nil {
146+
klog.Errorf("virtualcluster %s crd kubernetes client failed: %v", targetVirtualCluster.Name, err)
147+
return reconcile.Result{}, nil
148+
}
149+
150+
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
151+
return e.SyncApiServerExternalEPS(ctx, k8sClient)
152+
}); err != nil {
153+
klog.Errorf("virtualcluster %s sync apiserver external endpoints failed: %v", targetVirtualCluster.Name, err)
154+
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
155+
}
156+
157+
return reconcile.Result{}, nil
158+
}

pkg/kubenest/controlplane/endpoint.go

+158
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package controlplane
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/pkg/errors"
8+
corev1 "k8s.io/api/core/v1"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
11+
"k8s.io/apimachinery/pkg/runtime"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
13+
"k8s.io/apimachinery/pkg/util/yaml"
14+
"k8s.io/client-go/dynamic"
15+
"k8s.io/klog/v2"
16+
17+
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
18+
"github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/virtualcluster"
19+
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
20+
)
21+
22+
func EnsureApiServerExternalEndPoint(dynamicClient dynamic.Interface) error {
23+
err := installApiServerExternalEndpointInVirtualCluster(dynamicClient)
24+
if err != nil {
25+
return err
26+
}
27+
28+
err = installApiServerExternalServiceInVirtualCluster(dynamicClient)
29+
if err != nil {
30+
return err
31+
}
32+
return nil
33+
}
34+
35+
func installApiServerExternalEndpointInVirtualCluster(dynamicClient dynamic.Interface) error {
36+
klog.Info("begin to get kubernetes endpoint")
37+
kubeEndpointUnstructured, err := dynamicClient.Resource(schema.GroupVersionResource{
38+
Group: "",
39+
Version: "v1",
40+
Resource: "endpoints",
41+
}).Namespace(constants.DefaultNs).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
42+
if err != nil {
43+
klog.Error("get Kubernetes endpoint failed", err)
44+
return errors.Wrap(err, "failed to get kubernetes endpoint")
45+
}
46+
klog.V(4).Info("the Kubernetes endpoint is:", kubeEndpointUnstructured)
47+
48+
if kubeEndpointUnstructured != nil {
49+
kubeEndpoint := &corev1.Endpoints{}
50+
err := runtime.DefaultUnstructuredConverter.FromUnstructured(kubeEndpointUnstructured.Object, kubeEndpoint)
51+
if err != nil {
52+
klog.Error("switch Kubernetes endpoint to typed object failed", err)
53+
return errors.Wrap(err, "failed to convert kubernetes endpoint to typed object")
54+
}
55+
56+
newEndpoint := kubeEndpoint.DeepCopy()
57+
newEndpoint.Name = constants.ApiServerExternalService
58+
newEndpoint.Namespace = constants.DefaultNs
59+
newEndpoint.ResourceVersion = ""
60+
newEndpointUnstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newEndpoint)
61+
if err != nil {
62+
klog.Error("switch new endpoint to unstructured object failed", err)
63+
return errors.Wrap(err, "failed to convert new endpoint to unstructured object")
64+
}
65+
klog.V(4).Info("after switch the Endpoint unstructured is:", newEndpointUnstructuredObj)
66+
67+
newEndpointUnstructured := &unstructured.Unstructured{Object: newEndpointUnstructuredObj}
68+
createResult, err := dynamicClient.Resource(schema.GroupVersionResource{
69+
Group: "",
70+
Version: "v1",
71+
Resource: "endpoints",
72+
}).Namespace(constants.DefaultNs).Create(context.TODO(), newEndpointUnstructured, metav1.CreateOptions{})
73+
if err != nil {
74+
klog.Error("create api-server-external-service endpoint failed", err)
75+
return errors.Wrap(err, "failed to create api-server-external-service endpoint")
76+
} else {
77+
klog.Info("success create api-server-external-service endpoint:", createResult)
78+
}
79+
} else {
80+
return errors.New("kubernetes endpoint does not exist")
81+
}
82+
83+
return nil
84+
}
85+
86+
func installApiServerExternalServiceInVirtualCluster(dynamicClient dynamic.Interface) error {
87+
port, err := getEndPointPort(dynamicClient)
88+
if err != nil {
89+
return fmt.Errorf("error when getEndPointPort: %w", err)
90+
}
91+
apiServerExternalServiceBytes, err := util.ParseTemplate(virtualcluster.ApiServerExternalService, struct {
92+
ServicePort int32
93+
}{
94+
ServicePort: port,
95+
})
96+
if err != nil {
97+
return fmt.Errorf("error when parsing api-server-external-serive template: %w", err)
98+
}
99+
100+
var obj unstructured.Unstructured
101+
if err := yaml.Unmarshal([]byte(apiServerExternalServiceBytes), &obj); err != nil {
102+
return fmt.Errorf("err when decoding api-server-external service in virtual cluster: %w", err)
103+
}
104+
105+
err = util.CreateObject(dynamicClient, "default", "api-server-external-service", &obj)
106+
if err != nil {
107+
return fmt.Errorf("error when creating api-server-external service in virtual cluster err: %w", err)
108+
}
109+
return nil
110+
}
111+
112+
func getEndPointPort(dynamicClient dynamic.Interface) (int32, error) {
113+
klog.Info("begin to get Endpoints ports...")
114+
endpointsRes := dynamicClient.Resource(schema.GroupVersionResource{
115+
Group: "",
116+
Version: "v1",
117+
Resource: "endpoints",
118+
}).Namespace(constants.DefaultNs)
119+
120+
endpointsRaw, err := endpointsRes.Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{})
121+
if err != nil {
122+
klog.Errorf("get Endpoints failed: %v", err)
123+
return 0, err
124+
}
125+
126+
subsets, found, err := unstructured.NestedSlice(endpointsRaw.Object, "subsets")
127+
if !found || err != nil {
128+
klog.Errorf("The subsets field was not found or parsing error occurred: %v", err)
129+
return 0, fmt.Errorf("subsets field not found or error parsing it")
130+
}
131+
132+
if len(subsets) == 0 {
133+
klog.Errorf("subsets is empty")
134+
return 0, fmt.Errorf("No subsets found in the endpoints")
135+
}
136+
137+
subset := subsets[0].(map[string]interface{})
138+
ports, found, err := unstructured.NestedSlice(subset, "ports")
139+
if !found || err != nil {
140+
klog.Errorf("ports field not found or parsing error: %v", err)
141+
return 0, fmt.Errorf("ports field not found or error parsing it")
142+
}
143+
144+
if len(ports) == 0 {
145+
klog.Errorf("Port not found in the endpoint")
146+
return 0, fmt.Errorf("No ports found in the endpoint")
147+
}
148+
149+
port := ports[0].(map[string]interface{})
150+
portNum, found, err := unstructured.NestedInt64(port, "port")
151+
if !found || err != nil {
152+
klog.Errorf("ports field not found or parsing error: %v", err)
153+
return 0, fmt.Errorf("port field not found or error parsing it")
154+
}
155+
156+
klog.Infof("The port number was successfully obtained: %d", portNum)
157+
return int32(portNum), nil
158+
}

pkg/kubenest/init.go

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase {
7070
initPhase.AppendTask(tasks.NewCoreDNSTask())
7171
// add server
7272
initPhase.AppendTask(tasks.NewComponentsFromManifestsTask())
73+
initPhase.AppendTask(tasks.NewEndPointTask())
7374

7475
initPhase.SetDataInitializer(func() (workflow.RunData, error) {
7576
return newRunData(opts)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package virtualcluster
2+
3+
const (
4+
ApiServerExternalService = `
5+
apiVersion: v1
6+
kind: Service
7+
metadata:
8+
name: api-server-external-service
9+
namespace: default
10+
spec:
11+
type: NodePort
12+
ports:
13+
- protocol: TCP
14+
port: {{ .ServicePort }}
15+
targetPort: {{ .ServicePort }}
16+
nodePort: 30443
17+
`
18+
)

pkg/kubenest/tasks/coredns.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func runCheckCoreDnsTask(r workflow.RunData) error {
133133
func runCoreDnsVirtualTask(r workflow.RunData) error {
134134
data, ok := r.(InitData)
135135
if !ok {
136-
return errors.New("Virtual cluster manifests-components task invoked with an invalid data struct")
136+
return errors.New("Virtual cluster coreDns task invoked with an invalid data struct")
137137
}
138138

139139
secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(),

0 commit comments

Comments
 (0)