Skip to content

Commit eee9f43

Browse files
authored
Merge pull request kosmos-io#659 from OrangeBao/main
feat: adapt to the new node agent
2 parents 598e3aa + d845016 commit eee9f43

File tree

6 files changed

+43
-24
lines changed

6 files changed

+43
-24
lines changed

.gitignore

+6-1
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,9 @@ kube-config
3030

3131
__debug_bin*
3232

33-
ignore_dir
33+
ignore_dir
34+
35+
cmd/kubenest/node-agent/app.log
36+
cmd/kubenest/node-agent/cert.pem
37+
cmd/kubenest/node-agent/key.pem
38+
cmd/kubenest/node-agent/agent.env

cmd/kubenest/node-agent/app/serve/serve.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,13 @@ func handleCheck(conn *websocket.Conn, params url.Values) {
155155
listener, err := net.Listen("tcp", address)
156156
if err != nil {
157157
log.Infof("port not avalible %s %v", address, err)
158-
_ = conn.WriteMessage(websocket.BinaryMessage, []byte("1"))
158+
_ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%d", 1)))
159159
return
160160
}
161161
defer listener.Close()
162162
log.Infof("port avalible %s", address)
163-
_ = conn.WriteMessage(websocket.BinaryMessage, []byte("0"))
163+
// _ = conn.WriteMessage(websocket.BinaryMessage, []byte("0"))
164+
_ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%d", 0)))
164165
}
165166

166167
func handleTty(conn *websocket.Conn, queryParams url.Values) {

hack/k8s-in-k8s/kubelet_node_helper.sh

+1-12
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ LOG_NAME=${2:-kubelet}
88
JOIN_HOST=$2
99
JOIN_TOKEN=$3
1010
JOIN_CA_HASH=$4
11-
CHECK_PORT=$2
1211

1312
function unjoin() {
1413
# before unjoin, you need delete node by kubectl
@@ -331,13 +330,6 @@ function version() {
331330
echo "$SCRIPT_VERSION"
332331
}
333332

334-
function port() {
335-
echo "check port(1/1): use netstat check port :$CHECK_PORT"
336-
output=$(netstat -ant | awk '{print $4}' | grep ":$CHECK_PORT" | wc -l)
337-
# Do not modify the fixed format
338-
echo "port:$CHECK_PORT/$output"
339-
}
340-
341333
# See how we were called.
342334
case "$1" in
343335
unjoin)
@@ -346,9 +338,6 @@ case "$1" in
346338
join)
347339
join
348340
;;
349-
port)
350-
port
351-
;;
352341
health)
353342
health
354343
;;
@@ -365,6 +354,6 @@ case "$1" in
365354
version
366355
;;
367356
*)
368-
echo $"usage: $0 unjoin|join|health|log|check|version|revert|port"
357+
echo $"usage: $0 unjoin|join|health|log|check|version|revert"
369358
exit 1
370359
esac

pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ type ExectorReturn struct {
3030
Reason string
3131
LastLog string
3232
Text string
33+
Code int
3334
}
3435

3536
func (r *ExectorReturn) String() string {
36-
return fmt.Sprintf("%d, %s, %s", r.Status, r.Reason, r.LastLog)
37+
return fmt.Sprintf("%d, %s, %s, %d", r.Status, r.Reason, r.LastLog, r.Code)
3738
}
3839

3940
type Exector interface {
@@ -98,7 +99,7 @@ func (h *ExectorHelper) DoExector(stopCh <-chan struct{}, exector Exector) *Exec
9899
func (h *ExectorHelper) DoExectorReal(stopCh <-chan struct{}, exector Exector) *ExectorReturn {
99100
// default is error
100101
result := &ExectorReturn{
101-
FAILED, "init exector return status", "", "",
102+
FAILED, "init exector return status", "", "", 0,
102103
}
103104

104105
// nolint
@@ -133,6 +134,7 @@ func (h *ExectorHelper) DoExectorReal(stopCh <-chan struct{}, exector Exector) *
133134
result.Reason = "command not found"
134135
result.Text = cerr.Text
135136
}
137+
result.Code = cerr.Code
136138
} else {
137139
result.Reason = err.Error()
138140
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package exector
2+
3+
import (
4+
"github.com/gorilla/websocket"
5+
)
6+
7+
type CheckExector struct {
8+
Port string
9+
}
10+
11+
func (e *CheckExector) GetWebSocketOption() WebSocketOption {
12+
rawQuery := "port=" + e.Port
13+
return WebSocketOption{
14+
Path: "check/",
15+
RawQuery: rawQuery,
16+
}
17+
}
18+
19+
func (e *CheckExector) SendHandler(_ *websocket.Conn, _ <-chan struct{}, _ chan struct{}, _ *ExectorReturn) {
20+
}

pkg/kubenest/controller/virtualcluster_init_controller.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/base64"
66
"fmt"
77
"sort"
8-
"strings"
98
"sync"
109
"time"
1110

@@ -694,14 +693,14 @@ func findAddress(node corev1.Node) (string, error) {
694693
// Return false to indicate that the port is not occupied
695694
func CheckPortOnHost(addr string, port int32) (bool, error) {
696695
hostExectorHelper := exector.NewExectorHelper(addr, "")
697-
joinCmdStrCmd := &exector.CMDExector{
698-
Cmd: fmt.Sprintf("bash %s port %d", env.GetExectorShellName(), port),
696+
checkCmd := &exector.CheckExector{
697+
Port: fmt.Sprintf("%d", port),
699698
}
700699

701700
var ret *exector.ExectorReturn
702701
err := apiclient.TryRunCommand(func() error {
703-
ret = hostExectorHelper.DoExector(context.TODO().Done(), joinCmdStrCmd)
704-
if ret.Status != exector.SUCCESS {
702+
ret = hostExectorHelper.DoExector(context.TODO().Done(), checkCmd)
703+
if ret.Code != 1000 {
705704
return fmt.Errorf("chekc port failed, err: %s", ret.String())
706705
}
707706
return nil
@@ -711,9 +710,12 @@ func CheckPortOnHost(addr string, port int32) (bool, error) {
711710
klog.Errorf("check port on host error! addr:%s, port %d, err: %s", addr, port, err.Error())
712711
return true, err
713712
}
714-
klog.V(4).Infof("check port on host, addr: %s, port %d, result: %s", addr, port, ret.String())
715713

716-
return !strings.HasSuffix(ret.LastLog, fmt.Sprintf("port:%d/0", port)), nil
714+
if ret.Status != exector.SUCCESS {
715+
return true, fmt.Errorf("pod[%d] is occupied", port)
716+
} else {
717+
return false, nil
718+
}
717719
}
718720

719721
func (c *VirtualClusterInitController) findHostAddresses() ([]string, error) {

0 commit comments

Comments
 (0)