diff --git a/.gitignore b/.gitignore index 4ba002f49..5654b91d9 100644 --- a/.gitignore +++ b/.gitignore @@ -30,4 +30,8 @@ kube-config __debug_bin* -ignore_dir \ No newline at end of file +ignore_dir +cmd/kubenest/node-agent/app.log +cmd/kubenest/node-agent/cert.pem +cmd/kubenest/node-agent/key.pem +cmd/kubenest/node-agent/agent.env \ No newline at end of file diff --git a/cmd/kubenest/node-agent/app/serve/serve.go b/cmd/kubenest/node-agent/app/serve/serve.go index eb07e0db6..c3047d5c2 100644 --- a/cmd/kubenest/node-agent/app/serve/serve.go +++ b/cmd/kubenest/node-agent/app/serve/serve.go @@ -155,12 +155,13 @@ func handleCheck(conn *websocket.Conn, params url.Values) { listener, err := net.Listen("tcp", address) if err != nil { log.Infof("port not avalible %s %v", address, err) - _ = conn.WriteMessage(websocket.BinaryMessage, []byte("1")) + _ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%d", 1))) return } defer listener.Close() log.Infof("port avalible %s", address) - _ = conn.WriteMessage(websocket.BinaryMessage, []byte("0")) + // _ = conn.WriteMessage(websocket.BinaryMessage, []byte("0")) + _ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%d", 0))) } func handleTty(conn *websocket.Conn, queryParams url.Values) { diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go b/pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go index e2e26a0b5..ed3032732 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go @@ -30,10 +30,11 @@ type ExectorReturn struct { Reason string LastLog string Text string + Code int } func (r *ExectorReturn) String() string { - return fmt.Sprintf("%d, %s, %s", r.Status, r.Reason, r.LastLog) + return fmt.Sprintf("%d, %s, %s, %d", r.Status, r.Reason, r.LastLog, r.Code) } type Exector interface { @@ -98,7 +99,7 @@ func (h *ExectorHelper) DoExector(stopCh <-chan struct{}, exector Exector) *Exec func (h *ExectorHelper) DoExectorReal(stopCh <-chan struct{}, exector Exector) *ExectorReturn { // default is error result := &ExectorReturn{ - FAILED, "init exector return status", "", "", + FAILED, "init exector return status", "", "", 0, } // nolint @@ -133,6 +134,7 @@ func (h *ExectorHelper) DoExectorReal(stopCh <-chan struct{}, exector Exector) * result.Reason = "command not found" result.Text = cerr.Text } + result.Code = cerr.Code } else { result.Reason = err.Error() } diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_check.go b/pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_check.go new file mode 100644 index 000000000..25da806ef --- /dev/null +++ b/pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_check.go @@ -0,0 +1,20 @@ +package exector + +import ( + "github.com/gorilla/websocket" +) + +type CheckExector struct { + Port string +} + +func (e *CheckExector) GetWebSocketOption() WebSocketOption { + rawQuery := "port=" + e.Port + return WebSocketOption{ + Path: "check/", + RawQuery: rawQuery, + } +} + +func (e *CheckExector) SendHandler(_ *websocket.Conn, _ <-chan struct{}, _ chan struct{}, _ *ExectorReturn) { +} diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 6ae7bcdad..652a582a2 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "gopkg.in/yaml.v3" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -32,7 +33,10 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector" "github.com/kosmos.io/kosmos/pkg/kubenest/util" + apiclient "github.com/kosmos.io/kosmos/pkg/kubenest/util/api-client" ) type VirtualClusterInitController struct { @@ -488,12 +492,13 @@ func GetHostPortPoolFromConfigMap(client kubernetes.Interface, ns, cmName, dataK return &hostPool, nil } -func (c *VirtualClusterInitController) isPortAllocated(port int32) bool { +// Return false to indicate that the port is not occupied +func (c *VirtualClusterInitController) isPortAllocated(port int32, hostAddress []string) bool { vcList := &v1alpha1.VirtualClusterList{} err := c.List(context.Background(), vcList) if err != nil { klog.Errorf("list virtual cluster error: %v", err) - return false + return true } for _, vc := range vcList.Items { @@ -511,7 +516,84 @@ func (c *VirtualClusterInitController) isPortAllocated(port int32) bool { } } - return false + ret, err := checkPortOnHostWithAddresses(port, hostAddress) + if err != nil { + klog.Errorf("check port on host error: %v", err) + return true + } + return ret +} + +// Return false to indicate that the port is not occupied +func checkPortOnHostWithAddresses(port int32, hostAddress []string) (bool, error) { + for _, addr := range hostAddress { + flag, err := CheckPortOnHost(addr, port) + if err != nil { + return false, err + } + if flag { + return true, nil + } + } + return false, nil +} + +func findAddress(node corev1.Node) (string, error) { + for _, addr := range node.Status.Addresses { + if addr.Type == corev1.NodeInternalIP { + return addr.Address, nil + } + } + return "", fmt.Errorf("cannot find internal IP address in node addresses, node name: %s", node.GetName()) +} + +// Return false to indicate that the port is not occupied +func CheckPortOnHost(addr string, port int32) (bool, error) { + hostExectorHelper := exector.NewExectorHelper(addr, "") + checkCmd := &exector.CheckExector{ + Port: fmt.Sprintf("%d", port), + } + + var ret *exector.ExectorReturn + err := apiclient.TryRunCommand(func() error { + ret = hostExectorHelper.DoExector(context.TODO().Done(), checkCmd) + if ret.Code != 1000 { + return fmt.Errorf("chekc port failed, err: %s", ret.String()) + } + return nil + }, 3) + + if err != nil { + klog.Errorf("check port on host error! addr:%s, port %d, err: %s", addr, port, err.Error()) + return true, err + } + + if ret.Status != exector.SUCCESS { + return true, fmt.Errorf("pod[%d] is occupied", port) + } else { + return false, nil + } +} + +func (c *VirtualClusterInitController) findHostAddresses() ([]string, error) { + nodes, err := c.RootClientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + LabelSelector: env.GetControlPlaneLabel(), + }) + if err != nil { + return nil, err + } + + ret := []string{} + + for _, node := range nodes.Items { + addr, err := findAddress(node) + if err != nil { + return nil, err + } + + ret = append(ret, addr) + } + return ret, nil } // AllocateHostPort allocate host port for virtual cluster @@ -526,11 +608,20 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1 if err != nil { return 0, err } + + hostAddress, err := c.findHostAddresses() + if err != nil { + return 0, err + } + ports := func() []int32 { ports := make([]int32, 0) for _, p := range hostPool.PortsPool { - if !c.isPortAllocated(p) { + if !c.isPortAllocated(p, hostAddress) { ports = append(ports, p) + if len(ports) > constants.VirtualClusterPortNum { + break + } } } return ports