Skip to content

Commit

Permalink
Merge pull request kosmos-io#654 from OrangeBao/release-0.4.0
Browse files Browse the repository at this point in the history
fix: check if the host is occupying the port
  • Loading branch information
duanmengkk authored Jul 23, 2024
2 parents 4e21bd3 + f3a36c1 commit a8c7eae
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 9 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ kube-config

__debug_bin*

ignore_dir
ignore_dir
cmd/kubenest/node-agent/app.log
cmd/kubenest/node-agent/cert.pem
cmd/kubenest/node-agent/key.pem
cmd/kubenest/node-agent/agent.env
5 changes: 3 additions & 2 deletions cmd/kubenest/node-agent/app/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,13 @@ func handleCheck(conn *websocket.Conn, params url.Values) {
listener, err := net.Listen("tcp", address)
if err != nil {
log.Infof("port not avalible %s %v", address, err)
_ = conn.WriteMessage(websocket.BinaryMessage, []byte("1"))
_ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%d", 1)))
return
}
defer listener.Close()
log.Infof("port avalible %s", address)
_ = conn.WriteMessage(websocket.BinaryMessage, []byte("0"))
// _ = conn.WriteMessage(websocket.BinaryMessage, []byte("0"))
_ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%d", 0)))
}

func handleTty(conn *websocket.Conn, queryParams url.Values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ type ExectorReturn struct {
Reason string
LastLog string
Text string
Code int
}

func (r *ExectorReturn) String() string {
return fmt.Sprintf("%d, %s, %s", r.Status, r.Reason, r.LastLog)
return fmt.Sprintf("%d, %s, %s, %d", r.Status, r.Reason, r.LastLog, r.Code)
}

type Exector interface {
Expand Down Expand Up @@ -98,7 +99,7 @@ func (h *ExectorHelper) DoExector(stopCh <-chan struct{}, exector Exector) *Exec
func (h *ExectorHelper) DoExectorReal(stopCh <-chan struct{}, exector Exector) *ExectorReturn {
// default is error
result := &ExectorReturn{
FAILED, "init exector return status", "", "",
FAILED, "init exector return status", "", "", 0,
}

// nolint
Expand Down Expand Up @@ -133,6 +134,7 @@ func (h *ExectorHelper) DoExectorReal(stopCh <-chan struct{}, exector Exector) *
result.Reason = "command not found"
result.Text = cerr.Text
}
result.Code = cerr.Code
} else {
result.Reason = err.Error()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package exector

import (
"github.com/gorilla/websocket"
)

type CheckExector struct {
Port string
}

func (e *CheckExector) GetWebSocketOption() WebSocketOption {
rawQuery := "port=" + e.Port
return WebSocketOption{
Path: "check/",
RawQuery: rawQuery,
}
}

func (e *CheckExector) SendHandler(_ *websocket.Conn, _ <-chan struct{}, _ chan struct{}, _ *ExectorReturn) {
}
99 changes: 95 additions & 4 deletions pkg/kubenest/controller/virtualcluster_init_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/pkg/errors"
"gopkg.in/yaml.v3"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -32,7 +33,10 @@ import (
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env"
"github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
apiclient "github.com/kosmos.io/kosmos/pkg/kubenest/util/api-client"
)

type VirtualClusterInitController struct {
Expand Down Expand Up @@ -488,12 +492,13 @@ func GetHostPortPoolFromConfigMap(client kubernetes.Interface, ns, cmName, dataK
return &hostPool, nil
}

func (c *VirtualClusterInitController) isPortAllocated(port int32) bool {
// Return false to indicate that the port is not occupied
func (c *VirtualClusterInitController) isPortAllocated(port int32, hostAddress []string) bool {
vcList := &v1alpha1.VirtualClusterList{}
err := c.List(context.Background(), vcList)
if err != nil {
klog.Errorf("list virtual cluster error: %v", err)
return false
return true
}

for _, vc := range vcList.Items {
Expand All @@ -511,7 +516,84 @@ func (c *VirtualClusterInitController) isPortAllocated(port int32) bool {
}
}

return false
ret, err := checkPortOnHostWithAddresses(port, hostAddress)
if err != nil {
klog.Errorf("check port on host error: %v", err)
return true
}
return ret
}

// Return false to indicate that the port is not occupied
func checkPortOnHostWithAddresses(port int32, hostAddress []string) (bool, error) {
for _, addr := range hostAddress {
flag, err := CheckPortOnHost(addr, port)
if err != nil {
return false, err
}
if flag {
return true, nil
}
}
return false, nil
}

func findAddress(node corev1.Node) (string, error) {
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
return addr.Address, nil
}
}
return "", fmt.Errorf("cannot find internal IP address in node addresses, node name: %s", node.GetName())
}

// Return false to indicate that the port is not occupied
func CheckPortOnHost(addr string, port int32) (bool, error) {
hostExectorHelper := exector.NewExectorHelper(addr, "")
checkCmd := &exector.CheckExector{
Port: fmt.Sprintf("%d", port),
}

var ret *exector.ExectorReturn
err := apiclient.TryRunCommand(func() error {
ret = hostExectorHelper.DoExector(context.TODO().Done(), checkCmd)
if ret.Code != 1000 {
return fmt.Errorf("chekc port failed, err: %s", ret.String())
}
return nil
}, 3)

if err != nil {
klog.Errorf("check port on host error! addr:%s, port %d, err: %s", addr, port, err.Error())
return true, err
}

if ret.Status != exector.SUCCESS {
return true, fmt.Errorf("pod[%d] is occupied", port)
} else {
return false, nil
}
}

func (c *VirtualClusterInitController) findHostAddresses() ([]string, error) {
nodes, err := c.RootClientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
LabelSelector: env.GetControlPlaneLabel(),
})
if err != nil {
return nil, err
}

ret := []string{}

for _, node := range nodes.Items {
addr, err := findAddress(node)
if err != nil {
return nil, err
}

ret = append(ret, addr)
}
return ret, nil
}

// AllocateHostPort allocate host port for virtual cluster
Expand All @@ -526,11 +608,20 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1
if err != nil {
return 0, err
}

hostAddress, err := c.findHostAddresses()
if err != nil {
return 0, err
}

ports := func() []int32 {
ports := make([]int32, 0)
for _, p := range hostPool.PortsPool {
if !c.isPortAllocated(p) {
if !c.isPortAllocated(p, hostAddress) {
ports = append(ports, p)
if len(ports) > constants.VirtualClusterPortNum {
break
}
}
}
return ports
Expand Down

0 comments on commit a8c7eae

Please sign in to comment.